author     Yin Huai <yhuai@databricks.com>    2015-04-02 20:23:08 -0700
committer  Michael Armbrust <michael@databricks.com>    2015-04-02 20:23:16 -0700
commit     0c1c0fb90d025e60c5ab74bd80a7c36482070b80 (patch)
tree       ba2d3d65e947f2f96baf23dd411ff793b10598e1
parent     0ef46b2d8de8dfb6da5a2b9ba808bdfe6d16e27d (diff)
[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit 4b82bd730a24f96d94dfea87420cfaa4253a5ccb)
Signed-off-by: Michael Armbrust <michael@databricks.com>
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala   61
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala          112
2 files changed, 167 insertions, 6 deletions
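For background, the "converted" relations in the title come from the convertMetastoreParquet path: when spark.sql.hive.convertMetastoreParquet is enabled, a Hive metastore Parquet table is read through the data source ParquetRelation2 instead of the Hive SerDe, and that converted relation is what this patch starts caching. A minimal, hedged usage sketch assuming a 1.3-era HiveContext; the table name some_parquet_table is illustrative, not part of this patch:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object ConvertMetastoreParquetSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "convert-metastore-parquet-sketch")
    val hiveContext = new HiveContext(sc)

    // When this flag is enabled, metastore Parquet tables are read through the
    // data source ParquetRelation2 code path; the converted relation is what the
    // patch below begins to cache.
    hiveContext.setConf("spark.sql.hive.convertMetastoreParquet", "true")

    // Illustrative query against a pre-existing Parquet table in the metastore.
    hiveContext.sql("SELECT * FROM some_parquet_table").show()

    sc.stop()
  }
}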
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index bbd920a405..76d329a3dd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,7 +116,9 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
override def refreshTable(databaseName: String, tableName: String): Unit = {
- cachedDataSourceTables.refresh(QualifiedTableName(databaseName, tableName).toLowerCase)
+ // refreshTable does not eagerly reload the cache. It just invalidates the cache.
+ // The next time the table is used, the cache will be repopulated.
+ invalidateTable(databaseName, tableName)
}
def invalidateTable(databaseName: String, tableName: String): Unit = {
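The refreshTable change above hinges on the difference between refreshing and invalidating a cache entry: refresh eagerly reloads the entry through the cache's loader, while invalidate only drops it, so it is rebuilt lazily on the next lookup. A self-contained sketch of that distinction with a plain Guava LoadingCache; the string values stand in for the converted relations, and this is not the actual cachedDataSourceTables definition:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object CacheSemanticsSketch {
  def main(args: Array[String]): Unit = {
    var buildCount = 0
    // The loader stands in for the expensive "convert metastore table to relation" step.
    val cache: LoadingCache[String, String] =
      CacheBuilder.newBuilder().maximumSize(100).build(
        new CacheLoader[String, String] {
          override def load(key: String): String = {
            buildCount += 1
            s"relation for $key (build #$buildCount)"
          }
        })

    cache.get("default.test_parquet")        // first lookup builds the entry
    cache.refresh("default.test_parquet")    // eagerly rebuilds the entry right away
    cache.invalidate("default.test_parquet") // only drops the entry ...
    assert(cache.getIfPresent("default.test_parquet") == null)
    cache.get("default.test_parquet")        // ... so it is rebuilt lazily here
  }
}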
@@ -229,13 +231,42 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
private def convertToParquetRelation(metastoreRelation: MetastoreRelation): LogicalRelation = {
val metastoreSchema = StructType.fromAttributes(metastoreRelation.output)
val mergeSchema = hive.convertMetastoreParquetWithSchemaMerging
- val parquetOptions = Map(
- ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
- ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
// NOTE: Instead of passing Metastore schema directly to `ParquetRelation2`, we have to
// serialize the Metastore schema to JSON and pass it as a data source option because of the
// evil case insensitivity issue, which is reconciled within `ParquetRelation2`.
+ val parquetOptions = Map(
+ ParquetRelation2.METASTORE_SCHEMA -> metastoreSchema.json,
+ ParquetRelation2.MERGE_SCHEMA -> mergeSchema.toString)
+ val tableIdentifier =
+ QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
+
+ def getCached(
+ tableIdentifier: QualifiedTableName,
+ pathsInMetastore: Seq[String],
+ schemaInMetastore: StructType,
+ partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+ cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+ case null => None // Cache miss
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+ // If we have the same paths, same schema, and same partition spec,
+ // we will use the cached Parquet Relation.
+ val useCached =
+ parquetRelation.paths == pathsInMetastore &&
+ logical.schema.sameType(metastoreSchema) &&
+ parquetRelation.maybePartitionSpec == partitionSpecInMetastore
+
+ if (useCached) Some(logical) else None
+ case other =>
+ logWarning(
+ s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
+ s"as Parquet. However, we are getting a ${other} from the metastore cache. " +
+ s"This cached entry will be invalidated.")
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
+ }
+ }
+
if (metastoreRelation.hiveQlTable.isPartitioned) {
val partitionSchema = StructType.fromAttributes(metastoreRelation.partitionKeys)
val partitionColumnDataTypes = partitionSchema.map(_.dataType)
@@ -248,10 +279,28 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
val partitionSpec = PartitionSpec(partitionSchema, partitions)
val paths = partitions.map(_.path)
- LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+
+ val cached = getCached(tableIdentifier, paths, metastoreSchema, Some(partitionSpec))
+ val parquetRelation = cached.getOrElse {
+ val created =
+ LogicalRelation(ParquetRelation2(paths, parquetOptions, None, Some(partitionSpec))(hive))
+ cachedDataSourceTables.put(tableIdentifier, created)
+ created
+ }
+
+ parquetRelation
} else {
val paths = Seq(metastoreRelation.hiveQlTable.getDataLocation.toString)
- LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+
+ val cached = getCached(tableIdentifier, paths, metastoreSchema, None)
+ val parquetRelation = cached.getOrElse {
+ val created =
+ LogicalRelation(ParquetRelation2(paths, parquetOptions)(hive))
+ cachedDataSourceTables.put(tableIdentifier, created)
+ created
+ }
+
+ parquetRelation
}
}
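Both branches above follow the same check-then-populate pattern: ask getCached for an entry, reuse it only if the paths, schema, and partition layout still match what the metastore reports, and otherwise build a fresh relation and put it back into the cache. A stripped-down sketch of that pattern; TableKey, Relation, and the string schema are placeholders, not the real Spark classes:

import com.google.common.cache.{Cache, CacheBuilder}

// Placeholder stand-ins for the metastore/relation types used in the patch.
case class TableKey(database: String, table: String)
case class Relation(paths: Seq[String], schemaJson: String, partitioned: Boolean)

object CacheOrCreateSketch {
  private val cache: Cache[TableKey, Relation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[TableKey, Relation]()

  // Reuse a cached relation only if it still matches what the metastore reports.
  private def getCached(
      key: TableKey,
      paths: Seq[String],
      schemaJson: String,
      partitioned: Boolean): Option[Relation] =
    Option(cache.getIfPresent(key)).filter { cached =>
      cached.paths == paths &&
        cached.schemaJson == schemaJson &&
        cached.partitioned == partitioned
    }

  def lookupOrCreate(
      key: TableKey,
      paths: Seq[String],
      schemaJson: String,
      partitioned: Boolean): Relation =
    getCached(key, paths, schemaJson, partitioned).getOrElse {
      // Stands in for building a new ParquetRelation2 and caching it.
      val created = Relation(paths, schemaJson, partitioned)
      cache.put(key, created)
      created
    }
}

The real getCached additionally invalidates a cached entry whose plan is not a ParquetRelation2; the sketch simply returns None in that case.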
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c8eabc9e1c..e1e84618bd 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -26,8 +26,10 @@ import org.apache.spark.sql.{QueryTest, SQLConf, SaveMode}
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.hive.test.TestHive.implicits._
+import org.apache.spark.sql.json.JSONRelation
import org.apache.spark.sql.sources.{InsertIntoDataSource, LogicalRelation}
import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
import org.apache.spark.sql.SaveMode
@@ -389,6 +391,116 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
sql("DROP TABLE ms_convert")
}
+
+ test("Caching converted data source Parquet Relations") {
+ def checkCached(tableIdentifer: catalog.QualifiedTableName): Unit = {
+ // Converted test_parquet should be cached.
+ catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) match {
+ case null => fail("Converted test_parquet should be cached.")
+ case logical @ LogicalRelation(parquetRelation: ParquetRelation2) => // OK
+ case other =>
+ fail(
+ "The cached test_parquet should be a Parquet Relation. " +
+ s"However, $other is returned form the cache.")
+ }
+ }
+
+ sql("DROP TABLE IF EXISTS test_insert_parquet")
+ sql("DROP TABLE IF EXISTS test_parquet_partitioned_cache_test")
+
+ sql(
+ """
+ |create table test_insert_parquet
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ var tableIdentifer = catalog.QualifiedTableName("default", "test_insert_parquet")
+
+ // First, make sure the converted test_parquet is not cached.
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ // A table lookup will populate the cache.
+ table("test_insert_parquet")
+ checkCached(tableIdentifer)
+ // For an insert into a non-partitioned table, we will do the conversion,
+ // so the converted test_insert_parquet should be cached.
+ invalidateTable("test_insert_parquet")
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ sql(
+ """
+ |INSERT INTO TABLE test_insert_parquet
+ |select a, b from jt
+ """.stripMargin)
+ checkCached(tableIdentifer)
+ // Make sure we can read the data.
+ checkAnswer(
+ sql("select * from test_insert_parquet"),
+ sql("select a, b from jt").collect())
+ // Invalidate the cache.
+ invalidateTable("test_insert_parquet")
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+ // Create a partitioned table.
+ sql(
+ """
+ |create table test_parquet_partitioned_cache_test
+ |(
+ | intField INT,
+ | stringField STRING
+ |)
+ |PARTITIONED BY (date string)
+ |ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
+ |STORED AS
+ | INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
+ | OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
+ """.stripMargin)
+
+ tableIdentifer = catalog.QualifiedTableName("default", "test_parquet_partitioned_cache_test")
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ sql(
+ """
+ |INSERT INTO TABLE test_parquet_partitioned_cache_test
+ |PARTITION (date='2015-04-01')
+ |select a, b from jt
+ """.stripMargin)
+ // Right now, inserting into a partitioned table is not supported by the data source Parquet path,
+ // so we expect the table not to be cached.
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
+ sql(
+ """
+ |INSERT INTO TABLE test_parquet_partitioned_cache_test
+ |PARTITION (date='2015-04-02')
+ |select a, b from jt
+ """.stripMargin)
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+ conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
+
+ // Make sure we can cache the partitioned table.
+ table("test_parquet_partitioned_cache_test")
+ checkCached(tableIdentifer)
+ // Make sure we can read the data.
+ checkAnswer(
+ sql("select STRINGField, date, intField from test_parquet_partitioned_cache_test"),
+ sql(
+ """
+ |select b, '2015-04-01', a FROM jt
+ |UNION ALL
+ |select b, '2015-04-02', a FROM jt
+ """.stripMargin).collect())
+
+ invalidateTable("test_parquet_partitioned_cache_test")
+ assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
+
+ sql("DROP TABLE test_insert_parquet")
+ sql("DROP TABLE test_parquet_partitioned_cache_test")
+ }
}
class ParquetDataSourceOffMetastoreSuite extends ParquetMetastoreSuiteBase {
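From the user side, the behaviour exercised by this test maps to refreshing a table after its underlying files change outside of Spark. A hedged usage sketch, assuming the 1.3-era HiveContext.refreshTable(tableName) helper and reusing the test's table name purely for illustration:

import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.HiveContext

object RefreshTableUsageSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local[2]", "refresh-table-sketch")
    val hiveContext = new HiveContext(sc)

    // The first lookup converts the metastore Parquet table and caches the result.
    hiveContext.table("test_insert_parquet").count()

    // If files under the table's location change outside of Spark, drop the cached
    // entry; with this patch the relation is rebuilt lazily on the next lookup.
    hiveContext.refreshTable("test_insert_parquet")
    hiveContext.table("test_insert_parquet").count()

    sc.stop()
  }
}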