diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-06-11 00:06:50 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-06-11 00:06:50 -0700 |
commit | 0266a0c8a70e0fbaeb0df63031f7a750ffc31a80 (patch) | |
tree | cf264a4e9ba80be39b734225e7d4c6bac47129a1 /sql/core | |
parent | 0402bd77ec786d1fa6cfd7f9cc3aa97c7ab16fd8 (diff) | |
download | spark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.tar.gz spark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.tar.bz2 spark-0266a0c8a70e0fbaeb0df63031f7a750ffc31a80.zip |
[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968)
This PR added support for SQL/HiveQL commands for caching and uncaching tables:
```
scala> sql("CACHE TABLE src")
...
res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, true
scala> table("src")
...
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false
scala> isCached("src")
res2: Boolean = true
scala> sql("UNCACHE TABLE src")
...
res3: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, false
scala> table("src")
...
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[11] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None
scala> isCached("src")
res5: Boolean = false
```
Things also work for `hql`.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1038 from liancheng/sqlCacheTable and squashes the following commits:
ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands
6f4ce42 [Cheng Lian] Moved logical command classes to a separate file
3458a24 [Cheng Lian] Added comment for public API
f0ffacc [Cheng Lian] Added isCached() predicate
15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
Diffstat (limited to 'sql/core')
4 files changed, 52 insertions, 3 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 021e0e8245..264192ed1a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext) } } + /** Returns true if the table is currently cached in-memory. */ + def isCached(tableName: String): Boolean = { + val relation = catalog.lookupRelation(None, tableName) + EliminateAnalysisOperators(relation) match { + case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true + case _ => false + } + } + protected[sql] class SparkPlanner extends SparkStrategies { val sparkContext = self.sparkContext diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 0455748d40..f2f95dfe27 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.SetCommand(key, value) => Seq(execution.SetCommandPhysical(key, value, plan.output)(context)) case logical.ExplainCommand(child) => - val qe = context.executePlan(child) - Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context)) + val executedPlan = context.executePlan(child).executedPlan + Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context)) + case logical.CacheCommand(tableName, cache) => + Seq(execution.CacheCommandPhysical(tableName, cache)(context)) case _ => Nil } } - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index 9364506691..be26d19e66 100644 
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute]) override def otherCopyArgs = context :: Nil } + +/** + * :: DeveloperApi :: + */ +@DeveloperApi +case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext) + extends LeafNode { + + lazy val commandSideEffect = { + if (doCache) { + context.cacheTable(tableName) + } else { + context.uncacheTable(tableName) + } + } + + override def execute(): RDD[Row] = { + commandSideEffect + context.emptyResult + } + + override def output: Seq[Attribute] = Seq.empty +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 0331f90272..ebca3adc2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest { TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key") TestSQLContext.uncacheTable("testData") } + + test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") { + TestSQLContext.sql("CACHE TABLE testData") + TestSQLContext.table("testData").queryExecution.executedPlan match { + case _: InMemoryColumnarTableScan => // Found evidence of caching + case _ => fail(s"Table 'testData' should be cached") + } + assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached") + + TestSQLContext.sql("UNCACHE TABLE testData") + TestSQLContext.table("testData").queryExecution.executedPlan match { + case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached") + case _ => // Found evidence of uncaching + } + assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached") + } } |