author    gatorsmile <gatorsmile@gmail.com>    2016-05-20 14:38:25 +0800
committer Wenchen Fan <wenchen@databricks.com> 2016-05-20 14:38:25 +0800
commit    39fd469078271aa12f3163606000e06e382d35dc (patch)
tree      b6e107016534279ea96f5fa52b6122ec051273c6 /sql
parent    c94b34ebbf4c6ce353c899c571beb34e8db98917 (diff)
[SPARK-15367][SQL] Add refreshTable back
#### What changes were proposed in this pull request?

`refreshTable` was a method in `HiveContext`. It was accidentally deleted while we were migrating the APIs. This PR adds it back to `HiveContext`. In addition, in `SparkSession`, we put it under the catalog namespace (`SparkSession.catalog.refreshTable`).

#### How was this patch tested?

Updated the existing test cases to use the function `refreshTable`, and added a test case for `refreshTable` in `hivecontext-compatibility`.

Author: gatorsmile <gatorsmile@gmail.com>

Closes #13156 from gatorsmile/refreshTable.
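For reference, a minimal sketch of the restored API surface (the `spark` session handle and the table name `my_table` are illustrative, not part of this patch):

```scala
// New in the 2.0 API: refreshTable lives under the catalog namespace.
spark.catalog.refreshTable("my_table")
```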
Diffstat (limited to 'sql')

 sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala                       | 13
 sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala             | 21
 sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala                  | 27
 sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala                 |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala        |  4
 sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala               |  4
 sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala | 12

 7 files changed, 59 insertions(+), 26 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 49c0742761..a99bc3bff6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -211,4 +211,17 @@ abstract class Catalog {
*/
def clearCache(): Unit
+ /**
+ * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+ * Spark SQL or the external data source library it uses might cache certain metadata about a
+ * table, such as the location of blocks. When those change outside of Spark SQL, users should
+ * call this function to invalidate the cache.
+ *
+ * If this table is cached as an InMemoryRelation, drop the original cached version and make the
+ * new version cached lazily.
+ *
+ * @since 2.0.0
+ */
+ def refreshTable(tableName: String): Unit
+
}
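The doc comment above describes the scenario this method exists for: table storage changing outside of Spark SQL. A hedged sketch of that flow (the table name `logs` and the path are illustrative; assumes a Parquet-backed external table):

```scala
// Create a table whose files live at an external location.
spark.sql("CREATE TABLE logs USING parquet OPTIONS (path '/data/logs')")
spark.table("logs").count()          // Spark SQL may cache file/block metadata here

// ... files under /data/logs are added or removed by another process ...

spark.catalog.refreshTable("logs")   // invalidate the cached metadata
spark.table("logs").count()          // now reflects the updated file listing
```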
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 68238dbb46..78b1db1682 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -126,24 +126,9 @@ case class RefreshTable(tableIdent: TableIdentifier)
extends RunnableCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
- // Refresh the given table's metadata first.
- sparkSession.sessionState.catalog.refreshTable(tableIdent)
-
- // If this table is cached as a InMemoryColumnarRelation, drop the original
- // cached version and make the new version cached lazily.
- val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
- // Use lookupCachedData directly since RefreshTable also takes databaseName.
- val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
- if (isCached) {
- // Create a data frame to represent the table.
- // TODO: Use uncacheTable once it supports database name.
- val df = Dataset.ofRows(sparkSession, logicalPlan)
- // Uncache the logicalPlan.
- sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
- // Cache it again.
- sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
- }
-
+ // Refresh the given table's metadata. If this table is cached as an InMemoryRelation,
+ // drop the original cached version and make the new version cached lazily.
+ sparkSession.catalog.refreshTable(tableIdent.quotedString)
Seq.empty[Row]
}
}
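After this change the SQL statement and the programmatic API converge on one code path: `RefreshTable.run` simply forwards to the catalog method. As a sketch, these two calls are now equivalent (table name illustrative):

```scala
spark.sql("REFRESH TABLE t")     // parsed into the RefreshTable command above
spark.catalog.refreshTable("t")  // the method that command now delegates to
```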
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 473e827f4d..1371abe189 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -345,6 +345,33 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
sparkSession.cacheManager.lookupCachedData(qName).nonEmpty
}
+ /**
+ * Refresh the cache entry for a table, if any. For a Hive metastore table, the metadata
+ * is refreshed.
+ *
+ * @group cachemgmt
+ * @since 2.0.0
+ */
+ override def refreshTable(tableName: String): Unit = {
+ val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
+ sessionCatalog.refreshTable(tableIdent)
+
+ // If this table is cached as an InMemoryRelation, drop the original
+ // cached version and make the new version cached lazily.
+ val logicalPlan = sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+ // Use lookupCachedData directly since RefreshTable also takes databaseName.
+ val isCached = sparkSession.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+ if (isCached) {
+ // Create a data frame to represent the table.
+ // TODO: Use uncacheTable once it supports database name.
+ val df = Dataset.ofRows(sparkSession, logicalPlan)
+ // Uncache the logicalPlan.
+ sparkSession.cacheManager.tryUncacheQuery(df, blocking = true)
+ // Cache it again.
+ sparkSession.cacheManager.cacheQuery(df, Some(tableIdent.table))
+ }
+ }
+
}
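Note that the re-cache in `refreshTable` above is lazy: `cacheQuery` registers the new plan, but nothing is recomputed until the next action touches the table. A sketch of the observable behavior (table `t` is assumed to exist):

```scala
spark.catalog.cacheTable("t")
spark.table("t").count()         // materializes the InMemoryRelation
spark.catalog.refreshTable("t")  // drops the old cached version, re-registers it lazily
spark.table("t").count()         // first action after the refresh rebuilds the cache
```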
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index f0b8a83dee..8f7c6f5d0c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -162,10 +162,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
def executePlan(plan: LogicalPlan): QueryExecution = new QueryExecution(sparkSession, plan)
- def refreshTable(tableName: String): Unit = {
- catalog.refreshTable(sqlParser.parseTableIdentifier(tableName))
- }
-
def invalidateTable(tableName: String): Unit = {
catalog.invalidateTable(sqlParser.parseTableIdentifier(tableName))
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 00adb9a44b..686c63065d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -622,7 +622,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("arrayInParquet")
- sessionState.refreshTable("arrayInParquet")
+ sparkSession.catalog.refreshTable("arrayInParquet")
checkAnswer(
sql("SELECT a FROM arrayInParquet"),
@@ -681,7 +681,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
.mode(SaveMode.Append)
.saveAsTable("mapInParquet")
- sessionState.refreshTable("mapInParquet")
+ sparkSession.catalog.refreshTable("mapInParquet")
checkAnswer(
sql("SELECT a FROM mapInParquet"),
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
index 622b043581..5b706b0432 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MultiDatabaseSuite.scala
@@ -217,7 +217,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
df.write.parquet(s"$path/p=2")
sql("ALTER TABLE t ADD PARTITION (p=2)")
- hiveContext.sessionState.refreshTable("t")
+ spark.catalog.refreshTable("t")
checkAnswer(
spark.table("t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
@@ -249,7 +249,7 @@ class MultiDatabaseSuite extends QueryTest with SQLTestUtils with TestHiveSingle
df.write.parquet(s"$path/p=2")
sql(s"ALTER TABLE $db.t ADD PARTITION (p=2)")
- hiveContext.sessionState.refreshTable(s"$db.t")
+ spark.catalog.refreshTable(s"$db.t")
checkAnswer(
spark.table(s"$db.t"),
df.withColumn("p", lit(1)).union(df.withColumn("p", lit(2))))
diff --git a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 75166f6bea..415d4c0049 100644
--- a/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hivecontext-compatibility/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -58,4 +58,16 @@ class HiveContext private[hive](
sparkSession.sharedState.asInstanceOf[HiveSharedState]
}
+ /**
+ * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
+ * Spark SQL or the external data source library it uses might cache certain metadata about a
+ * table, such as the location of blocks. When those change outside of Spark SQL, users should
+ * call this function to invalidate the cache.
+ *
+ * @since 1.3.0
+ */
+ def refreshTable(tableName: String): Unit = {
+ sparkSession.catalog.refreshTable(tableName)
+ }
+
}
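The restored method is a thin forwarder, so for code still on the compatibility `HiveContext` the two calls below behave identically (a sketch; `hc` is an assumed `HiveContext` instance):

```scala
hc.refreshTable("my_table")            // 1.x-compatible entry point (@since 1.3.0)
spark.catalog.refreshTable("my_table") // the 2.0 API it delegates to
```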