From c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0 Mon Sep 17 00:00:00 2001
From: Yin Huai
Date: Fri, 3 Apr 2015 14:40:36 +0800
Subject: [SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata

https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai

This patch had conflicts when merged, resolved by
Committer: Cheng Lian

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.
---
 .../spark/sql/hive/HiveMetastoreCatalog.scala      | 28 +++++++++++++++-------
 .../apache/spark/sql/hive/execution/commands.scala |  5 ++--
 .../org/apache/spark/sql/hive/parquetSuites.scala  |  2 --
 3 files changed, 23 insertions(+), 12 deletions(-)

diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a3dd..c4da34ae64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   }
 
   override def refreshTable(databaseName: String, tableName: String): Unit = {
-    // refresh table does not eagerly reload the cache. It just invalidate the cache.
+    // refreshTable does not eagerly reload the cache. It just invalidates the cache.
     // Next time when we use the table, it will be populated in the cache.
+    // Since we also cache ParquetRelations converted from Hive Parquet tables and
+    // adding converted ParquetRelations into the cache is not defined in the load function
+    // of the cache (instead, we add the cache entry in convertToParquetRelation),
+    // it is better here to invalidate the cache to avoid confusing warning logs from the
+    // cache loader (e.g. cannot find data source provider, which is only defined for
+    // data source tables).
     invalidateTable(databaseName, tableName)
   }
 
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
 
     def getCached(
-      tableIdentifier: QualifiedTableName,
-      pathsInMetastore: Seq[String],
-      schemaInMetastore: StructType,
-      partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+        tableIdentifier: QualifiedTableName,
+        pathsInMetastore: Seq[String],
+        schemaInMetastore: StructType,
+        partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
       cachedDataSourceTables.getIfPresent(tableIdentifier) match {
         case null => None // Cache miss
-        case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+        case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
           // If we have the same paths, same schema, and same partition spec,
           // we will use the cached Parquet Relation.
           val useCached =
-            parquetRelation.paths == pathsInMetastore &&
+            parquetRelation.paths.toSet == pathsInMetastore.toSet &&
             logical.schema.sameType(metastoreSchema) &&
             parquetRelation.maybePartitionSpec == partitionSpecInMetastore
 
-          if (useCached) Some(logical) else None
+          if (useCached) {
+            Some(logical)
+          } else {
+            // If the cached relation is not updated, we invalidate it right away.
+            cachedDataSourceTables.invalidate(tableIdentifier)
+            None
+          }
         case other =>
           logWarning(
             s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4345ffbf30..99dc58646d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -58,12 +58,13 @@ case class DropTable(
     try {
       hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
     } catch {
-      // This table's metadata is not in
+      // This table's metadata is not in Hive metastore (e.g. the table does not exist).
       case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+      case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
       // Other Throwables can be caused by users providing wrong parameters in OPTIONS
       // (e.g. invalid paths). We catch it and log a warning message.
       // Users should be able to drop such kinds of tables regardless if there is an error.
-      case e: Throwable => log.warn(s"${e.getMessage}")
+      case e: Throwable => log.warn(s"${e.getMessage}", e)
     }
     hiveContext.invalidateTable(tableName)
     hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ad6e86726..1319c81dfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -473,7 +473,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
     // Right now, insert into a partitioned Parquet is not supported in data source Parquet.
     // So, we expect it is not cached.
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
     sql(
       """
         |INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -481,7 +480,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
         |select a, b from jt
       """.stripMargin)
     assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
-    conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
 
     // Make sure we can cache the partitioned table.
     table("test_parquet_partitioned_cache_test")
-- 
cgit v1.2.3
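
Reading note (not part of the patch): the sketch below is a minimal, self-contained Scala illustration of the caching pattern this change introduces. Converted relations are put into a Guava cache explicitly (outside any CacheLoader), getCached reuses an entry only when paths (order-insensitive), schema, and partition spec still match what the metastore reports and invalidates it otherwise, and refreshTable only invalidates. QualifiedTableName, CachedRelation, ConvertedRelationCache, and the string-typed schema/partition spec are simplified stand-ins, not Spark's real classes.

import com.google.common.cache.{Cache, CacheBuilder}

// Illustrative stand-ins for Spark's internal types (not the real classes).
case class QualifiedTableName(database: String, table: String)
case class CachedRelation(paths: Set[String], schema: String, partitionSpec: Option[String])

object ConvertedRelationCache {
  // Entries are inserted explicitly when a Metastore Parquet table is converted,
  // not through a CacheLoader, so stale entries have to be invalidated by hand.
  private val cache: Cache[QualifiedTableName, CachedRelation] =
    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, CachedRelation]()

  def put(key: QualifiedTableName, relation: CachedRelation): Unit = cache.put(key, relation)

  // Mirrors getCached in the patch: reuse the entry only if the paths (compared as sets),
  // the schema, and the partition spec still match what the metastore reports;
  // otherwise drop the stale entry right away and report a miss.
  def getCached(
      key: QualifiedTableName,
      pathsInMetastore: Seq[String],
      schemaInMetastore: String,
      partitionSpecInMetastore: Option[String]): Option[CachedRelation] = {
    Option(cache.getIfPresent(key)) match {
      case Some(cached)
          if cached.paths == pathsInMetastore.toSet &&
            cached.schema == schemaInMetastore &&
            cached.partitionSpec == partitionSpecInMetastore =>
        Some(cached)
      case Some(_) =>
        cache.invalidate(key) // cached relation is out of date
        None
      case None => None // cache miss
    }
  }

  // Mirrors refreshTable in the patch: only invalidate; the entry is repopulated
  // the next time the table is converted and looked up.
  def refreshTable(key: QualifiedTableName): Unit = cache.invalidate(key)
}

The design point mirrored here is the one stated in the patch's comment: because entries are added in convertToParquetRelation rather than by the cache's load function, refreshTable cannot safely reload eagerly, so it just invalidates and lets the next lookup repopulate the entry.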