about summary refs log tree commit diff
path: root/sql/hive
diff options
context:
space:
mode:
author	Cheng Lian <lian.cs.zju@gmail.com>	2014-06-11 00:06:50 -0700
committer	Michael Armbrust <michael@databricks.com>	2014-06-11 00:07:03 -0700
commit65ed7793db7a3d97aa244c372ac9a756acfa9447 (patch)
treea9baaac457bf4eeedef9cebf625b05c107b9d78c /sql/hive
parent6d15e9f7cbe9dffe8695519fc5cb6baa59f75776 (diff)
downloadspark-65ed7793db7a3d97aa244c372ac9a756acfa9447.tar.gz
spark-65ed7793db7a3d97aa244c372ac9a756acfa9447.tar.bz2
spark-65ed7793db7a3d97aa244c372ac9a756acfa9447.zip
[SPARK-1968][SQL] SQL/HiveQL command for caching/uncaching tables
JIRA issue: [SPARK-1968](https://issues.apache.org/jira/browse/SPARK-1968) This PR added support for SQL/HiveQL command for caching/uncaching tables: ``` scala> sql("CACHE TABLE src") ... res0: org.apache.spark.sql.SchemaRDD = SchemaRDD[0] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, true scala> table("src") ... res1: org.apache.spark.sql.SchemaRDD = SchemaRDD[3] at RDD at SchemaRDD.scala:98 == Query Plan == InMemoryColumnarTableScan [key#0,value#1], (HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None), false scala> isCached("src") res2: Boolean = true scala> sql("UNCACHE TABLE src") ... res3: org.apache.spark.sql.SchemaRDD = SchemaRDD[4] at RDD at SchemaRDD.scala:98 == Query Plan == CacheCommandPhysical src, false scala> table("src") ... res4: org.apache.spark.sql.SchemaRDD = SchemaRDD[11] at RDD at SchemaRDD.scala:98 == Query Plan == HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None scala> isCached("src") res5: Boolean = false ``` Things also work for `hql`. Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1038 from liancheng/sqlCacheTable and squashes the following commits: ecb7194 [Cheng Lian] Trimmed the SQL string before parsing special commands 6f4ce42 [Cheng Lian] Moved logical command classes to a separate file 3458a24 [Cheng Lian] Added comment for public API f0ffacc [Cheng Lian] Added isCached() predicate 15ec6d2 [Cheng Lian] Added "(UN)CACHE TABLE" SQL/HiveQL statements (cherry picked from commit 0266a0c8a70e0fbaeb0df63031f7a750ffc31a80) Signed-off-by: Michael Armbrust <michael@databricks.com>
Diffstat (limited to 'sql/hive')
-rw-r--r--	sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala	| 18
-rw-r--r--	sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala	| 5
-rw-r--r--	sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala	| 16
3 files changed, 30 insertions(+), 9 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 4e74d9bc90..b745d8ffd8 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -218,15 +218,19 @@ private[hive] object HiveQl {
case Array(key, value) => // "set key=value"
SetCommand(Some(key), Some(value))
}
- } else if (sql.toLowerCase.startsWith("add jar")) {
+ } else if (sql.trim.toLowerCase.startsWith("cache table")) {
+ CacheCommand(sql.drop(12).trim, true)
+ } else if (sql.trim.toLowerCase.startsWith("uncache table")) {
+ CacheCommand(sql.drop(14).trim, false)
+ } else if (sql.trim.toLowerCase.startsWith("add jar")) {
AddJar(sql.drop(8))
- } else if (sql.toLowerCase.startsWith("add file")) {
+ } else if (sql.trim.toLowerCase.startsWith("add file")) {
AddFile(sql.drop(9))
- } else if (sql.startsWith("dfs")) {
+ } else if (sql.trim.startsWith("dfs")) {
DfsCommand(sql)
- } else if (sql.startsWith("source")) {
+ } else if (sql.trim.startsWith("source")) {
SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
- } else if (sql.startsWith("!")) {
+ } else if (sql.trim.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
val tree = getAst(sql)
@@ -839,11 +843,11 @@ private[hive] object HiveQl {
case Token("TOK_FUNCTIONDI", Token(SUM(), Nil) :: arg :: Nil) => SumDistinct(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(MAX(), Nil) :: arg :: Nil) => Max(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(MIN(), Nil) :: arg :: Nil) => Min(nodeToExpr(arg))
-
+
/* System functions about string operations */
case Token("TOK_FUNCTION", Token(UPPER(), Nil) :: arg :: Nil) => Upper(nodeToExpr(arg))
case Token("TOK_FUNCTION", Token(LOWER(), Nil) :: arg :: Nil) => Lower(nodeToExpr(arg))
-
+
/* Casts */
case Token("TOK_FUNCTION", Token("TOK_STRING", Nil) :: arg :: Nil) =>
Cast(nodeToExpr(arg), StringType)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index fa7d010459..d199097e06 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive._
@@ -104,7 +104,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
val inRepoTests = if (System.getProperty("user.dir").endsWith("sql" + File.separator + "hive")) {
new File("src" + File.separator + "test" + File.separator + "resources" + File.separator)
} else {
- new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
+ new File("sql" + File.separator + "hive" + File.separator + "src" + File.separator + "test" +
File.separator + "resources")
}
@@ -131,6 +131,7 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {
override lazy val analyzed = {
val describedTables = logical match {
case NativeCommand(describedTable(tbl)) => tbl :: Nil
+ case CacheCommand(tbl, _) => tbl :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index f9a162ef4e..91ac03ca30 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -56,4 +56,20 @@ class CachedTableSuite extends HiveComparisonTest {
TestHive.uncacheTable("src")
}
}
+
+ test("'CACHE TABLE' and 'UNCACHE TABLE' HiveQL statement") {
+ TestHive.hql("CACHE TABLE src")
+ TestHive.table("src").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => // Found evidence of caching
+ case _ => fail(s"Table 'src' should be cached")
+ }
+ assert(TestHive.isCached("src"), "Table 'src' should be cached")
+
+ TestHive.hql("UNCACHE TABLE src")
+ TestHive.table("src").queryExecution.executedPlan match {
+ case _: InMemoryColumnarTableScan => fail(s"Table 'src' should not be cached")
+ case _ => // Found evidence of uncaching
+ }
+ assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
+ }
}