author     Cheng Lian <lian.cs.zju@gmail.com>  2014-10-05 17:51:59 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-05 17:51:59 -0700
commit     34b97a067d1b370fbed8ecafab2f48501a35d783 (patch)
tree       dc035a45d08a2b7b9d4a5cb5e527b880a5125402
parent     58f5361caaa2f898e38ae4b3794167881e20a818 (diff)
[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory tables seems consistent with the `RDD.cache()` API, it is relatively confusing for users who mainly work with SQL and are not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen, just to ensure predictable performance. This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.

This PR also takes the chance to do some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are merged and renamed to `CacheTableCommand`, since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
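For context, a minimal usage sketch of the caching syntax described above; it assumes an existing SparkContext `sc` and a SchemaRDD `logs`, both of which are illustrative and not part of this patch:

  // Minimal sketch (not from this patch): `sc` and `logs` are assumed to exist already.
  import org.apache.spark.sql.SQLContext

  val sqlContext = new SQLContext(sc)
  logs.registerTempTable("logs")

  sqlContext.sql("CACHE TABLE logs")                    // eager by default after this patch: materialized immediately
  sqlContext.sql("UNCACHE TABLE logs")                  // drops the in-memory columnar data
  sqlContext.sql("CACHE LAZY TABLE logs")               // new syntax: cached lazily
  sqlContext.sql("SELECT COUNT(*) FROM logs").collect() // first scan materializes the lazily cached table
  sqlContext.sql("CACHE TABLE recentLogs AS SELECT * FROM logs WHERE key > 100")  // caches a query result under a new name
  sqlContext.cacheTable("logs")                         // programmatic equivalent, also eager now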
-rwxr-xr-x  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala                    45
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala               2
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala        15
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                                9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala          2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala                   8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                         47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                          145
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala                  66
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala                               6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala                      78
11 files changed, 265 insertions, 158 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 26336332c0..854b5b461b 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -67,11 +67,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected implicit def asParser(k: Keyword): Parser[String] =
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
+ protected val ABS = Keyword("ABS")
protected val ALL = Keyword("ALL")
protected val AND = Keyword("AND")
+ protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AS = Keyword("AS")
protected val ASC = Keyword("ASC")
- protected val APPROXIMATE = Keyword("APPROXIMATE")
protected val AVG = Keyword("AVG")
protected val BETWEEN = Keyword("BETWEEN")
protected val BY = Keyword("BY")
@@ -80,9 +81,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val COUNT = Keyword("COUNT")
protected val DESC = Keyword("DESC")
protected val DISTINCT = Keyword("DISTINCT")
+ protected val EXCEPT = Keyword("EXCEPT")
protected val FALSE = Keyword("FALSE")
protected val FIRST = Keyword("FIRST")
- protected val LAST = Keyword("LAST")
protected val FROM = Keyword("FROM")
protected val FULL = Keyword("FULL")
protected val GROUP = Keyword("GROUP")
@@ -91,42 +92,42 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
protected val IN = Keyword("IN")
protected val INNER = Keyword("INNER")
protected val INSERT = Keyword("INSERT")
+ protected val INTERSECT = Keyword("INTERSECT")
protected val INTO = Keyword("INTO")
protected val IS = Keyword("IS")
protected val JOIN = Keyword("JOIN")
+ protected val LAST = Keyword("LAST")
+ protected val LAZY = Keyword("LAZY")
protected val LEFT = Keyword("LEFT")
+ protected val LIKE = Keyword("LIKE")
protected val LIMIT = Keyword("LIMIT")
+ protected val LOWER = Keyword("LOWER")
protected val MAX = Keyword("MAX")
protected val MIN = Keyword("MIN")
protected val NOT = Keyword("NOT")
protected val NULL = Keyword("NULL")
protected val ON = Keyword("ON")
protected val OR = Keyword("OR")
- protected val OVERWRITE = Keyword("OVERWRITE")
- protected val LIKE = Keyword("LIKE")
- protected val RLIKE = Keyword("RLIKE")
- protected val UPPER = Keyword("UPPER")
- protected val LOWER = Keyword("LOWER")
- protected val REGEXP = Keyword("REGEXP")
protected val ORDER = Keyword("ORDER")
protected val OUTER = Keyword("OUTER")
+ protected val OVERWRITE = Keyword("OVERWRITE")
+ protected val REGEXP = Keyword("REGEXP")
protected val RIGHT = Keyword("RIGHT")
+ protected val RLIKE = Keyword("RLIKE")
protected val SELECT = Keyword("SELECT")
protected val SEMI = Keyword("SEMI")
+ protected val SQRT = Keyword("SQRT")
protected val STRING = Keyword("STRING")
+ protected val SUBSTR = Keyword("SUBSTR")
+ protected val SUBSTRING = Keyword("SUBSTRING")
protected val SUM = Keyword("SUM")
protected val TABLE = Keyword("TABLE")
protected val TIMESTAMP = Keyword("TIMESTAMP")
protected val TRUE = Keyword("TRUE")
protected val UNCACHE = Keyword("UNCACHE")
protected val UNION = Keyword("UNION")
+ protected val UPPER = Keyword("UPPER")
protected val WHERE = Keyword("WHERE")
- protected val INTERSECT = Keyword("INTERSECT")
- protected val EXCEPT = Keyword("EXCEPT")
- protected val SUBSTR = Keyword("SUBSTR")
- protected val SUBSTRING = Keyword("SUBSTRING")
- protected val SQRT = Keyword("SQRT")
- protected val ABS = Keyword("ABS")
// Use reflection to find the reserved words defined in this class.
protected val reservedWords =
@@ -183,17 +184,15 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
}
protected lazy val cache: Parser[LogicalPlan] =
- CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ {
- case tableName ~ None =>
- CacheCommand(tableName, true)
- case tableName ~ Some(plan) =>
- CacheTableAsSelectCommand(tableName, plan)
+ CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> select) <~ opt(";") ^^ {
+ case isLazy ~ tableName ~ plan =>
+ CacheTableCommand(tableName, plan, isLazy.isDefined)
}
-
+
protected lazy val unCache: Parser[LogicalPlan] =
UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ {
- case tableName => CacheCommand(tableName, false)
- }
+ case tableName => UncacheTableCommand(tableName)
+ }
protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",")
@@ -283,7 +282,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers {
termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } |
termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } |
- termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
+ termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ {
case e ~ _ ~ el ~ _ ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu))
} |
termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } |
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
index 616f1e2ecb..2059a91ba0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala
@@ -87,7 +87,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog {
tableName: String,
alias: Option[String] = None): LogicalPlan = {
val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
- val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: $tableName"))
+ val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName"))
val tableWithQualifiers = Subquery(tblName, table)
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
index 8366639fa0..9a3848cfc6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala
@@ -56,9 +56,15 @@ case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends
}
/**
- * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command.
+ * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command.
*/
-case class CacheCommand(tableName: String, doCache: Boolean) extends Command
+case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean)
+ extends Command
+
+/**
+ * Returned for the "UNCACHE TABLE tableName" command.
+ */
+case class UncacheTableCommand(tableName: String) extends Command
/**
* Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command.
@@ -75,8 +81,3 @@ case class DescribeCommand(
AttributeReference("data_type", StringType, nullable = false)(),
AttributeReference("comment", StringType, nullable = false)())
}
-
-/**
- * Returned for the "CACHE TABLE tableName AS SELECT .." command.
- */
-case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index aebdbb68e4..3bf7382ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,14 +91,10 @@ private[sql] trait CacheManager {
}
/** Removes the data for the given SchemaRDD from the cache */
- private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock {
+ private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.optimizedPlan
val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
-
- if (dataIndex < 0) {
- throw new IllegalArgumentException(s"Table $query is not cached.")
- }
-
+ require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
}
@@ -135,5 +131,4 @@ private[sql] trait CacheManager {
case _ =>
}
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index cec82a7f2d..4f79173a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation(
override def newInstance() = {
new InMemoryRelation(
- output.map(_.newInstance),
+ output.map(_.newInstance()),
useCompression,
batchSize,
storageLevel,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cf93d5ad7b..5c16d0c624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
- case logical.CacheCommand(tableName, cache) =>
- Seq(execution.CacheCommand(tableName, cache)(context))
- case logical.CacheTableAsSelectCommand(tableName, plan) =>
- Seq(execution.CacheTableAsSelectCommand(tableName, plan))
+ case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
+ Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+ case logical.UncacheTableCommand(tableName) =>
+ Seq(execution.UncacheTableCommand(tableName))
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f88099ec07..d49633c24a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -138,49 +138,54 @@ case class ExplainCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+case class CacheTableCommand(
+ tableName: String,
+ plan: Option[LogicalPlan],
+ isLazy: Boolean)
extends LeafNode with Command {
override protected lazy val sideEffectResult = {
- if (doCache) {
- context.cacheTable(tableName)
- } else {
- context.uncacheTable(tableName)
+ import sqlContext._
+
+ plan.foreach(_.registerTempTable(tableName))
+ val schemaRDD = table(tableName)
+ schemaRDD.cache()
+
+ if (!isLazy) {
+ // Performs eager caching
+ schemaRDD.count()
}
+
Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
}
+
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command {
-
+case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
override protected lazy val sideEffectResult: Seq[Row] = {
- Row("# Registered as a temporary table", null, null) +:
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ sqlContext.table(tableName).unpersist()
+ Seq.empty[Row]
}
+
+ override def output: Seq[Attribute] = Seq.empty
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+ @transient context: SQLContext)
extends LeafNode with Command {
-
- override protected[sql] lazy val sideEffectResult = {
- import sqlContext._
- logicalPlan.registerTempTable(tableName)
- cacheTable(tableName)
- Seq.empty[Row]
- }
- override def output: Seq[Attribute] = Seq.empty
-
+ override protected lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 957388e99b..1e624f9700 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,30 +18,39 @@
package org.apache.spark.sql
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.storage.RDDBlockId
case class BigData(s: String)
class CachedTableSuite extends QueryTest {
- import TestSQLContext._
TestData // Load test tables.
- /**
- * Throws a test failed exception when the number of cached tables differs from the expected
- * number.
- */
def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = {
val planWithCaching = query.queryExecution.withCachedData
val cachedData = planWithCaching collect {
case cached: InMemoryRelation => cached
}
- if (cachedData.size != numCachedTables) {
- fail(
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+ assert(
+ cachedData.size == numCachedTables,
+ s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
planWithCaching)
- }
+ }
+
+ def rddIdOf(tableName: String): Int = {
+ val executedPlan = table(tableName).queryExecution.executedPlan
+ executedPlan.collect {
+ case InMemoryColumnarTableScan(_, _, relation) =>
+ relation.cachedColumnBuffers.id
+ case _ =>
+ fail(s"Table $tableName is not cached\n" + executedPlan)
+ }.head
+ }
+
+ def isMaterialized(rddId: Int): Boolean = {
+ sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
test("too big for memory") {
@@ -52,10 +61,33 @@ class CachedTableSuite extends QueryTest {
uncacheTable("bigData")
}
- test("calling .cache() should use inmemory columnar caching") {
+ test("calling .cache() should use in-memory columnar caching") {
table("testData").cache()
+ assertCached(table("testData"))
+ }
+
+ test("calling .unpersist() should drop in-memory columnar cache") {
+ table("testData").cache()
+ table("testData").count()
+ table("testData").unpersist(true)
+ assertCached(table("testData"), 0)
+ }
+
+ test("isCached") {
+ cacheTable("testData")
assertCached(table("testData"))
+ assert(table("testData").queryExecution.withCachedData match {
+ case _: InMemoryRelation => true
+ case _ => false
+ })
+
+ uncacheTable("testData")
+ assert(!isCached("testData"))
+ assert(table("testData").queryExecution.withCachedData match {
+ case _: InMemoryRelation => false
+ case _ => true
+ })
}
test("SPARK-1669: cacheTable should be idempotent") {
@@ -64,32 +96,27 @@ class CachedTableSuite extends QueryTest {
cacheTable("testData")
assertCached(table("testData"))
- cacheTable("testData")
- table("testData").queryExecution.analyzed match {
- case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) =>
- fail("cacheTable is not idempotent")
+ assertResult(1, "InMemoryRelation not found, testData should have been cached") {
+ table("testData").queryExecution.withCachedData.collect {
+ case r: InMemoryRelation => r
+ }.size
+ }
- case _ =>
+ cacheTable("testData")
+ assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
+ table("testData").queryExecution.withCachedData.collect {
+ case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
+ }.size
}
}
test("read from cached table and uncache") {
cacheTable("testData")
-
- checkAnswer(
- table("testData"),
- testData.collect().toSeq
- )
-
+ checkAnswer(table("testData"), testData.collect().toSeq)
assertCached(table("testData"))
uncacheTable("testData")
-
- checkAnswer(
- table("testData"),
- testData.collect().toSeq
- )
-
+ checkAnswer(table("testData"), testData.collect().toSeq)
assertCached(table("testData"), 0)
}
@@ -99,10 +126,12 @@ class CachedTableSuite extends QueryTest {
}
}
- test("SELECT Star Cached Table") {
+ test("SELECT star from cached table") {
sql("SELECT * FROM testData").registerTempTable("selectStar")
cacheTable("selectStar")
- sql("SELECT * FROM selectStar WHERE key = 1").collect()
+ checkAnswer(
+ sql("SELECT * FROM selectStar WHERE key = 1"),
+ Seq(Row(1, "1")))
uncacheTable("selectStar")
}
@@ -120,23 +149,57 @@ class CachedTableSuite extends QueryTest {
sql("CACHE TABLE testData")
assertCached(table("testData"))
- assert(isCached("testData"), "Table 'testData' should be cached")
+ val rddId = rddIdOf("testData")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
sql("UNCACHE TABLE testData")
- assertCached(table("testData"), 0)
assert(!isCached("testData"), "Table 'testData' should not be cached")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
-
- test("CACHE TABLE tableName AS SELECT Star Table") {
+
+ test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
- assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
-
- test("'CACHE TABLE tableName AS SELECT ..'") {
- sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+
+ test("CACHE TABLE tableName AS SELECT ...") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
+
+ test("CACHE LAZY TABLE tableName") {
+ sql("CACHE LAZY TABLE testData")
+ assertCached(table("testData"))
+
+ val rddId = rddIdOf("testData")
+ assert(
+ !isMaterialized(rddId),
+ "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+ sql("SELECT COUNT(*) FROM testData").collect()
+ assert(
+ isMaterialized(rddId),
+ "Lazily cached in-memory table should have been materialized")
+
+ uncacheTable("testData")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index e7e1cb980c..c5844e92ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical
/**
- * A parser that recognizes all HiveQL constructs together with several Spark SQL specific
+ * A parser that recognizes all HiveQL constructs together with several Spark SQL specific
* extensions like CACHE TABLE and UNCACHE TABLE.
*/
-private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
-
+private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
+
def apply(input: String): LogicalPlan = {
// Special-case out set commands since the value fields can be
// complex to handle without RegexParsers. Also this approach
@@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr
protected case class Keyword(str: String)
- protected val CACHE = Keyword("CACHE")
- protected val SET = Keyword("SET")
protected val ADD = Keyword("ADD")
- protected val JAR = Keyword("JAR")
- protected val TABLE = Keyword("TABLE")
protected val AS = Keyword("AS")
- protected val UNCACHE = Keyword("UNCACHE")
- protected val FILE = Keyword("FILE")
+ protected val CACHE = Keyword("CACHE")
protected val DFS = Keyword("DFS")
+ protected val FILE = Keyword("FILE")
+ protected val JAR = Keyword("JAR")
+ protected val LAZY = Keyword("LAZY")
+ protected val SET = Keyword("SET")
protected val SOURCE = Keyword("SOURCE")
+ protected val TABLE = Keyword("TABLE")
+ protected val UNCACHE = Keyword("UNCACHE")
protected implicit def asParser(k: Keyword): Parser[String] =
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
@@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr
override val lexical = new SqlLexical(reservedWords)
- protected lazy val query: Parser[LogicalPlan] =
+ protected lazy val query: Parser[LogicalPlan] =
cache | uncache | addJar | addFile | dfs | source | hiveQl
protected lazy val hiveQl: Parser[LogicalPlan] =
- remainingQuery ^^ {
- case r => HiveQl.createPlan(r.trim())
+ restInput ^^ {
+ case statement => HiveQl.createPlan(statement.trim())
}
- /** It returns all remaining query */
- protected lazy val remainingQuery: Parser[String] = new Parser[String] {
+ // Returns the whole input string
+ protected lazy val wholeInput: Parser[String] = new Parser[String] {
def apply(in: Input) =
- Success(
- in.source.subSequence(in.offset, in.source.length).toString,
- in.drop(in.source.length()))
+ Success(in.source.toString, in.drop(in.source.length()))
}
- /** It returns all query */
- protected lazy val allQuery: Parser[String] = new Parser[String] {
+ // Returns the rest of the input string that are not parsed yet
+ protected lazy val restInput: Parser[String] = new Parser[String] {
def apply(in: Input) =
- Success(in.source.toString, in.drop(in.source.length()))
+ Success(
+ in.source.subSequence(in.offset, in.source.length).toString,
+ in.drop(in.source.length()))
}
protected lazy val cache: Parser[LogicalPlan] =
- CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ {
- case tableName ~ None => CacheCommand(tableName, true)
- case tableName ~ Some(plan) =>
- CacheTableAsSelectCommand(tableName, plan)
+ CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ {
+ case isLazy ~ tableName ~ plan =>
+ CacheTableCommand(tableName, plan, isLazy.isDefined)
}
protected lazy val uncache: Parser[LogicalPlan] =
UNCACHE ~ TABLE ~> ident ^^ {
- case tableName => CacheCommand(tableName, false)
+ case tableName => UncacheTableCommand(tableName)
}
protected lazy val addJar: Parser[LogicalPlan] =
- ADD ~ JAR ~> remainingQuery ^^ {
- case rq => AddJar(rq.trim())
+ ADD ~ JAR ~> restInput ^^ {
+ case jar => AddJar(jar.trim())
}
protected lazy val addFile: Parser[LogicalPlan] =
- ADD ~ FILE ~> remainingQuery ^^ {
- case rq => AddFile(rq.trim())
+ ADD ~ FILE ~> restInput ^^ {
+ case file => AddFile(file.trim())
}
protected lazy val dfs: Parser[LogicalPlan] =
- DFS ~> allQuery ^^ {
- case aq => NativeCommand(aq.trim())
+ DFS ~> wholeInput ^^ {
+ case command => NativeCommand(command.trim())
}
protected lazy val source: Parser[LogicalPlan] =
- SOURCE ~> remainingQuery ^^ {
- case rq => SourceCommand(rq.trim())
+ SOURCE ~> restInput ^^ {
+ case file => SourceCommand(file.trim())
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index c0e69393cc..a4354c1379 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.SQLConf
@@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath
/** Sets up the system initially or after a RESET command */
- protected def configure() {
+ protected def configure(): Unit = {
setConf("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
setConf("hive.metastore.warehouse.dir", warehousePath)
@@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
override lazy val analyzed = {
val describedTables = logical match {
case NativeCommand(describedTable(tbl)) => tbl :: Nil
- case CacheCommand(tbl, _) => tbl :: Nil
+ case CacheTableCommand(tbl, _, _) => tbl :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 158cfb5bbe..2060e1f1a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,13 +17,13 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{QueryTest, SchemaRDD}
-import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.{QueryTest, SchemaRDD}
+import org.apache.spark.storage.RDDBlockId
class CachedTableSuite extends QueryTest {
- import TestHive._
-
/**
* Throws a test failed exception when the number of cached tables differs from the expected
* number.
@@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest {
case cached: InMemoryRelation => cached
}
- if (cachedData.size != numCachedTables) {
- fail(
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
- planWithCaching)
- }
+ assert(
+ cachedData.size == numCachedTables,
+ s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+ planWithCaching)
+ }
+
+ def rddIdOf(tableName: String): Int = {
+ val executedPlan = table(tableName).queryExecution.executedPlan
+ executedPlan.collect {
+ case InMemoryColumnarTableScan(_, _, relation) =>
+ relation.cachedColumnBuffers.id
+ case _ =>
+ fail(s"Table $tableName is not cached\n" + executedPlan)
+ }.head
+ }
+
+ def isMaterialized(rddId: Int): Boolean = {
+ sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
test("cache table") {
@@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest {
assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
}
- test("CACHE TABLE AS SELECT") {
- assertCached(sql("SELECT * FROM src"), 0)
- sql("CACHE TABLE test AS SELECT key FROM src")
+ test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+ assertCached(table("testCacheTable"))
- checkAnswer(
- sql("SELECT * FROM test"),
- sql("SELECT key FROM src").collect().toSeq)
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
- assertCached(sql("SELECT * FROM test"))
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
+
+ test("CACHE TABLE tableName AS SELECT ...") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
- assertCached(sql("SELECT * FROM test JOIN test"), 2)
+ test("CACHE LAZY TABLE tableName") {
+ sql("CACHE LAZY TABLE src")
+ assertCached(table("src"))
+
+ val rddId = rddIdOf("src")
+ assert(
+ !isMaterialized(rddId),
+ "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+ sql("SELECT COUNT(*) FROM src").collect()
+ assert(
+ isMaterialized(rddId),
+ "Lazily cached in-memory table should have been materialized")
+
+ uncacheTable("src")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}