From 65ed7793db7a3d97aa244c372ac9a756acfa9447 Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Wed, 11 Jun 2014 00:06:50 -0700 Subject: [SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968) This PR added support for SQL/HiveQL command for caching/uncaching tables: ``` scala> sql("CACHE TABLE src") ... res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, true scala> table("src") ... res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:98 == Query Plan == InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false scala> isCached("src") res2: Boolean = true scala> sql("CACHE TABLE src") ... res3: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, false scala> table("src") ... res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[11] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None scala> isCached("src") res5: Boolean = false ``` Things also work for `hql`. Author: Cheng Lian Closes #1038 from liancheng/sqlCacheTable and squashes the following commits: ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands 6f4ce42 [Cheng Lian] Moved logical command classes to a separate file 3458a24 [Cheng Lian] Added comment for public API f0ffacc [Cheng Lian] Added isCached() predicate 15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements (cherry picked from commit 0266a0c8a70e0fbaeb0df63031f7a750ffc31a80) Signed-off-by: Michael Armbrust --- .../main/scala/org/apache/spark/sql/hive/HiveQl.scala | 18 +++++++++++------- .../scala/org/apache/spark/sql/hive/TestHive.scala | 5 +++-- .../org/apache/spark/sql/hive/CachedTableSuite.scala | 16 ++++++++++++++++ 3 files changed, 30 insertions(+), 9 deletions(-) (limited to 'sql/hive') diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 4e74d9bc90..b745d8ffd8 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -218,15 +218,19 @@ private[hive] object HiveQl { case Array(key, value) => // "set key=value" SetCommand(Some(key), Some(value)) } - } else if (sql.toLowerCase.startsWith("add jar")) { + } else if (sql.trim.toLowerCase.startsWith("cache table")) { + CacheCommand(sql.drop(12).trim, true) + } else if (sql.trim.toLowerCase.startsWith("uncache table")) { + CacheCommand(sql.drop(14).trim, false) + } else if (sql.trim.toLowerCase.startsWith("add jar")) { AddJar(sql.drop(8)) - } else if (sql.toLowerCase.startsWith("add file")) { + } else if (sql.trim.toLowerCase.startsWith("add file")) { AddFile(sql.drop(9)) - } else if (sql.startsWith("dfs")) { + } else if (sql.trim.startsWith("dfs")) { DfsCommand(sql) - } else if (sql.startsWith("source")) { + } else if (sql.trim.startsWith("source")) { SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath }) - } else if (sql.startsWith("!")) { + } else if (sql.trim.startsWith("!")) { ShellCommand(sql.drop(1)) } else { val tree = getAst(sql) @@ -839,11 +843,11 @@ private[hive] object HiveQl { case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg)) case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg)) case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg)) - + /* System functions about string operations */ case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg)) case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg)) - + /* Casts */ case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) => Cast(nodeToExpr(arg), StringType) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index fa7d010459..d199097e06 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ @@ -104,7 +104,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) { new File("src" + File.separator + "test" + File.separator + "resources" + File.separator) } else { - new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" + + new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" + File.separator + "resources") } @@ -131,6 +131,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) { override lazy val analyzed = { val describedTables = logical match { case NativeCommand(describedTable(tbl)) => tbl :: Nil + case CacheCommand(tbl, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index f9a162ef4e..91ac03ca30 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest { TestHive.uncacheTable("src") } } + + test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") { + TestHive.hql("CACHE TABLE src") + TestHive.table("src").queryExecution.executedPlan match { + case _: InMemoryColumnarTableScan => // Found evidence of caching + case _ => fail(s"Table 'src' should be cached") + } + assert(TestHive.isCached("src"), "Table 'src' should be cached") + + TestHive.hql("UNCACHE TABLE src") + TestHive.table("src").queryExecution.executedPlan match { + case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached") + case _ => // Found evidence of uncaching + } + assert(!TestHive.isCached("src"), "Table 'src' should not be cached") + } } -- cgit v1.2.3