author     Yin Huai <yhuai@databricks.com>            2015-04-21 14:48:42 -0700
committer  Michael Armbrust <michael@databricks.com>  2015-04-21 14:48:42 -0700
commit     6265cba00f6141575b4be825735d77d4cea500ab (patch)
tree       565c734cbeed4d8dfb2742f809b0d638b2e575a3
parent     03fd92167107f1d061c1a7ef216468b508546ac7 (diff)
[SPARK-6969][SQL] Refresh the cached table when REFRESH TABLE is used
https://issues.apache.org/jira/browse/SPARK-6969

Author: Yin Huai <yhuai@databricks.com>

Closes #5583 from yhuai/refreshTableRefreshDataCache and squashes the following commits:

1e5142b [Yin Huai] Add todo.
92b2498 [Yin Huai] Minor updates.
367df92 [Yin Huai] Recache data in the command of REFRESH TABLE.
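In short: before this patch, REFRESH TABLE only reloaded a data source table's metadata, so a cached copy of the table kept serving stale rows after new files landed in its location. A minimal sketch of the intended behavior, assuming a Spark 1.3-era SQLContext and a hypothetical Parquet-backed table named "logs" (neither appears in this commit):

    // Sketch only; "logs" and the append step are illustrative.
    sqlContext.sql("CACHE TABLE logs")    // materialize the table in memory
    // ... new Parquet files land under the table's location ...
    sqlContext.sql("REFRESH TABLE logs")  // reloads metadata; with this patch it also
                                          // drops the stale cache entry and re-registers
                                          // the table so it is recached lazily
    sqlContext.table("logs").count()      // first scan after the refresh reads the new data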
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala            | 17
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala  | 50
2 files changed, 66 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 2e861b84b7..78d494184e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -347,7 +347,24 @@ private[sql] case class RefreshTable(databaseName: String, tableName: String)
     extends RunnableCommand {
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
+    // Refresh the given table's metadata first.
     sqlContext.catalog.refreshTable(databaseName, tableName)
+
+    // If this table is cached as an InMemoryColumnarRelation, drop the original
+    // cached version and make the new version cached lazily.
+    val logicalPlan = sqlContext.catalog.lookupRelation(Seq(databaseName, tableName))
+    // Use lookupCachedData directly since RefreshTable also takes databaseName.
+    val isCached = sqlContext.cacheManager.lookupCachedData(logicalPlan).nonEmpty
+    if (isCached) {
+      // Create a data frame to represent the table.
+      // TODO: Use uncacheTable once it supports database name.
+      val df = DataFrame(sqlContext, logicalPlan)
+      // Uncache the logicalPlan.
+      sqlContext.cacheManager.tryUncacheQuery(df, blocking = true)
+      // Cache it again.
+      sqlContext.cacheManager.cacheQuery(df, Some(tableName))
+    }
+
     Seq.empty[Row]
   }
 }
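Note on the shape of the fix: the recache is a deliberate two-step. tryUncacheQuery(df, blocking = true) synchronously drops the stale in-memory relation, while cacheQuery(df, Some(tableName)) re-registers the plan so the data is rebuilt lazily on the next scan rather than eagerly inside the command. Invoked directly, the command looks roughly like this (a sketch reusing only names from the diff above; the database and table names are illustrative):

    // Sketch: roughly what "REFRESH TABLE default.refreshTable" resolves to.
    val cmd = RefreshTable("default", "refreshTable")
    cmd.run(sqlContext)  // returns Seq.empty[Row]; recaches only if the table was cached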
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index c188264072..fc6c3c3503 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,11 +17,14 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.File
+
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.{AnalysisException, DataFrame, QueryTest}
+import org.apache.spark.sql.{SaveMode, AnalysisException, DataFrame, QueryTest}
 import org.apache.spark.storage.RDDBlockId
+import org.apache.spark.util.Utils
 
 class CachedTableSuite extends QueryTest {
@@ -155,4 +158,49 @@ class CachedTableSuite extends QueryTest {
     assertCached(table("udfTest"))
     uncacheTable("udfTest")
   }
+
+  test("REFRESH TABLE also needs to recache the data (data source tables)") {
+    val tempPath: File = Utils.createTempDir()
+    tempPath.delete()
+    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+    sql("DROP TABLE IF EXISTS refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Cache the table.
+    sql("CACHE TABLE refreshTable")
+    assertCached(table("refreshTable"))
+    // Append new data.
+    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+    // We are still using the old data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").collect())
+    // Refresh the table.
+    sql("REFRESH TABLE refreshTable")
+    // We are using the new data.
+    assertCached(table("refreshTable"))
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+
+    // Drop the table and create it again.
+    sql("DROP TABLE refreshTable")
+    createExternalTable("refreshTable", tempPath.toString, "parquet")
+    // It is not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+    // Refresh the table. The REFRESH TABLE command should not make an uncached
+    // table cached.
+    sql("REFRESH TABLE refreshTable")
+    checkAnswer(
+      table("refreshTable"),
+      table("src").unionAll(table("src")).collect())
+    // It is still not cached.
+    assert(!isCached("refreshTable"), "refreshTable should not be cached.")
+
+    sql("DROP TABLE refreshTable")
+    Utils.deleteRecursively(tempPath)
+  }
 }
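Usage note: outside of SQL, the same refresh can be triggered programmatically. A minimal sketch, assuming a 1.3-era HiveContext bound to sqlContext and a hypothetical table name:

    // Sketch; "my_table" is illustrative. If my_table is cached, its cache entry
    // is dropped and rebuilt lazily on the next scan after this patch.
    sqlContext.refreshTable("my_table")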