| author | Wenchen Fan <wenchen@databricks.com> | 2016-10-25 08:42:21 +0800 |
|---|---|---|
| committer | Wenchen Fan <wenchen@databricks.com> | 2016-10-25 08:42:21 +0800 |
| commit | 84a33999082af88ea6365cdb5c7232ed0933b1c6 (patch) | |
| tree | a250c838432818f6ca3ec90b1a8330cbd8c0bc93 /sql | |
| parent | 407c3cedf29a4413339dcde758295dc3225a0054 (diff) | |
[SPARK-18028][SQL] simplify TableFileCatalog
## What changes were proposed in this pull request?
Simplify/clean up `TableFileCatalog` (a condensed sketch of the resulting class follows this list):
1. Pass a `CatalogTable` into `TableFileCatalog` instead of `databaseName` and `tableName`, so that we don't need to fetch the table metadata from the metastore again.
2. In `TableFileCatalog.filterPartitions`, DO NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our need. What's more, setting this parameter may break case 2, which is mentioned in the classdoc.
3. Add `equals` and `hashCode` to `TableFileCatalog`, so that it works correctly as part of a plan-cache key.
4. Add `SessionCatalog.listPartitionsByFilter`, which handles case sensitivity.
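For reference, a condensed sketch of the class after this change (simplified from the full diff below; the file-status caching and partition-pruning details are elided):

```scala
class TableFileCatalog(
    sparkSession: SparkSession,
    val table: CatalogTable,            // replaces (db, table, partitionSchema)
    override val sizeInBytes: Long) extends FileCatalog {

  // The table metadata is passed in directly, so there is no second
  // `externalCatalog.getTable(db, table)` round trip to the metastore.
  private val baseLocation = table.storage.locationUri

  // A `TableFileCatalog` can end up inside a `LogicalRelation` that is used
  // as a cache key, so equality is defined by the table identifier.
  override def equals(o: Any): Boolean = o match {
    case other: TableFileCatalog => this.table.identifier == other.table.identifier
    case _ => false
  }
  override def hashCode(): Int = table.identifier.hashCode()
}
```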
## How was this patch tested?
Existing tests.
Author: Wenchen Fan <wenchen@databricks.com>
Closes #15568 from cloud-fan/table-file-catalog.
Diffstat (limited to 'sql')
5 files changed, 84 insertions, 36 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9711131d88..3d6eec81c0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -756,6 +756,20 @@ class SessionCatalog(
   }
 
   /**
+   * List the metadata of partitions that belong to the specified table, assuming it exists, that
+   * satisfy the given partition-pruning predicate expressions.
+   */
+  def listPartitionsByFilter(
+      tableName: TableIdentifier,
+      predicates: Seq[Expression]): Seq[CatalogTablePartition] = {
+    val db = formatDatabaseName(tableName.database.getOrElse(getCurrentDatabase))
+    val table = formatTableName(tableName.table)
+    requireDbExists(db)
+    requireTableExists(TableIdentifier(table, Option(db)))
+    externalCatalog.listPartitionsByFilter(db, table, predicates)
+  }
+
+  /**
    * Verify if the input partition spec exactly matches the existing defined partition spec
    * The columns must be the same but the orders could be different.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
index 31a01bc6db..667379b222 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/TableFileCatalog.scala
@@ -20,36 +20,30 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.types.StructType
 
 /**
  * A [[FileCatalog]] for a metastore catalog table.
  *
  * @param sparkSession a [[SparkSession]]
- * @param db the table's database name
- * @param table the table's (unqualified) name
- * @param partitionSchema the schema of a partitioned table's partition columns
+ * @param table the metadata of the table
  * @param sizeInBytes the table's data size in bytes
- * @param fileStatusCache optional cache implementation to use for file listing
  */
 class TableFileCatalog(
     sparkSession: SparkSession,
-    db: String,
-    table: String,
-    partitionSchema: Option[StructType],
+    val table: CatalogTable,
     override val sizeInBytes: Long) extends FileCatalog {
 
   protected val hadoopConf = sparkSession.sessionState.newHadoopConf
 
   private val fileStatusCache = FileStatusCache.newCache(sparkSession)
 
-  private val externalCatalog = sparkSession.sharedState.externalCatalog
+  assert(table.identifier.database.isDefined,
+    "The table identifier must be qualified in TableFileCatalog")
 
-  private val catalogTable = externalCatalog.getTable(db, table)
-
-  private val baseLocation = catalogTable.storage.locationUri
+  private val baseLocation = table.storage.locationUri
 
   override def rootPaths: Seq[Path] = baseLocation.map(new Path(_)).toSeq
 
@@ -66,24 +60,32 @@ class TableFileCatalog(
    * @param filters partition-pruning filters
    */
   def filterPartitions(filters: Seq[Expression]): ListingFileCatalog = {
-    val parameters = baseLocation
-      .map(loc => Map(PartitioningAwareFileCatalog.BASE_PATH_PARAM -> loc))
-      .getOrElse(Map.empty)
-    partitionSchema match {
-      case Some(schema) =>
-        val selectedPartitions = externalCatalog.listPartitionsByFilter(db, table, filters)
-        val partitions = selectedPartitions.map { p =>
-          PartitionPath(p.toRow(schema), p.storage.locationUri.get)
-        }
-        val partitionSpec = PartitionSpec(schema, partitions)
-        new PrunedTableFileCatalog(
-          sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
-      case None =>
-        new ListingFileCatalog(sparkSession, rootPaths, parameters, None, fileStatusCache)
+    if (table.partitionColumnNames.nonEmpty) {
+      val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
+        table.identifier, filters)
+      val partitionSchema = table.partitionSchema
+      val partitions = selectedPartitions.map { p =>
+        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+      }
+      val partitionSpec = PartitionSpec(partitionSchema, partitions)
+      new PrunedTableFileCatalog(
+        sparkSession, new Path(baseLocation.get), fileStatusCache, partitionSpec)
+    } else {
+      new ListingFileCatalog(sparkSession, rootPaths, table.storage.properties, None)
     }
   }
 
   override def inputFiles: Array[String] = filterPartitions(Nil).inputFiles
+
+  // `TableFileCatalog` may be a member of `HadoopFsRelation`, `HadoopFsRelation` may be a member
+  // of `LogicalRelation`, and `LogicalRelation` may be used as the cache key. So we need to
+  // implement `equals` and `hashCode` here, to make it work with cache lookup.
+  override def equals(o: Any): Boolean = o match {
+    case other: TableFileCatalog => this.table.identifier == other.table.identifier
+    case _ => false
+  }
+
+  override def hashCode(): Int = table.identifier.hashCode()
 }
 
 /**
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 44089335e1..6c1585d5f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
         Some(partitionSchema))
 
       val logicalRelation = cached.getOrElse {
-        val db = metastoreRelation.databaseName
-        val table = metastoreRelation.tableName
         val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
         val fileCatalog = {
           val catalog = new TableFileCatalog(
-            sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+            sparkSession, metastoreRelation.catalogTable, sizeInBytes)
           if (lazyPruningEnabled) {
             catalog
           } else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 7d4ef6f26a..ecdf4f14b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
 
 import java.io.File
 
-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.storage.RDDBlockId
 import org.apache.spark.util.Utils
 
@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
 
     sql("DROP TABLE cachedTable")
   }
+
+  test("cache a table using TableFileCatalog") {
+    withTable("test") {
+      sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
+      val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+      val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+
+      val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+        tableMeta.partitionColumnNames.contains(f.name)
+      })
+      val relation = HadoopFsRelation(
+        location = tableFileCatalog,
+        partitionSchema = tableMeta.partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat(),
+        options = Map.empty)(sparkSession = spark)
+
+      val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+      spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
+
+      assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
+
+      val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+      val sameRelation = HadoopFsRelation(
+        location = sameCatalog,
+        partitionSchema = tableMeta.partitionSchema,
+        dataSchema = dataSchema,
+        bucketSpec = None,
+        fileFormat = new ParquetFileFormat(),
+        options = Map.empty)(sparkSession = spark)
+      val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+
+      assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 346ea0ca43..59639aacf3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
           |LOCATION '${dir.getAbsolutePath}'""".stripMargin)
 
       val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
-      val tableFileCatalog = new TableFileCatalog(
-        spark,
-        tableMeta.database,
-        tableMeta.identifier.table,
-        Some(tableMeta.partitionSchema),
-        0)
+      val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
 
       val dataSchema = StructType(tableMeta.schema.filterNot { f =>
         tableMeta.partitionColumnNames.contains(f.name)
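For illustration only (not part of the patch): a minimal sketch of why the new `equals`/`hashCode` make cache lookup work, assuming a partitioned table `default.test` like the one created in `CachedTableSuite` above.

```scala
// Hypothetical spark-shell session; `spark` is the usual SparkSession.
import org.apache.spark.sql.execution.datasources.TableFileCatalog

val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")

// Two independently constructed catalogs for the same table...
val a = new TableFileCatalog(spark, tableMeta, 0)
val b = new TableFileCatalog(spark, tableMeta, 0)

// ...now compare equal, because identity is defined by the table identifier.
// Without the overrides they would only be reference-equal, so a rebuilt
// `LogicalRelation` over the same table would always miss the cache.
assert(a == b && a.hashCode == b.hashCode)
```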