-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala  | 28
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala    |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala         |  2
3 files changed, 23 insertions, 12 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 76d329a3dd..c4da34ae64 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -116,8 +116,14 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
}
override def refreshTable(databaseName: String, tableName: String): Unit = {
- // refresh table does not eagerly reload the cache. It just invalidate the cache.
+ // refreshTable does not eagerly reload the cache. It just invalidates the cache.
// Next time when we use the table, it will be populated in the cache.
+ // Since we also cache ParquetRelations converted from Hive Parquet tables and
+ // adding converted ParquetRelations into the cache is not defined in the load function
+ // of the cache (instead, we add the cache entry in convertToParquetRelation),
+ // it is better to invalidate the cache here to avoid confusing warning logs from the
+ // cache loader (e.g. cannot find data source provider, which is only defined for
+ // data source tables).
invalidateTable(databaseName, tableName)
}
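A minimal sketch of the invalidate-then-lazily-reload behavior described in the comment above, assuming a Guava LoadingCache (which is what the getIfPresent/invalidate calls and the mention of a cache loader suggest); the names and values here are illustrative, not the actual HiveMetastoreCatalog code:

import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}

object InvalidateSketch {
  // Stand-in loader; the real cache builds relations from metastore metadata.
  private val cache: LoadingCache[String, String] =
    CacheBuilder.newBuilder().build(new CacheLoader[String, String] {
      override def load(key: String): String = s"relation for $key"
    })

  def main(args: Array[String]): Unit = {
    cache.get("default.src")                          // loader runs, entry cached
    cache.invalidate("default.src")                   // entry dropped, no eager reload
    assert(cache.getIfPresent("default.src") == null) // cache miss until next use
    cache.get("default.src")                          // repopulated lazily on next lookup
  }
}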
@@ -242,21 +248,27 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
QualifiedTableName(metastoreRelation.databaseName, metastoreRelation.tableName)
def getCached(
- tableIdentifier: QualifiedTableName,
- pathsInMetastore: Seq[String],
- schemaInMetastore: StructType,
- partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
+ tableIdentifier: QualifiedTableName,
+ pathsInMetastore: Seq[String],
+ schemaInMetastore: StructType,
+ partitionSpecInMetastore: Option[PartitionSpec]): Option[LogicalRelation] = {
cachedDataSourceTables.getIfPresent(tableIdentifier) match {
case null => None // Cache miss
- case logical @ LogicalRelation(parquetRelation: ParquetRelation2) =>
+ case logical@LogicalRelation(parquetRelation: ParquetRelation2) =>
// If we have the same paths, same schema, and same partition spec,
// we will use the cached Parquet Relation.
val useCached =
- parquetRelation.paths == pathsInMetastore &&
+ parquetRelation.paths.toSet == pathsInMetastore.toSet &&
logical.schema.sameType(metastoreSchema) &&
parquetRelation.maybePartitionSpec == partitionSpecInMetastore
- if (useCached) Some(logical) else None
+ if (useCached) {
+ Some(logical)
+ } else {
+ // If the cached relation is not updated, we invalidate it right away.
+ cachedDataSourceTables.invalidate(tableIdentifier)
+ None
+ }
case other =>
logWarning(
s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} shold be stored " +
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4345ffbf30..99dc58646d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -58,12 +58,13 @@ case class DropTable(
try {
hiveContext.cacheManager.tryUncacheQuery(hiveContext.table(tableName))
} catch {
- // This table's metadata is not in
+ // This table's metadata is not in Hive metastore (e.g. the table does not exist).
case _: org.apache.hadoop.hive.ql.metadata.InvalidTableException =>
+ case _: org.apache.spark.sql.catalyst.analysis.NoSuchTableException =>
// Other Throwables can be caused by users providing wrong parameters in OPTIONS
// (e.g. invalid paths). We catch it and log a warning message.
// Users should be able to drop such kinds of tables regardless if there is an error.
- case e: Throwable => log.warn(s"${e.getMessage}")
+ case e: Throwable => log.warn(s"${e.getMessage}", e)
}
hiveContext.invalidateTable(tableName)
hiveContext.runSqlHive(s"DROP TABLE $ifExistsClause$tableName")
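A small sketch of what the logging change buys (SLF4J-style API, hypothetical call site): passing the Throwable as the second argument to warn records the full stack trace, whereas interpolating only getMessage discards it:

import org.slf4j.LoggerFactory

object WarnWithCauseSketch {
  private val log = LoggerFactory.getLogger(getClass)

  def tryUncache(uncache: => Unit): Unit =
    try uncache catch {
      // log.warn(s"${e.getMessage}") would drop the stack trace;
      // supplying the exception as the second argument keeps it in the log.
      case e: Throwable => log.warn(s"${e.getMessage}", e)
    }
}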
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 2ad6e86726..1319c81dfc 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -473,7 +473,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
// Right now, insert into a partitioned Parquet is not supported in data source Parquet.
// So, we expect it is not cached.
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "false")
sql(
"""
|INSERT INTO TABLE test_parquet_partitioned_cache_test
@@ -481,7 +480,6 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
|select a, b from jt
""".stripMargin)
assert(catalog.cachedDataSourceTables.getIfPresent(tableIdentifer) === null)
- conf.setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
// Make sure we can cache the partitioned table.
table("test_parquet_partitioned_cache_test")