author      Wenchen Fan <wenchen@databricks.com>    2016-10-25 08:42:21 +0800
committer   Wenchen Fan <wenchen@databricks.com>    2016-10-25 08:42:21 +0800
commit      84a33999082af88ea6365cdb5c7232ed0933b1c6 (patch)
tree        a250c838432818f6ca3ec90b1a8330cbd8c0bc93 /sql/hive
parent      407c3cedf29a4413339dcde758295dc3225a0054 (diff)
[SPARK-18028][SQL] simplify TableFileCatalog
## What changes were proposed in this pull request?

Simplify/clean up `TableFileCatalog`:

1. Pass a `CatalogTable` instead of `databaseName` and `tableName` into `TableFileCatalog`, so that we don't need to fetch the table metadata from the metastore again.
2. In `TableFileCatalog.filterPartitions0`, do NOT set `PartitioningAwareFileCatalog.BASE_PATH_PARAM`. According to the [classdoc](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningAwareFileCatalog.scala#L189-L209), the default value of `basePath` already satisfies our needs. What's more, setting this parameter may break case 2 mentioned in the classdoc.
3. Add `equals` and `hashCode` to `TableFileCatalog` (a rough illustrative sketch follows below).
4. Add `SessionCatalog.listPartitionsByFilter`, which handles case sensitivity (see the sketch after the diffstat).

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #15568 from cloud-fan/table-file-catalog.
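Items 3 and 4 live in `sql/core` and `sql/catalyst`, so their code is not part of this `sql/hive`-limited diff. As a rough illustration of item 3 (not the patch's actual code; the class and field names below are simplified assumptions), equality keyed on the wrapped `CatalogTable` is what lets the cache lookup in the new `CachedTableSuite` test further down match a second catalog instance built for the same table:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.catalog.CatalogTable

// Illustrative sketch only: a file catalog that remembers the CatalogTable it was
// built from and defines equality by that table's identifier, so two instances
// created for the same metastore table compare equal (and hash alike) even though
// they are different objects. CacheManager can then recognize a plan that was
// cached through another instance.
class TableBackedFileCatalog(
    sparkSession: SparkSession,
    val table: CatalogTable,
    val sizeInBytes: Long) {

  override def equals(other: Any): Boolean = other match {
    case o: TableBackedFileCatalog => table.identifier == o.table.identifier
    case _ => false
  }

  override def hashCode(): Int = table.identifier.hashCode()
}
```

This is also why the test constructs two separate `TableFileCatalog` instances for the same `tableMeta` and expects both plans to resolve to the same cache entry.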
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala                      |  4
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                          | 41
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala  |  7
3 files changed, 42 insertions, 10 deletions
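Item 4 (`SessionCatalog.listPartitionsByFilter` handling case sensitivity) is likewise outside this diff. Conceptually, the partition predicates pushed down by the planner must reference partition columns by their exact catalog casing before they are handed to the external catalog; a minimal sketch of that normalization idea, with hypothetical names and structure (not the method added by this PR), might look like:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}

object PartitionFilterNormalization {
  // Hypothetical helper: rewrite attribute names in pushed-down predicates to the
  // exact casing of the table's partition columns, so that a filter written as
  // `P = 1` can still prune a partition column declared as `p`.
  def normalize(table: CatalogTable, predicates: Seq[Expression]): Seq[Expression] = {
    val partitionCols = table.partitionColumnNames
    predicates.map(_.transform {
      case a: Attribute =>
        partitionCols.find(_.equalsIgnoreCase(a.name)) match {
          case Some(actual) => a.withName(actual)
          case None => throw new IllegalArgumentException(
            s"${a.name} is not a partition column of ${table.identifier}")
        }
    })
  }
}
```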
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 44089335e1..6c1585d5f5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
Some(partitionSchema))
val logicalRelation = cached.getOrElse {
- val db = metastoreRelation.databaseName
- val table = metastoreRelation.tableName
val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong
val fileCatalog = {
val catalog = new TableFileCatalog(
- sparkSession, db, table, Some(partitionSchema), sizeInBytes)
+ sparkSession, metastoreRelation.catalogTable, sizeInBytes)
if (lazyPruningEnabled) {
catalog
} else {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 7d4ef6f26a..ecdf4f14b3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -19,12 +19,15 @@ package org.apache.spark.sql.hive
import java.io.File
-import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode}
+import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode}
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog}
+import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.StructType
import org.apache.spark.storage.RDDBlockId
import org.apache.spark.util.Utils
@@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
sql("DROP TABLE cachedTable")
}
+
+ test("cache a table using TableFileCatalog") {
+ withTable("test") {
+ sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet")
+ val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
+ val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
+
+ val dataSchema = StructType(tableMeta.schema.filterNot { f =>
+ tableMeta.partitionColumnNames.contains(f.name)
+ })
+ val relation = HadoopFsRelation(
+ location = tableFileCatalog,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+
+ val plan = LogicalRelation(relation, catalogTable = Some(tableMeta))
+ spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan))
+
+ assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined)
+
+ val sameCatalog = new TableFileCatalog(spark, tableMeta, 0)
+ val sameRelation = HadoopFsRelation(
+ location = sameCatalog,
+ partitionSchema = tableMeta.partitionSchema,
+ dataSchema = dataSchema,
+ bucketSpec = None,
+ fileFormat = new ParquetFileFormat(),
+ options = Map.empty)(sparkSession = spark)
+ val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta))
+
+ assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined)
+ }
+ }
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
index 346ea0ca43..59639aacf3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala
@@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te
|LOCATION '${dir.getAbsolutePath}'""".stripMargin)
val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test")
- val tableFileCatalog = new TableFileCatalog(
- spark,
- tableMeta.database,
- tableMeta.identifier.table,
- Some(tableMeta.partitionSchema),
- 0)
+ val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0)
val dataSchema = StructType(tableMeta.schema.filterNot { f =>
tableMeta.partitionColumnNames.contains(f.name)