author    Yin Huai <yhuai@databricks.com>    2015-04-03 14:40:36 +0800
committer Cheng Lian <lian@databricks.com>  2015-04-03 14:40:53 +0800
commit    0c1b78b72bfb5ef806a6c5258a4e5b021b8d1912 (patch)
tree      54ee7030ca92e47cbb3fb0e9e8b9e2dadb569f2c
parent    ac705aa83632622aecd641f7c8619cd78a3cad74 (diff)
[SPARK-6575][SQL] Converted Parquet Metastore tables no longer cache metadata
https://issues.apache.org/jira/browse/SPARK-6575

Author: Yin Huai <yhuai@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Cheng Lian <lian@databricks.com>

Closes #5339 from yhuai/parquetRelationCache and squashes the following commits:

b0e1a42 [Yin Huai] Address comments.
83d9846 [Yin Huai] Remove unnecessary change.
c0dc7a4 [Yin Huai] Cache converted parquet relations.

(cherry picked from commit c42c3fc7f7b79a1f6ce990d39b5d9d14ab19fcf0)
Signed-off-by: Cheng Lian <lian@databricks.com>
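The core of the fix: converted ParquetRelations are inserted into cachedDataSourceTables explicitly (from convertToParquetRelation), not through the cache's load function, which only knows how to build relations for data source tables. Below is a minimal sketch of that pattern, using Guava's cache the way HiveMetastoreCatalog does; the shape of QualifiedTableName and the String stand-in for the relation type are simplifications for illustration, not Spark's actual types.

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object ConvertedRelationCacheSketch {
  case class QualifiedTableName(database: String, name: String)

  // The loader only knows how to build relations for data source tables;
  // it is what runs on a cache miss.
  private val loader = new CacheLoader[QualifiedTableName, String] {
    override def load(key: QualifiedTableName): String =
      s"data source relation for ${key.database}.${key.name}"
  }

  val cachedDataSourceTables: LoadingCache[QualifiedTableName, String] =
    CacheBuilder.newBuilder().maximumSize(1000).build(loader)

  // Mirrors convertToParquetRelation: a converted relation is added to the
  // cache directly with put(), bypassing the loader entirely.
  def convertToParquetRelation(key: QualifiedTableName): String = {
    val relation = s"converted ParquetRelation2 for ${key.database}.${key.name}"
    cachedDataSourceTables.put(key, relation)
    relation
  }
}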
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  28
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala  2
3 files changed, 23 insertions, 12 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a3dd..c4da34ae64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
override def refreshTable(databaseName: String, tableName: String): Unit = {
- // refresh table does not eagerly reload the cache. It just invalidate the cache.
+ // refreshTable does not eagerly reload the cache. It just invalidates the cache.
// Next time when we use the table, it will be populated in the cache.
+ // Since we also cache ParquetRelations converted from Hive Parquet tables and
+ // adding converted ParquetRelations into the cache is not defined in the load function
+ // of the cache (instead, we add the cache entry in convertToParquetRelation),
+ // it is better to invalidate the cache here to avoid confusing warning logs from the
+ // cache loader (e.g. cannot find data source provider, which is only defined for
+ // data source tables).
invalidateTable(databaseName, tableName)
}
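Why refreshTable invalidates rather than eagerly reloading: with a Guava LoadingCache, invalidate() drops the entry without running the loader, and the next lookup repopulates it lazily, whereas refresh() would invoke the loader immediately. For a converted Parquet table, that loader has no data source provider to consult and would log the confusing warning the comment describes. A hypothetical mirror of the refresh path, reusing the cache sketch above:

object RefreshTableSketch {
  import ConvertedRelationCacheSketch._

  def refreshTable(databaseName: String, tableName: String): Unit = {
    // invalidate() only removes the entry; the next lookup repopulates it
    // lazily. refresh() would run the loader right away, which cannot
    // rebuild a converted ParquetRelation and would log a spurious warning.
    cachedDataSourceTables.invalidate(QualifiedTableName(databaseName, tableName))
  }
}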
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
def getCached(
- tableIdentifier: QualifiedTableName,
- pathsInMetastore: Seq[String],
- schemaInMetastore: StructType,
- partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+ tableIdentifier: QualifiedTableName,
+ pathsInMetastore: Seq[String],
+ schemaInMetastore: StructType,
+ partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
- parquetRelation.paths == pathsInMetastore &&
+ parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore
- if (useCached) Some(logical) else None
+ if (useCached) {
+ Some(logical)
+ } else {
+ // If the cached relation is not updated, we invalidate it right away.
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
+ }
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4345ffbf30..99dc58646d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -58,12 +58,13 @@ case class DropTable(
try {
hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
} catch {
- // This table's metadata is not in
+ // This table's metadata is not in Hive metastore (e.g. the table does not exist).
case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+ case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
// Other Throwables can be caused by users providing wrong parameters in OPTIONS
// (e.g. invalid paths). We catch it and log a warning message.
// Users should be able to drop such kinds of tables regardless if there is an error.
- case e: Throwable => log.warn(s"${e.getMessage}")
+ case e: Throwable => log.warn(s"${e.getMessage}", e)
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
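Two hardenings in this hunk: DropTable now also swallows NoSuchTableException when uncaching, and the warning log passes the Throwable itself so the stack trace is preserved instead of only the (possibly null) message. A minimal sketch of the catch-block pattern, using plain SLF4J rather than Spark's Logging trait, with a stand-in exception type so it compiles without a Hive dependency:

import org.slf4j.{Logger, LoggerFactory}

object DropTableLoggingSketch {
  private val log: Logger = LoggerFactory.getLogger(getClass)

  // Stand-in for Hive's InvalidTableException, for illustration only.
  final class InvalidTableException(msg: String) extends Exception(msg)

  def safeUncache(tryUncacheQuery: () => Unit): Unit = {
    try tryUncacheQuery()
    catch {
      // Expected when the table's metadata is not in the metastore: ignore.
      case _: InvalidTableException => ()
      // Anything else (e.g. bad OPTIONS paths) is logged *with* the
      // Throwable: log.warn(msg, e) keeps the stack trace, while the old
      // log.warn(msg) discarded it.
      case e: Throwable => log.warn(e.getMessage, e)
    }
  }
}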
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index e1e84618bd..dccac4d128 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -472,7 +472,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -480,7 +479,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")