aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-06-11 00:06:50 -0700
committerMichael Armbrust <michael@databricks.com>2014-06-11 00:06:50 -0700
commit0266a0c8a70e0fbaeb0df63031f7a750ffc31a80 (patch)
treecf264a4e9ba80be39b734225e7d4c6bac47129a1 /sql/core
parent0402bd77ec786d1fa6cfd7f9cc3aa97c7ab16fd8 (diff)
downloadspark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.tar.gz
spark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.tar.bz2
spark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.zip
[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968) This PR added support for SQL/HiveQL command for caching/uncaching tables: ``` scala> sql("CACHE TABLE src") ... res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, true scala> table("src") ... res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:98 == Query Plan == InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false scala> isCached("src") res2: Boolean = true scala> sql("UNCACHE TABLE src") ... res3: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, false scala> table("src") ... res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[11] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None scala> isCached("src") res5: Boolean = false ``` Things also work for `hql`. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1038 from liancheng/sqlCacheTable and squashes the following commits: ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands 6f4ce42 [Cheng Lian] Moved logical command classes to a separate file 3458a24 [Cheng Lian] Added comment for public API f0ffacc [Cheng Lian] Added isCached() predicate 15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala9
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala7
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala23
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala16
4 files changed, 52 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 021e0e8245..264192ed1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
+ /** Returns true if the table is currently cached in-memory. */
+ def isCached(tableName: String): Boolean = {
+ val relation = catalog.lookupRelation(None, tableName)
+ EliminateAnalysisOperators(relation) match {
+ case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+ case _ => false
+ }
+ }
+
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0455748d40..f2f95dfe27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
- val qe = context.executePlan(child)
- Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+ val executedPlan = context.executePlan(child).executedPlan
+ Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+ case logical.CacheCommand(tableName, cache) =>
+ Seq(execution.CacheCommandPhysical(tableName, cache)(context))
case _ => Nil
}
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 9364506691..be26d19e66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
override def otherCopyArgs = context :: Nil
}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+ extends LeafNode {
+
+ lazy val commandSideEffect = {
+ if (doCache) {
+ context.cacheTable(tableName)
+ } else {
+ context.uncacheTable(tableName)
+ }
+ }
+
+ override def execute(): RDD[Row] = {
+ commandSideEffect
+ context.emptyResult
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0331f90272..ebca3adc2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest {
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
TestSQLContext.uncacheTable("testData")
}
+
+ test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
+ TestSQLContext.sql("CACHE TABLE testData")
+ TestSQLContext.table("testData").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => // Found evidence of caching
+ case _ => fail(s"Table 'testData' should be cached")
+ }
+ assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached")
+
+ TestSQLContext.sql("UNCACHE TABLE testData")
+ TestSQLContext.table("testData").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
+ case _ => // Found evidence of uncaching
+ }
+ assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
+ }
}