diff options
Diffstat (limited to 'sql/hive')
3 files changed, 42 insertions, 10 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 44089335e1..6c1585d5f5 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -226,12 +226,10 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log Some(partitionSchema)) val logicalRelation = cached.getOrElse { - val db = metastoreRelation.databaseName - val table = metastoreRelation.tableName val sizeInBytes = metastoreRelation.statistics.sizeInBytes.toLong val fileCatalog = { val catalog = new TableFileCatalog( - sparkSession, db, table, Some(partitionSchema), sizeInBytes) + sparkSession, metastoreRelation.catalogTable, sizeInBytes) if (lazyPruningEnabled) { catalog } else { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 7d4ef6f26a..ecdf4f14b3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -19,12 +19,15 @@ package org.apache.spark.sql.hive import java.io.File -import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} +import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, SaveMode} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation, TableFileCatalog} +import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.test.SQLTestUtils +import org.apache.spark.sql.types.StructType import org.apache.spark.storage.RDDBlockId import org.apache.spark.util.Utils @@ -317,4 +320,40 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto sql("DROP TABLE cachedTable") } + + test("cache a table using TableFileCatalog") { + withTable("test") { + sql("CREATE TABLE test(i int) PARTITIONED BY (p int) STORED AS parquet") + val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") + val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0) + + val dataSchema = StructType(tableMeta.schema.filterNot { f => + tableMeta.partitionColumnNames.contains(f.name) + }) + val relation = HadoopFsRelation( + location = tableFileCatalog, + partitionSchema = tableMeta.partitionSchema, + dataSchema = dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat(), + options = Map.empty)(sparkSession = spark) + + val plan = LogicalRelation(relation, catalogTable = Some(tableMeta)) + spark.sharedState.cacheManager.cacheQuery(Dataset.ofRows(spark, plan)) + + assert(spark.sharedState.cacheManager.lookupCachedData(plan).isDefined) + + val sameCatalog = new TableFileCatalog(spark, tableMeta, 0) + val sameRelation = HadoopFsRelation( + location = sameCatalog, + partitionSchema = tableMeta.partitionSchema, + dataSchema = dataSchema, + bucketSpec = None, + fileFormat = new ParquetFileFormat(), + options = Map.empty)(sparkSession = spark) + val samePlan = LogicalRelation(sameRelation, catalogTable = Some(tableMeta)) + + assert(spark.sharedState.cacheManager.lookupCachedData(samePlan).isDefined) + } + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala index 346ea0ca43..59639aacf3 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/PruneFileSourcePartitionsSuite.scala @@ -45,12 +45,7 @@ class PruneFileSourcePartitionsSuite extends QueryTest with SQLTestUtils with Te |LOCATION '${dir.getAbsolutePath}'""".stripMargin) val tableMeta = spark.sharedState.externalCatalog.getTable("default", "test") - val tableFileCatalog = new TableFileCatalog( - spark, - tableMeta.database, - tableMeta.identifier.table, - Some(tableMeta.partitionSchema), - 0) + val tableFileCatalog = new TableFileCatalog(spark, tableMeta, 0) val dataSchema = StructType(tableMeta.schema.filterNot { f => tableMeta.partitionColumnNames.contains(f.name) |