From 0266a0c8a70e0fbaeb0df63031f7a750ffc31a80 Mon Sep 17 00:00:00 2001
From: Cheng Lian
Date: Wed, 11 Jun 2014 00:06:50 -0700
Subject: [SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables

JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968)

This PR added support for SQL/HiveQL commands for caching/uncaching tables:

```
scala> sql("CACHE TABLE src")
...
res0: org.apache.spark.sql.SchemaRDD =
SchemaRDD[0] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, true

scala> table("src")
...
res1: org.apache.spark.sql.SchemaRDD =
SchemaRDD[3] at RDD at SchemaRDD.scala:98
== Query Plan ==
InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false

scala> isCached("src")
res2: Boolean = true

scala> sql("UNCACHE TABLE src")
...
res3: org.apache.spark.sql.SchemaRDD =
SchemaRDD[4] at RDD at SchemaRDD.scala:98
== Query Plan ==
CacheCommandPhysical src, false

scala> table("src")
...
res4: org.apache.spark.sql.SchemaRDD =
SchemaRDD[11] at RDD at SchemaRDD.scala:98
== Query Plan ==
HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None

scala> isCached("src")
res5: Boolean = false
```

Things also work for `hql`.
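For illustration, here is a minimal sketch of the HiveQL flavor (illustrative only, not part of the commit; it assumes a `HiveContext` value named `hiveContext` is in scope, e.g. inside `spark-shell`):

```scala
// Hypothetical REPL-style sketch: the same commands routed through HiveQl.
hiveContext.hql("CACHE TABLE src")     // parsed into CacheCommand("src", true)
assert(hiveContext.isCached("src"))    // subsequent scans hit the columnar cache

hiveContext.hql("UNCACHE TABLE src")   // parsed into CacheCommand("src", false)
assert(!hiveContext.isCached("src"))   // back to a plain HiveTableScan
```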
Author: Cheng Lian

Closes #1038 from liancheng/sqlCacheTable and squashes the following commits:

ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands
6f4ce42 [Cheng Lian] Moved logical command classes to a separate file
3458a24 [Cheng Lian] Added comment for public API
f0ffacc [Cheng Lian] Added isCached() predicate
15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements
---
 .../org/apache/spark/sql/catalyst/SqlParser.scala  | 10 +++-
 .../sql/catalyst/plans/logical/LogicalPlan.scala   | 35 +-------
 .../sql/catalyst/plans/logical/commands.scala      | 60 ++++++++++++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala    |  9 ++++
 .../spark/sql/execution/SparkStrategies.scala      |  7 +--
 .../org/apache/spark/sql/execution/commands.scala  | 23 +++++++++
 .../org/apache/spark/sql/CachedTableSuite.scala    | 16 ++++++
 .../scala/org/apache/spark/sql/hive/HiveQl.scala   | 18 ++++---
 .../scala/org/apache/spark/sql/hive/TestHive.scala |  5 +-
 .../apache/spark/sql/hive/CachedTableSuite.scala   | 16 ++++++
 10 files changed, 152 insertions(+), 47 deletions(-)
 create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala

(limited to 'sql')

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 36758f3114..46fcfbb9e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -111,6 +111,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val APPROXIMATE = Keyword("APPROXIMATE")
   protected val AVG = Keyword("AVG")
   protected val BY = Keyword("BY")
+  protected val CACHE = Keyword("CACHE")
   protected val CAST = Keyword("CAST")
   protected val COUNT = Keyword("COUNT")
   protected val DESC = Keyword("DESC")
@@ -149,7 +150,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
   protected val SEMI = Keyword("SEMI")
   protected val STRING = Keyword("STRING")
   protected val SUM = Keyword("SUM")
+  protected val TABLE = Keyword("TABLE")
   protected val TRUE = Keyword("TRUE")
+  protected val UNCACHE = Keyword("UNCACHE")
   protected val UNION = Keyword("UNION")
   protected val WHERE = Keyword("WHERE")
@@ -189,7 +192,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         UNION ~ ALL ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Union(q1, q2) } |
         UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
       )
-    | insert
+    | insert | cache
   )
 
   protected lazy val select: Parser[LogicalPlan] =
@@ -220,6 +223,11 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
         InsertIntoTable(r, Map[String, Option[String]](), s, overwrite)
     }
 
+  protected lazy val cache: Parser[LogicalPlan] =
+    (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
+      case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+    }
+
   protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
 
   protected lazy val projection: Parser[Expression] =
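To see how the `cache` production above behaves in isolation, here is a self-contained sketch using the standard Scala parser-combinator library (the `CacheStmt` case class is a hypothetical stand-in for `CacheCommand`, since the real production returns a `LogicalPlan`):

```scala
import scala.util.parsing.combinator.JavaTokenParsers

object CacheGrammarSketch extends JavaTokenParsers {
  // Hypothetical stand-in for Catalyst's CacheCommand logical node.
  case class CacheStmt(tableName: String, doCache: Boolean)

  // CACHE maps to true, UNCACHE to false; the flag and the table name are
  // combined into a single value, mirroring the `cache` production above.
  def cacheStmt: Parser[CacheStmt] =
    ("CACHE" ^^^ true | "UNCACHE" ^^^ false) ~ "TABLE" ~ ident ^^ {
      case doCache ~ _ ~ tableName => CacheStmt(tableName, doCache)
    }

  def main(args: Array[String]): Unit = {
    println(parseAll(cacheStmt, "CACHE TABLE src"))   // parsed: CacheStmt(src,true)
    println(parseAll(cacheStmt, "UNCACHE TABLE src")) // parsed: CacheStmt(src,false)
  }
}
```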
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 7eeb98aea6..0933a31c36 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.types.{StringType, StructType}
+import org.apache.spark.sql.catalyst.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] {
@@ -96,39 +96,6 @@ abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
   def references = Set.empty
 }
 
-/**
- * A logical node that represents a non-query command to be executed by the system. For example,
- * commands can be used by parsers to represent DDL operations.
- */
-abstract class Command extends LeafNode {
-  self: Product =>
-  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
-}
-
-/**
- * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
- * commands that are passed directly to another system.
- */
-case class NativeCommand(cmd: String) extends Command
-
-/**
- * Commands of the form "SET (key) (= value)".
- */
-case class SetCommand(key: Option[String], value: Option[String]) extends Command {
-  override def output = Seq(
-    AttributeReference("key", StringType, nullable = false)(),
-    AttributeReference("value", StringType, nullable = false)()
-  )
-}
-
-/**
- * Returned by a parser when the users only wants to see what query plan would be executed, without
- * actually performing the execution.
- */
-case class ExplainCommand(plan: LogicalPlan) extends Command {
-  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
-}
-
 /**
  * A logical plan node with single child.
  */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
new file mode 100644
index 0000000000..d05c965275
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+import org.apache.spark.sql.catalyst.types.StringType
+
+/**
+ * A logical node that represents a non-query command to be executed by the system. For example,
+ * commands can be used by parsers to represent DDL operations.
+ */
+abstract class Command extends LeafNode {
+  self: Product =>
+  def output: Seq[Attribute] = Seq.empty // TODO: SPARK-2081 should fix this
+}
+
+/**
+ * Returned for commands supported by a given parser, but not catalyst. In general these are DDL
+ * commands that are passed directly to another system.
+ */
+case class NativeCommand(cmd: String) extends Command
+
+/**
+ * Commands of the form "SET (key) (= value)".
+ */
+case class SetCommand(key: Option[String], value: Option[String]) extends Command {
+  override def output = Seq(
+    AttributeReference("key", StringType, nullable = false)(),
+    AttributeReference("value", StringType, nullable = false)()
+  )
+}
+
+/**
+ * Returned by a parser when the users only wants to see what query plan would be executed, without
+ * actually performing the execution.
+ */
+case class ExplainCommand(plan: LogicalPlan) extends Command {
+  override def output = Seq(AttributeReference("plan", StringType, nullable = false)())
+}
+
+/**
+ * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
+ */
+case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+
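The division of labor in this hierarchy is easy to miss: `Command` fixes `output` to an empty schema, and only commands that actually return rows override it, which is why `CacheCommand` needs no body at all. A trimmed-down, self-contained sketch (the simplified `Attribute`/`LeafNode` stand-ins here are hypothetical, not Catalyst's real classes):

```scala
object CommandHierarchySketch {
  // Simplified stand-ins (hypothetical) for Catalyst's Attribute and LeafNode.
  trait Attribute { def name: String }
  case class AttributeReference(name: String) extends Attribute
  abstract class LeafNode

  // The self-type restricts concrete commands to Products (case classes),
  // which is what Catalyst's tree machinery expects of plan nodes.
  abstract class Command extends LeafNode { self: Product =>
    def output: Seq[Attribute] = Seq.empty // most commands return no rows
  }

  case class CacheCommand(tableName: String, doCache: Boolean) extends Command

  case class ExplainCommand(planString: String) extends Command {
    // EXPLAIN does return rows, so it overrides the empty default schema.
    override def output = Seq(AttributeReference("plan"))
  }

  def main(args: Array[String]): Unit = {
    println(CacheCommand("src", doCache = true).output) // List()
    println(ExplainCommand("== Query Plan ==").output)  // List(AttributeReference(plan))
  }
}
```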
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 021e0e8245..264192ed1a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -188,6 +188,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
   }
 
+  /** Returns true if the table is currently cached in-memory. */
+  def isCached(tableName: String): Boolean = {
+    val relation = catalog.lookupRelation(None, tableName)
+    EliminateAnalysisOperators(relation) match {
+      case SparkLogicalPlan(_: InMemoryColumnarTableScan) => true
+      case _ => false
+    }
+  }
+
   protected[sql] class SparkPlanner extends SparkStrategies {
     val sparkContext = self.sparkContext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0455748d40..f2f95dfe27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -239,10 +239,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case logical.SetCommand(key, value) =>
         Seq(execution.SetCommandPhysical(key, value, plan.output)(context))
       case logical.ExplainCommand(child) =>
-        val qe = context.executePlan(child)
-        Seq(execution.ExplainCommandPhysical(qe.executedPlan, plan.output)(context))
+        val executedPlan = context.executePlan(child).executedPlan
+        Seq(execution.ExplainCommandPhysical(executedPlan, plan.output)(context))
+      case logical.CacheCommand(tableName, cache) =>
+        Seq(execution.CacheCommandPhysical(tableName, cache)(context))
       case _ => Nil
     }
   }
-
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 9364506691..be26d19e66 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -65,3 +65,26 @@ case class ExplainCommandPhysical(child: SparkPlan, output: Seq[Attribute])
 
   override def otherCopyArgs = context :: Nil
 }
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheCommandPhysical(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+  extends LeafNode {
+
+  lazy val commandSideEffect = {
+    if (doCache) {
+      context.cacheTable(tableName)
+    } else {
+      context.uncacheTable(tableName)
+    }
+  }
+
+  override def execute(): RDD[Row] = {
+    commandSideEffect
+    context.emptyResult
+  }
+
+  override def output: Seq[Attribute] = Seq.empty
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 0331f90272..ebca3adc2f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -70,4 +70,20 @@ class CachedTableSuite extends QueryTest {
     TestSQLContext.sql("SELECT * FROM testData a JOIN testData b ON a.key = b.key")
     TestSQLContext.uncacheTable("testData")
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' SQL statement") {
+    TestSQLContext.sql("CACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'testData' should be cached")
+    }
+    assert(TestSQLContext.isCached("testData"), "Table 'testData' should be cached")
+
+    TestSQLContext.sql("UNCACHE TABLE testData")
+    TestSQLContext.table("testData").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'testData' should not be cached")
+      case _ => // Found evidence of uncaching
+    }
+    assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
+  }
 }
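Note how `CacheCommandPhysical` above wraps its side effect in a `lazy val`: the (un)cache action fires at most once per plan node, no matter how many times `execute()` is invoked. A minimal standalone sketch of that idiom (hypothetical names, independent of Spark):

```scala
object RunOnceDemo {
  class CacheCommandSketch(tableName: String) {
    // A lazy val's initializer runs on first access only; later accesses
    // reuse the memoized result, so the side effect cannot fire twice.
    lazy val commandSideEffect: Unit = println(s"caching $tableName")

    def execute(): Unit = {
      commandSideEffect // first call prints, subsequent calls are no-ops
    }
  }

  def main(args: Array[String]): Unit = {
    val cmd = new CacheCommandSketch("src")
    cmd.execute() // prints "caching src"
    cmd.execute() // prints nothing
  }
}
```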
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e74d9bc90..b745d8ffd8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -218,15 +218,19 @@ private[hive] object HiveQl {
         case Array(key, value) => // "set key=value"
           SetCommand(Some(key), Some(value))
       }
-    } else if (sql.toLowerCase.startsWith("add jar")) {
+    } else if (sql.trim.toLowerCase.startsWith("cache table")) {
+      CacheCommand(sql.drop(12).trim, true)
+    } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
+      CacheCommand(sql.drop(14).trim, false)
+    } else if (sql.trim.toLowerCase.startsWith("add jar")) {
       AddJar(sql.drop(8))
-    } else if (sql.toLowerCase.startsWith("add file")) {
+    } else if (sql.trim.toLowerCase.startsWith("add file")) {
       AddFile(sql.drop(9))
-    } else if (sql.startsWith("dfs")) {
+    } else if (sql.trim.startsWith("dfs")) {
       DfsCommand(sql)
-    } else if (sql.startsWith("source")) {
+    } else if (sql.trim.startsWith("source")) {
       SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
-    } else if (sql.startsWith("!")) {
+    } else if (sql.trim.startsWith("!")) {
       ShellCommand(sql.drop(1))
     } else {
       val tree = getAst(sql)
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
     case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg))
-    
+
     /* System functions about string operations */
     case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg))
     case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg))
-    
+
     /* Casts */
     case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
       Cast(nodeToExpr(arg), StringType)
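The `sql.trim.toLowerCase.startsWith(...)` guards above correspond to the "Trimmed the SQL string before parsing special commands" fix in the squashed commits. As an aside, a hypothetical standalone sketch of the same prefix dispatch, written to trim once up front (illustrative only; not the patch's code):

```scala
object CommandPrefixSketch {
  // Hypothetical helper (not in the patch): trim once, lower-case once,
  // then dispatch on prefixes, mirroring the branches added to HiveQl above.
  def parseCacheCommand(sql: String): Option[(String, Boolean)] = {
    val trimmed = sql.trim
    val lower = trimmed.toLowerCase
    if (lower.startsWith("cache table")) {
      Some((trimmed.drop("cache table".length).trim, true))
    } else if (lower.startsWith("uncache table")) {
      Some((trimmed.drop("uncache table".length).trim, false))
    } else {
      None
    }
  }

  def main(args: Array[String]): Unit = {
    println(parseCacheCommand("  CACHE TABLE src "))  // Some((src,true))
    println(parseCacheCommand("UNCACHE TABLE src"))   // Some((src,false))
    println(parseCacheCommand("SELECT key FROM src")) // None
  }
}
```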
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index 041e813598..9386008d02 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
 
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.hive._
@@ -103,7 +103,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
   val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
     new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
   } else {
-    new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" + 
+    new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
       File.separator + "resources")
   }
@@ -130,6 +130,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
     override lazy val analyzed = {
       val describedTables = logical match {
         case NativeCommand(describedTable(tbl)) => tbl :: Nil
+        case CacheCommand(tbl, _) => tbl :: Nil
         case _ => Nil
       }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index f9a162ef4e..91ac03ca30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest {
       TestHive.uncacheTable("src")
     }
   }
+
+  test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
+    TestHive.hql("CACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => // Found evidence of caching
+      case _ => fail(s"Table 'src' should be cached")
+    }
+    assert(TestHive.isCached("src"), "Table 'src' should be cached")
+
+    TestHive.hql("UNCACHE TABLE src")
+    TestHive.table("src").queryExecution.executedPlan match {
+      case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
+      case _ => // Found evidence of uncaching
+    }
+    assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
+  }
 }
--
cgit v1.2.3