author     Cheng Lian <lian.cs.zju@gmail.com>    2014-06-11 00:06:50 -0700
committer  Michael Armbrust <michael@databricks.com>    2014-06-11 00:06:50 -0700
commit     0266a0c8a70e0fbaeb0df63031f7a750ffc31a80 (patch)
tree       cf264a4e9ba80be39b734225e7d4c6bac47129a1
parent     0402bd77ec786d1fa6cfd7f9cc3aa97c7ab16fd8 (diff)
[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968)

This PR adds support for SQL/HiveQL commands for caching and uncaching tables:

```
scala> sql("CACHE TABLE src")
...
res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, true

scala> table("src")
...
res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false

scala> isCached("src")
res2: Boolean = true

scala> sql("UNCACHE TABLE src")
...
res3: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, false

scala> table("src")
...
res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[11] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None

scala> isCached("src")
res5: Boolean = false
```

Things also work for `hql`.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #1038 from liancheng/sqlCacheTable and squashes the following commits:

ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands
6f4ce42 [Cheng Lian] Moved logical command classes to a separate file
3458a24 [Cheng Lian] Added comment for public API
f0ffacc [Cheng Lian] Added isCached() predicate
15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala                 | 10
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala | 35
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala    | 60
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala                             |  9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala              |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                     | 23
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                       | 16
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala                            | 18
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                          |  5
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                  | 16
10 files changed, 152 insertions, 47 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 36758f3114..46fcfbb9e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AVG = Keyword("AVG")
protected val BY = Keyword("BY")
+ protected val CACHE = Keyword("CACHE")
protected val CAST = Keyword("CAST")
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
@@ -149,7 +150,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val SEMI = Keyword("SEMI")
protected val STRING = Keyword("STRING")
protected val SUM = Keyword("SUM")
+ protected val TABLE = Keyword("TABLE")
protected val TRUE = Keyword("TRUE")
+ protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
protected val WHERE = Keyword("WHERE")
@@ -189,7 +192,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
- | insert
+ | insert | cache
)
protected lazy val select: Parser[LogicalPlan] =
@@ -220,6 +223,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
}
+ protected lazy val cache: Parser[LogicalPlan] =
+ (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
+ case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+ }
+
protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
protected lazy val projection: Parser[Expression] =
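
The new `cache` production is built from three parser combinators: `^^^` maps a successful parse to a constant (here, the `doCache` flag), `~` sequences parsers, and `^^` transforms the combined result. Below is a minimal, self-contained sketch of the same technique using the scala-parser-combinators library that `SqlParser` builds on; the object name and `parse` helper are hypothetical, not code from this patch:

```scala
import scala.util.parsing.combinator.syntactical.StandardTokenParsers

// Hypothetical toy parser illustrating the combinators used in the `cache`
// production: `^^^` yields a constant, `~` sequences, `^^` maps the result.
object CacheStatementParser extends StandardTokenParsers {
  lexical.reserved ++= Seq("CACHE", "UNCACHE", "TABLE")

  lazy val cache: Parser[(String, Boolean)] =
    ("CACHE" ^^^ true | "UNCACHE" ^^^ false) ~ "TABLE" ~ ident ^^ {
      case doCache ~ _ ~ tableName => (tableName, doCache)
    }

  def parse(input: String): (String, Boolean) =
    phrase(cache)(new lexical.Scanner(input)) match {
      case Success(result, _) => result
      case failure            => sys.error(failure.toString)
    }
}

// CacheStatementParser.parse("CACHE TABLE src")   == ("src", true)
// CacheStatementParser.parse("UNCACHE TABLE src") == ("src", false)
```
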
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7eeb98aea6..0933a31c36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.{StringType, StructType}
+import org.apache.spark.sql.catalyst.types.StructType
import org.apache.spark.sql.catalyst.trees
abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -97,39 +97,6 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
}
/**
- * A logical node that represents a non-query command to be executed by the system. For example,
- * commands can be used by parsers to represent DDL operations.
- */
-abstract class Command extends LeafNode {
- self: Product =>
- def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
-}
-
-/**
- * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command
-
-/**
- * Commands of the form "SET (key) (= value)".
- */
-case class SetCommand(key: Option[String], value: Option[String]) extends Command {
- override def output = Seq(
- AttributeReference("key", StringType, nullable = false)(),
- AttributeReference("value", StringType, nullable = false)()
- )
-}
-
-/**
- * Returned by a parser when the user only wants to see what query plan would be executed, without
- * actually performing the execution.
- */
-case class ExplainCommand(plan: LogicalPlan) extends Command {
- override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
-}
-
-/**
* A logical plan node with single child.
*/
abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
new file mode 100644
index 0000000000..d05c965275
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.types.StringType
+
+/**
+ * A logical node that represents a non-query command to be executed by the system. For example,
+ * commands can be used by parsers to represent DDL operations.
+ */
+abstract class Command extends LeafNode {
+ self: Product =>
+ def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+}
+
+/**
+ * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
+ * commands that are passed directly to another system.
+ */
+case class NativeCommand(cmd: String) extends Command
+
+/**
+ * Commands of the form "SET (key) (= value)".
+ */
+case class SetCommand(key: Option[String], value: Option[String]) extends Command {
+ override def output = Seq(
+ AttributeReference("key", StringType, nullable = false)(),
+ AttributeReference("value", StringType, nullable = false)()
+ )
+}
+
+/**
+ * Returned by a parser when the user only wants to see what query plan would be executed, without
+ * actually performing the execution.
+ */
+case class ExplainCommand(plan: LogicalPlan) extends Command {
+ override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+}
+
+/**
+ * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" commands.
+ */
+case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+
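
Because `CacheCommand` is a plain case class in the logical plan hierarchy, downstream components can dispatch on it with ordinary pattern matching. A hypothetical consumer sketching that dispatch style (the real dispatch is the `SparkStrategies` change below):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan}

// Hypothetical helper, not part of the patch: matching on the new logical
// node distinguishes caching from uncaching via the `doCache` flag.
def describeCommand(plan: LogicalPlan): String = plan match {
  case CacheCommand(table, true)  => s"cache table '$table' in memory"
  case CacheCommand(table, false) => s"evict table '$table' from the cache"
  case _                          => "not a cache command"
}
```
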
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 021e0e8245..264192ed1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
}
}
+ /** Returns true if the table is currently cached in-memory. */
+ def isCached(tableName: String): Boolean = {
+ val relation = catalog.lookupRelation(None, tableName)
+ EliminateAnalysisOperators(relation) match {
+ case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+ case _ => false
+ }
+ }
+
protected[sql] class SparkPlanner extends SparkStrategies {
val sparkContext = self.sparkContext
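
Together with the existing `cacheTable`/`uncacheTable` methods, the new `isCached` predicate makes cache state observable from application code. A minimal usage sketch, assuming a `SQLContext` named `sqlContext` with a table `src` already registered (mirroring the session in the commit message and the test below):

```scala
sqlContext.sql("CACHE TABLE src")
assert(sqlContext.isCached("src"))   // scans of `src` now hit the in-memory columnar store

sqlContext.sql("UNCACHE TABLE src")
assert(!sqlContext.isCached("src"))  // back to the original relation
```
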
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0455748d40..f2f95dfe27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case logical.SetCommand(key, value) =>
Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
case logical.ExplainCommand(child) =>
- val qe = context.executePlan(child)
- Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+ val executedPlan = context.executePlan(child).executedPlan
+ Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+ case logical.CacheCommand(tableName, cache) =>
+ Seq(execution.CacheCommandPhysical(tableName, cache)(context))
case _ => Nil
}
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 9364506691..be26d19e66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
override def otherCopyArgs = context :: Nil
}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+ extends LeafNode {
+
+ lazy val commandSideEffect = {
+ if (doCache) {
+ context.cacheTable(tableName)
+ } else {
+ context.uncacheTable(tableName)
+ }
+ }
+
+ override def execute(): RDD[Row] = {
+ commandSideEffect
+ context.emptyResult
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+}
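
`CacheCommandPhysical` routes its side effect through a `lazy val`, so the (un)caching action runs at most once even if `execute()` is invoked repeatedly. A stripped-down illustration of that idiom with hypothetical names, not the patch's code:

```scala
// A lazy val's initializer runs on first access only; later accesses reuse it.
class RunOnce(action: () => Unit) {
  lazy val sideEffect: Unit = action()

  def execute(): Unit = sideEffect
}

val cmd = new RunOnce(() => println("caching table..."))
cmd.execute() // prints "caching table..."
cmd.execute() // prints nothing: the initializer already ran
```
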
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0331f90272..ebca3adc2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest {
TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
TestSQLContext.uncacheTable("testData")
}
+
+ test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
+ TestSQLContext.sql("CACHE TABLE testData")
+ TestSQLContext.table("testData").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => // Found evidence of caching
+ case _ => fail(s"Table 'testData' should be cached")
+ }
+ assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached")
+
+ TestSQLContext.sql("UNCACHE TABLE testData")
+ TestSQLContext.table("testData").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
+ case _ => // Found evidence of uncaching
+ }
+ assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e74d9bc90..b745d8ffd8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -218,15 +218,19 @@ private[hive] object HiveQl {
case Array(key, value) => // "set key=value"
SetCommand(Some(key), Some(value))
}
- } else if (sql.toLowerCase.startsWith("add jar")) {
+ } else if (sql.trim.toLowerCase.startsWith("cache table")) {
+ CacheCommand(sql.drop(12).trim, true)
+ } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
+ CacheCommand(sql.drop(14).trim, false)
+ } else if (sql.trim.toLowerCase.startsWith("add jar")) {
AddJar(sql.drop(8))
- } else if (sql.toLowerCase.startsWith("add file")) {
+ } else if (sql.trim.toLowerCase.startsWith("add file")) {
AddFile(sql.drop(9))
- } else if (sql.startsWith("dfs")) {
+ } else if (sql.trim.startsWith("dfs")) {
DfsCommand(sql)
- } else if (sql.startsWith("source")) {
+ } else if (sql.trim.startsWith("source")) {
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
- } else if (sql.startsWith("!")) {
+ } else if (sql.trim.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg))
-
+
/* System functions about string operations */
case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg))
-
+
/* Casts */
case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
Cast(nodeToExpr(arg), StringType)
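
The dispatch above now trims the statement before testing prefixes (the `ecb7194` commit in the squash list), so leading whitespace no longer defeats the `startsWith` checks, and a fixed-length `drop` strips the keyword prefix to leave the table name. A simplified sketch of the technique with a hypothetical helper; note the patch itself drops hard-coded prefix lengths of 12 and 14 from the original string:

```scala
// Hypothetical helper illustrating trim-then-dispatch prefix parsing.
def parseCacheStatement(sql: String): Option[(String, Boolean)] = {
  val trimmed = sql.trim
  val lower   = trimmed.toLowerCase
  if (lower.startsWith("cache table")) {
    Some((trimmed.drop("cache table".length).trim, true))     // "CACHE TABLE src"   -> ("src", true)
  } else if (lower.startsWith("uncache table")) {
    Some((trimmed.drop("uncache table".length).trim, false))  // "UNCACHE TABLE src" -> ("src", false)
  } else {
    None
  }
}
```
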
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 041e813598..9386008d02 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive._
@@ -103,7 +103,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
- new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
+ new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
File.separator + "resources")
}
@@ -130,6 +130,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
override lazy val analyzed = {
val describedTables = logical match {
case NativeCommand(describedTable(tbl)) => tbl :: Nil
+ case CacheCommand(tbl, _) => tbl :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index f9a162ef4e..91ac03ca30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest {
TestHive.uncacheTable("src")
}
}
+
+ test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
+ TestHive.hql("CACHE TABLE src")
+ TestHive.table("src").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => // Found evidence of caching
+ case _ => fail(s"Table 'src' should be cached")
+ }
+ assert(TestHive.isCached("src"), "Table 'src' should be cached")
+
+ TestHive.hql("UNCACHE TABLE src")
+ TestHive.table("src").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
+ case _ => // Found evidence of uncaching
+ }
+ assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
+ }
}