diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-10-05 17:51:59 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-10-05 17:51:59 -0700 |
commit | 34b97a067d1b370fbed8ecafab2f48501a35d783 (patch) | |
tree | dc035a45d08a2b7b9d4a5cb5e527b880a5125402 /sql/hive | |
parent | 58f5361caaa2f898e38ae4b3794167881e20a818 (diff) | |
download | spark-34b97a067d1b370fbed8ecafab2f48501a35d783.tar.gz spark-34b97a067d1b370fbed8ecafab2f48501a35d783.tar.bz2 spark-34b97a067d1b370fbed8ecafab2f48501a35d783.zip |
[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory table seems consistent with the `RDD.cache()` API, it's relatively confusing for users who mainly work with SQL and not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen just to ensure predictable performance.
This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.
Also, took the chance to make some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are now merged and renamed to `CacheTableCommand` since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #2513 from liancheng/eager-caching and squashes the following commits:
fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
Diffstat (limited to 'sql/hive')
3 files changed, 97 insertions, 53 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala index e7e1cb980c..c5844e92ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical /** - * A parser that recognizes all HiveQL constructs together with several Spark SQL specific + * A parser that recognizes all HiveQL constructs together with several Spark SQL specific * extensions like CACHE TABLE and UNCACHE TABLE. */ -private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { - +private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { + def apply(input: String): LogicalPlan = { // Special-case out set commands since the value fields can be // complex to handle without RegexParsers. Also this approach @@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr protected case class Keyword(str: String) - protected val CACHE = Keyword("CACHE") - protected val SET = Keyword("SET") protected val ADD = Keyword("ADD") - protected val JAR = Keyword("JAR") - protected val TABLE = Keyword("TABLE") protected val AS = Keyword("AS") - protected val UNCACHE = Keyword("UNCACHE") - protected val FILE = Keyword("FILE") + protected val CACHE = Keyword("CACHE") protected val DFS = Keyword("DFS") + protected val FILE = Keyword("FILE") + protected val JAR = Keyword("JAR") + protected val LAZY = Keyword("LAZY") + protected val SET = Keyword("SET") protected val SOURCE = Keyword("SOURCE") + protected val TABLE = Keyword("TABLE") + protected val UNCACHE = Keyword("UNCACHE") protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) @@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr override val lexical = new SqlLexical(reservedWords) - protected lazy val query: Parser[LogicalPlan] = + protected lazy val query: Parser[LogicalPlan] = cache | uncache | addJar | addFile | dfs | source | hiveQl protected lazy val hiveQl: Parser[LogicalPlan] = - remainingQuery ^^ { - case r => HiveQl.createPlan(r.trim()) + restInput ^^ { + case statement => HiveQl.createPlan(statement.trim()) } - /** It returns all remaining query */ - protected lazy val remainingQuery: Parser[String] = new Parser[String] { + // Returns the whole input string + protected lazy val wholeInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success( - in.source.subSequence(in.offset, in.source.length).toString, - in.drop(in.source.length())) + Success(in.source.toString, in.drop(in.source.length())) } - /** It returns all query */ - protected lazy val allQuery: Parser[String] = new Parser[String] { + // Returns the rest of the input string that are not parsed yet + protected lazy val restInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success(in.source.toString, in.drop(in.source.length())) + Success( + in.source.subSequence(in.offset, in.source.length).toString, + in.drop(in.source.length())) } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ { - case tableName ~ None => CacheCommand(tableName, true) - case tableName ~ Some(plan) => - CacheTableAsSelectCommand(tableName, plan) + CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan, isLazy.isDefined) } protected lazy val uncache: Parser[LogicalPlan] = UNCACHE ~ TABLE ~> ident ^^ { - case tableName => CacheCommand(tableName, false) + case tableName => UncacheTableCommand(tableName) } protected lazy val addJar: Parser[LogicalPlan] = - ADD ~ JAR ~> remainingQuery ^^ { - case rq => AddJar(rq.trim()) + ADD ~ JAR ~> restInput ^^ { + case jar => AddJar(jar.trim()) } protected lazy val addFile: Parser[LogicalPlan] = - ADD ~ FILE ~> remainingQuery ^^ { - case rq => AddFile(rq.trim()) + ADD ~ FILE ~> restInput ^^ { + case file => AddFile(file.trim()) } protected lazy val dfs: Parser[LogicalPlan] = - DFS ~> allQuery ^^ { - case aq => NativeCommand(aq.trim()) + DFS ~> wholeInput ^^ { + case command => NativeCommand(command.trim()) } protected lazy val source: Parser[LogicalPlan] = - SOURCE ~> remainingQuery ^^ { - case rq => SourceCommand(rq.trim()) + SOURCE ~> restInput ^^ { + case file => SourceCommand(file.trim()) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index c0e69393cc..a4354c1379 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf @@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath /** Sets up the system initially or after a RESET command */ - protected def configure() { + protected def configure(): Unit = { setConf("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastorePath;create=true") setConf("hive.metastore.warehouse.dir", warehousePath) @@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override lazy val analyzed = { val describedTables = logical match { case NativeCommand(describedTable(tbl)) => tbl :: Nil - case CacheCommand(tbl, _) => tbl :: Nil + case CacheTableCommand(tbl, _, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 158cfb5bbe..2060e1f1a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{QueryTest, SchemaRDD} -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{QueryTest, SchemaRDD} +import org.apache.spark.storage.RDDBlockId class CachedTableSuite extends QueryTest { - import TestHive._ - /** * Throws a test failed exception when the number of cached tables differs from the expected * number. @@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest { case cached: InMemoryRelation => cached } - if (cachedData.size != numCachedTables) { - fail( - s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + - planWithCaching) - } + assert( + cachedData.size == numCachedTables, + s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + + planWithCaching) + } + + def rddIdOf(tableName: String): Int = { + val executedPlan = table(tableName).queryExecution.executedPlan + executedPlan.collect { + case InMemoryColumnarTableScan(_, _, relation) => + relation.cachedColumnBuffers.id + case _ => + fail(s"Table $tableName is not cached\n" + executedPlan) + }.head + } + + def isMaterialized(rddId: Int): Boolean = { + sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } test("cache table") { @@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest { assert(!TestHive.isCached("src"), "Table 'src' should not be cached") } - test("CACHE TABLE AS SELECT") { - assertCached(sql("SELECT * FROM src"), 0) - sql("CACHE TABLE test AS SELECT key FROM src") + test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { + sql("CACHE TABLE testCacheTable AS SELECT * FROM src") + assertCached(table("testCacheTable")) - checkAnswer( - sql("SELECT * FROM test"), - sql("SELECT key FROM src").collect().toSeq) + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - assertCached(sql("SELECT * FROM test")) + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } + + test("CACHE TABLE tableName AS SELECT ...") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } - assertCached(sql("SELECT * FROM test JOIN test"), 2) + test("CACHE LAZY TABLE tableName") { + sql("CACHE LAZY TABLE src") + assertCached(table("src")) + + val rddId = rddIdOf("src") + assert( + !isMaterialized(rddId), + "Lazily cached in-memory table shouldn't be materialized eagerly") + + sql("SELECT COUNT(*) FROM src").collect() + assert( + isMaterialized(rddId), + "Lazily cached in-memory table should have been materialized") + + uncacheTable("src") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } |