aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-10-25 08:42:21 +0800
committerWenchen Fan <wenchen@databricks.com>2016-10-25 08:42:21 +0800
commit84a33999082af88ea6365cdb5c7232ed0933b1c6 (patch)
treea250c838432818f6ca3ec90b1a8330cbd8c0bc93 /sql
parent407c3cedf29a4413339dcde758295dc3225a0054 (diff)
downloadspark-84a33999082af88ea6365cdb5c7232ed0933b1c6.tar.gz
spark-84a33999082af88ea6365cdb5c7232ed0933b1c6.tar.bz2
spark-84a33999082af88ea6365cdb5c7232ed0933b1c6.zip
[SPARK-18028][SQL] simplify TableFileCatalog
## What changes were proposed in this pull request? Simplify/cleanup TableFileCatalog: 1. pass a `CatalogTable` instead of `databaseName` and `tableName` into `TableFileCatalog`, so that we don't need to fetch table metadata from metastore again 2. In `TableFileCatalog.filterPartitions0`, DO NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our need. What's more, if we set this parameter, we may break the case 2 which is metioned in the classdoc. 3. add `equals` and `hashCode` to `TableFileCatalog` 4. add `SessionCatalog.listPartitionsByFilter` which handles case sensitivity. ## How was this patch tested? existing tests. Author: Wenchen Fan <wenchen@databricks.com> Closes #15568 from cloud-fan/table-file-catalog.
Diffstat (limited to 'sql')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala14
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala54
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala4
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala41
-rw-r--r--sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala7
5 files changed, 84 insertions, 36 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9711131d88..3d6eec81c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -756,6 +756,20 @@ class SessionCatalog(
}
/**
+ * List the metadata of partitions that belong to the specified table, assuming it exists, that
+ * satisfy the given partition-pruning predicate expressions.
+ */
+ def listPartitionsByFilter(
+ tableName: TableIdentifier,
+ predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+ val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
+ val table = formatTableName(tableName.table)
+ requireDbExists(db)
+ requireTableExists(TableIdentifier(table, Option(db)))
+ externalCatalog.listPartitionsByFilter(db, table, predicates)
+ }
+
+ /**
* Verify if the input partition spec exactly matches the existing defined partition spec
* The columns must be the same but the orders could be different.
*/
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 31a01bc6db..667379b222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -20,36 +20,30 @@ package org.apache.spark.sql.execution.datasources
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
/**
* A [[FileCatalog]] for a metastore catalog table.
*
* @param sparkSession a [[SparkSession]]
- * @param db the table's database name
- * @param table the table's (unqualified) name
- * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param table the metadata of the table
* @param sizeInBytes the table's data size in bytes
- * @param fileStatusCache optional cache implementation to use for file listing
*/
class TableFileCatalog(
sparkSession: SparkSession,
- db: String,
- table: String,
- partitionSchema: Option[StructType],
+ val table: CatalogTable,
override val sizeInBytes: Long) extends FileCatalog {
protected val hadoopConf = sparkSession.sessionState.newHadoopConf
private val fileStatusCache = FileStatusCache.newCache(sparkSession)
- private val externalCatalog = sparkSession.sharedState.externalCatalog
+ assert(table.identifier.database.isDefined,
+ "The table identifier must be qualified in TableFileCatalog")
- private val catalogTable = externalCatalog.getTable(db, table)
-
- private val baseLocation = catalogTable.storage.locationUri
+ private val baseLocation = table.storage.locationUri
override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
@@ -66,24 +60,32 @@ class TableFileCatalog(
* @param filters partition-pruning filters
*/
def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
- val parameters = baseLocation
- .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
- .getOrElse(Map.empty)
- partitionSchema match {
- case Some(schema) =>
- val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
- val partitions = selectedPartitions.map { p =>
- PartitionPath(p.toRow(schema), p.storage.locationUri.get)
- }
- val partitionSpec = PartitionSpec(schema, partitions)
- new PrunedTableFileCatalog(
- sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
- case None =>
- new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache)
+ if (table.partitionColumnNames.nonEmpty) {
+ val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+ table.identifier, filters)
+ val partitionSchema = table.partitionSchema
+ val partitions = selectedPartitions.map { p =>
+ PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+ }
+ val partitionSpec = PartitionSpec(partitionSchema, partitions)
+ new PrunedTableFileCatalog(
+ sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+ } else {
+ new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
}
}
override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+
+ // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+ // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
+ // implement `equals` and `hashCode` here, to make it work with cache lookup.
+ override def equals(o: Any): Boolean = o match {
+ case other: TableFileCatalog => this.table.identifier == other.table.identifier
+ case _ => false
+ }
+
+ override def hashCode(): Int = table.identifier.hashCode()
}
/**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 44089335e1..6c1585d5f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSchema))
val logicalRelation = cached.getOrElse {
- val db = metastoreRelation.databaseName
- val table = metastoreRelation.tableName
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
val catalog = new TableFileCatalog(
- sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+ sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
} else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 7d4ef6f26a..ecdf4f14b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
import java.io.File
-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("DROP TABLE cachedTable")
}
+
+ test("cache a table using TableFileCatalog") {
+ withTable("test") {
+ sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
+ val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+ val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+
+ val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+ tableMeta.partitionColumnNames.contains(f.name)
+ })
+ val relation = HadoopFsRelation(
+ location = tableFileCatalog,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+
+ val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
+
+ assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
+
+ val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val sameRelation = HadoopFsRelation(
+ location = sameCatalog,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+ val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+
+ assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 346ea0ca43..59639aacf3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(
- spark,
- tableMeta.database,
- tableMeta.identifier.table,
- Some(tableMeta.partitionSchema),
- 0)
+ val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)