author    Cheng Lian <lian.cs.zju@gmail.com>    2014-10-05 17:51:59 -0700
committer Michael Armbrust <michael@databricks.com>    2014-10-05 17:51:59 -0700
commit    34b97a067d1b370fbed8ecafab2f48501a35d783 (patch)
tree      dc035a45d08a2b7b9d4a5cb5e527b880a5125402 /sql/hive
parent    58f5361caaa2f898e38ae4b3794167881e20a818 (diff)
[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory tables seems consistent with the `RDD.cache()` API, it is relatively confusing for users who mainly work with SQL and are not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen, just to ensure predictable performance.

This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.

It also takes the chance to do some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are merged and renamed to `CacheTableCommand`, since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
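The new statements and the eager default can be exercised roughly as follows (a minimal sketch, assuming a HiveContext instance named `hiveContext` and an existing Hive table `src`; these names are illustrative and not part of the patch itself):

    // Eager by default: the in-memory table is materialized as soon as the statement runs.
    hiveContext.sql("CACHE TABLE testCacheTable AS SELECT key FROM src")
    hiveContext.sql("UNCACHE TABLE testCacheTable")

    // New syntax: LAZY defers materialization until the table is first queried.
    hiveContext.sql("CACHE LAZY TABLE src")
    hiveContext.sql("SELECT COUNT(*) FROM src").collect()  // first access fills the cache
    hiveContext.sql("UNCACHE TABLE src")

    // The programmatic API follows the same eager semantics.
    hiveContext.cacheTable("src")
    hiveContext.uncacheTable("src")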
Diffstat (limited to 'sql/hive')
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala | 66
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala | 6
-rw-r--r--  sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 78
3 files changed, 97 insertions(+), 53 deletions(-)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
index e7e1cb980c..c5844e92ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala
@@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.SqlLexical
/**
- * A parser that recognizes all HiveQL constructs together with several Spark SQL specific
+ * A parser that recognizes all HiveQL constructs together with several Spark SQL specific
* extensions like CACHE TABLE and UNCACHE TABLE.
*/
-private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
-
+private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers {
+
def apply(input: String): LogicalPlan = {
// Special-case out set commands since the value fields can be
// complex to handle without RegexParsers. Also this approach
@@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr
protected case class Keyword(str: String)
- protected val CACHE = Keyword("CACHE")
- protected val SET = Keyword("SET")
protected val ADD = Keyword("ADD")
- protected val JAR = Keyword("JAR")
- protected val TABLE = Keyword("TABLE")
protected val AS = Keyword("AS")
- protected val UNCACHE = Keyword("UNCACHE")
- protected val FILE = Keyword("FILE")
+ protected val CACHE = Keyword("CACHE")
protected val DFS = Keyword("DFS")
+ protected val FILE = Keyword("FILE")
+ protected val JAR = Keyword("JAR")
+ protected val LAZY = Keyword("LAZY")
+ protected val SET = Keyword("SET")
protected val SOURCE = Keyword("SOURCE")
+ protected val TABLE = Keyword("TABLE")
+ protected val UNCACHE = Keyword("UNCACHE")
protected implicit def asParser(k: Keyword): Parser[String] =
lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _)
@@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr
override val lexical = new SqlLexical(reservedWords)
- protected lazy val query: Parser[LogicalPlan] =
+ protected lazy val query: Parser[LogicalPlan] =
cache | uncache | addJar | addFile | dfs | source | hiveQl
protected lazy val hiveQl: Parser[LogicalPlan] =
- remainingQuery ^^ {
- case r => HiveQl.createPlan(r.trim())
+ restInput ^^ {
+ case statement => HiveQl.createPlan(statement.trim())
}
- /** It returns all remaining query */
- protected lazy val remainingQuery: Parser[String] = new Parser[String] {
+ // Returns the whole input string
+ protected lazy val wholeInput: Parser[String] = new Parser[String] {
def apply(in: Input) =
- Success(
- in.source.subSequence(in.offset, in.source.length).toString,
- in.drop(in.source.length()))
+ Success(in.source.toString, in.drop(in.source.length()))
}
- /** It returns all query */
- protected lazy val allQuery: Parser[String] = new Parser[String] {
+ // Returns the rest of the input string that has not been parsed yet
+ protected lazy val restInput: Parser[String] = new Parser[String] {
def apply(in: Input) =
- Success(in.source.toString, in.drop(in.source.length()))
+ Success(
+ in.source.subSequence(in.offset, in.source.length).toString,
+ in.drop(in.source.length()))
}
protected lazy val cache: Parser[LogicalPlan] =
- CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ {
- case tableName ~ None => CacheCommand(tableName, true)
- case tableName ~ Some(plan) =>
- CacheTableAsSelectCommand(tableName, plan)
+ CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ {
+ case isLazy ~ tableName ~ plan =>
+ CacheTableCommand(tableName, plan, isLazy.isDefined)
}
protected lazy val uncache: Parser[LogicalPlan] =
UNCACHE ~ TABLE ~> ident ^^ {
- case tableName => CacheCommand(tableName, false)
+ case tableName => UncacheTableCommand(tableName)
}
protected lazy val addJar: Parser[LogicalPlan] =
- ADD ~ JAR ~> remainingQuery ^^ {
- case rq => AddJar(rq.trim())
+ ADD ~ JAR ~> restInput ^^ {
+ case jar => AddJar(jar.trim())
}
protected lazy val addFile: Parser[LogicalPlan] =
- ADD ~ FILE ~> remainingQuery ^^ {
- case rq => AddFile(rq.trim())
+ ADD ~ FILE ~> restInput ^^ {
+ case file => AddFile(file.trim())
}
protected lazy val dfs: Parser[LogicalPlan] =
- DFS ~> allQuery ^^ {
- case aq => NativeCommand(aq.trim())
+ DFS ~> wholeInput ^^ {
+ case command => NativeCommand(command.trim())
}
protected lazy val source: Parser[LogicalPlan] =
- SOURCE ~> remainingQuery ^^ {
- case rq => SourceCommand(rq.trim())
+ SOURCE ~> restInput ^^ {
+ case file => SourceCommand(file.trim())
}
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
index c0e69393cc..a4354c1379 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand}
+import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand}
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.hive._
import org.apache.spark.sql.SQLConf
@@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath
/** Sets up the system initially or after a RESET command */
- protected def configure() {
+ protected def configure(): Unit = {
setConf("javax.jdo.option.ConnectionURL",
s"jdbc:derby:;databaseName=$metastorePath;create=true")
setConf("hive.metastore.warehouse.dir", warehousePath)
@@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) {
override lazy val analyzed = {
val describedTables = logical match {
case NativeCommand(describedTable(tbl)) => tbl :: Nil
- case CacheCommand(tbl, _) => tbl :: Nil
+ case CacheTableCommand(tbl, _, _) => tbl :: Nil
case _ => Nil
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index 158cfb5bbe..2060e1f1a7 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -17,13 +17,13 @@
package org.apache.spark.sql.hive
-import org.apache.spark.sql.{QueryTest, SchemaRDD}
-import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.hive.test.TestHive._
+import org.apache.spark.sql.{QueryTest, SchemaRDD}
+import org.apache.spark.storage.RDDBlockId
class CachedTableSuite extends QueryTest {
- import TestHive._
-
/**
* Throws a test failed exception when the number of cached tables differs from the expected
* number.
@@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest {
case cached: InMemoryRelation => cached
}
- if (cachedData.size != numCachedTables) {
- fail(
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
- planWithCaching)
- }
+ assert(
+ cachedData.size == numCachedTables,
+ s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+ planWithCaching)
+ }
+
+ def rddIdOf(tableName: String): Int = {
+ val executedPlan = table(tableName).queryExecution.executedPlan
+ executedPlan.collect {
+ case InMemoryColumnarTableScan(_, _, relation) =>
+ relation.cachedColumnBuffers.id
+ case _ =>
+ fail(s"Table $tableName is not cached\n" + executedPlan)
+ }.head
+ }
+
+ def isMaterialized(rddId: Int): Boolean = {
+ sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
test("cache table") {
@@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest {
assert(!TestHive.isCached("src"), "Table 'src' should not be cached")
}
- test("CACHE TABLE AS SELECT") {
- assertCached(sql("SELECT * FROM src"), 0)
- sql("CACHE TABLE test AS SELECT key FROM src")
+ test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
+ sql("CACHE TABLE testCacheTable AS SELECT * FROM src")
+ assertCached(table("testCacheTable"))
- checkAnswer(
- sql("SELECT * FROM test"),
- sql("SELECT key FROM src").collect().toSeq)
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
- assertCached(sql("SELECT * FROM test"))
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
+
+ test("CACHE TABLE tableName AS SELECT ...") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
+ uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
- assertCached(sql("SELECT * FROM test JOIN test"), 2)
+ test("CACHE LAZY TABLE tableName") {
+ sql("CACHE LAZY TABLE src")
+ assertCached(table("src"))
+
+ val rddId = rddIdOf("src")
+ assert(
+ !isMaterialized(rddId),
+ "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+ sql("SELECT COUNT(*) FROM src").collect()
+ assert(
+ isMaterialized(rddId),
+ "Lazily cached in-memory table should have been materialized")
+
+ uncacheTable("src")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}