aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorravipesala <ravindra.pesala@huawei.com>2014-09-19 15:31:57 -0700
committerMichael Armbrust <michael@databricks.com>2014-09-19 15:31:57 -0700
commit5522151eb14f4208798901f5c090868edd8e8dde (patch)
treec3cc2ab0316f43c4cc44e8d6e57b12675d2b84ba
parent2c3cc7641d86fa5196406955325a042890f77563 (diff)
downloadspark-5522151eb14f4208798901f5c090868edd8e8dde.tar.gz
spark-5522151eb14f4208798901f5c090868edd8e8dde.tar.bz2
spark-5522151eb14f4208798901f5c090868edd8e8dde.zip
[SPARK-2594][SQL] Support CACHE TABLE <name> AS SELECT ...
This feature allows user to add cache table from the select query. Example : ```CACHE TABLE testCacheTable AS SELECT * FROM TEST_TABLE``` Spark takes this type of SQL as command and it does lazy caching just like ```SQLContext.cacheTable```, ```CACHE TABLE <name>``` does. It can be executed from both SQLContext and HiveContext. Recreated the pull request after rebasing with master. And fixed all the comments raised in previous pull requests. https://github.com/apache/spark/pull/2381 https://github.com/apache/spark/pull/2390 Author : ravipesala ravindra.pesala@huawei.com Author: ravipesala <ravindra.pesala@huawei.com> Closes #2397 from ravipesala/SPARK-2594 and squashes the following commits: a5f0beb [ravipesala] Simplified the code as per Admin comment. 8059cd2 [ravipesala] Changed the behaviour from eager caching to lazy caching. d6e469d [ravipesala] Code review comments by Admin are handled. c18aa38 [ravipesala] Merge remote-tracking branch 'remotes/ravipesala/Add-Cache-table-as' into SPARK-2594 394d5ca [ravipesala] Changed style fb1759b [ravipesala] Updated as per Admin comments 8c9993c [ravipesala] Changed the style d8b37b2 [ravipesala] Updated as per the comments by Admin bc0bffc [ravipesala] Merge remote-tracking branch 'ravipesala/Add-Cache-table-as' into Add-Cache-table-as e3265d0 [ravipesala] Updated the code as per the comments by Admin in pull request. 724b9db [ravipesala] Changed style aaf5b59 [ravipesala] Added comment dc33895 [ravipesala] Updated parser to support add cache table command b5276b2 [ravipesala] Updated parser to support add cache table command eebc0c1 [ravipesala] Add CACHE TABLE <name> AS SELECT ... 6758f80 [ravipesala] Changed style 7459ce3 [ravipesala] Added comment 13c8e27 [ravipesala] Updated parser to support add cache table command 4e858d8 [ravipesala] Updated parser to support add cache table command b803fc8 [ravipesala] Add CACHE TABLE <name> AS SELECT ...
-rwxr-xr-xsql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala14
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala2
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala18
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala13
-rw-r--r--sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala30
6 files changed, 69 insertions, 13 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index ca69531c69..862f78702c 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -151,7 +151,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
EXCEPT ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Except(q1, q2)} |
UNION ~ opt(DISTINCT) ^^^ { (q1: LogicalPlan, q2: LogicalPlan) => Distinct(Union(q1, q2)) }
)
- | insert | cache
+ | insert | cache | unCache
)
protected lazy val select: Parser[LogicalPlan] =
@@ -183,9 +183,17 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
}
protected lazy val cache: Parser[LogicalPlan] =
- (CACHE ^^^ true | UNCACHE ^^^ false) ~ TABLE ~ ident ^^ {
- case doCache ~ _ ~ tableName => CacheCommand(tableName, doCache)
+ CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
+ case tableName ~ None =>
+ CacheCommand(tableName, true)
+ case tableName ~ Some(plan) =>
+ CacheTableAsSelectCommand(tableName, plan)
}
+
+ protected lazy val unCache: Parser[LogicalPlan] =
+ UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
+ case tableName => CacheCommand(tableName, false)
+ }
protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index a01809c1fc..8366639fa0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -75,3 +75,8 @@ case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}
+
+/**
+ * Returned for the "CACHE TABLE tableName AS SELECT .." command.
+ */
+case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 7943d6e1b6..45687d9604 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -305,6 +305,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
case logical.CacheCommand(tableName, cache) =>
Seq(execution.CacheCommand(tableName, cache)(context))
+ case logical.CacheTableAsSelectCommand(tableName, plan) =>
+ Seq(execution.CacheTableAsSelectCommand(tableName, plan))
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index 94543fc95b..c2f48a902a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -166,3 +166,21 @@ case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
child.output.map(field => Row(field.name, field.dataType.toString, null))
}
}
+
+/**
+ * :: DeveloperApi ::
+ */
+@DeveloperApi
+case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
+ extends LeafNode with Command {
+
+ override protected[sql] lazy val sideEffectResult = {
+ import sqlContext._
+ logicalPlan.registerTempTable(tableName)
+ cacheTable(tableName)
+ Seq.empty[Row]
+ }
+
+ override def output: Seq[Attribute] = Seq.empty
+
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index befef46d93..591592841e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -119,4 +119,17 @@ class CachedTableSuite extends QueryTest {
}
assert(!TestSQLContext.isCached("testData"), "Table 'testData' should not be cached")
}
+
+ test("CACHE TABLE tableName AS SELECT Star Table") {
+ TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+ TestSQLContext.sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
+ assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+ TestSQLContext.uncacheTable("testCacheTable")
+ }
+
+ test("'CACHE TABLE tableName AS SELECT ..'") {
+ TestSQLContext.sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
+ assert(TestSQLContext.isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+ TestSQLContext.uncacheTable("testCacheTable")
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index 21ecf17028..0aa6292c01 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -229,7 +229,12 @@ private[hive] object HiveQl {
SetCommand(Some(key), Some(value))
}
} else if (sql.trim.toLowerCase.startsWith("cache table")) {
- CacheCommand(sql.trim.drop(12).trim, true)
+ sql.trim.drop(12).trim.split(" ").toSeq match {
+ case Seq(tableName) =>
+ CacheCommand(tableName, true)
+ case Seq(tableName, _, select @ _*) =>
+ CacheTableAsSelectCommand(tableName, createPlan(select.mkString(" ").trim))
+ }
} else if (sql.trim.toLowerCase.startsWith("uncache table")) {
CacheCommand(sql.trim.drop(14).trim, false)
} else if (sql.trim.toLowerCase.startsWith("add jar")) {
@@ -243,15 +248,7 @@ private[hive] object HiveQl {
} else if (sql.trim.startsWith("!")) {
ShellCommand(sql.drop(1))
} else {
- val tree = getAst(sql)
- if (nativeCommands contains tree.getText) {
- NativeCommand(sql)
- } else {
- nodeToPlan(tree) match {
- case NativePlaceholder => NativeCommand(sql)
- case other => other
- }
- }
+ createPlan(sql)
}
} catch {
case e: Exception => throw new ParseException(sql, e)
@@ -262,6 +259,19 @@ private[hive] object HiveQl {
""".stripMargin)
}
}
+
+ /** Creates LogicalPlan for a given HiveQL string. */
+ def createPlan(sql: String) = {
+ val tree = getAst(sql)
+ if (nativeCommands contains tree.getText) {
+ NativeCommand(sql)
+ } else {
+ nodeToPlan(tree) match {
+ case NativePlaceholder => NativeCommand(sql)
+ case other => other
+ }
+ }
+ }
def parseDdl(ddl: String): Seq[Attribute] = {
val tree =