diff options
11 files changed, 265 insertions, 158 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala index 26336332c0..854b5b461b 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala @@ -67,11 +67,12 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) + protected val ABS = Keyword("ABS") protected val ALL = Keyword("ALL") protected val AND = Keyword("AND") + protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AS = Keyword("AS") protected val ASC = Keyword("ASC") - protected val APPROXIMATE = Keyword("APPROXIMATE") protected val AVG = Keyword("AVG") protected val BETWEEN = Keyword("BETWEEN") protected val BY = Keyword("BY") @@ -80,9 +81,9 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val COUNT = Keyword("COUNT") protected val DESC = Keyword("DESC") protected val DISTINCT = Keyword("DISTINCT") + protected val EXCEPT = Keyword("EXCEPT") protected val FALSE = Keyword("FALSE") protected val FIRST = Keyword("FIRST") - protected val LAST = Keyword("LAST") protected val FROM = Keyword("FROM") protected val FULL = Keyword("FULL") protected val GROUP = Keyword("GROUP") @@ -91,42 +92,42 @@ class SqlParser extends StandardTokenParsers with PackratParsers { protected val IN = Keyword("IN") protected val INNER = Keyword("INNER") protected val INSERT = Keyword("INSERT") + protected val INTERSECT = Keyword("INTERSECT") protected val INTO = Keyword("INTO") protected val IS = Keyword("IS") protected val JOIN = Keyword("JOIN") + protected val LAST = Keyword("LAST") + protected val LAZY = Keyword("LAZY") protected val LEFT = Keyword("LEFT") + protected val LIKE = Keyword("LIKE") protected val LIMIT = Keyword("LIMIT") + protected val LOWER = Keyword("LOWER") protected val MAX = Keyword("MAX") protected val MIN = Keyword("MIN") protected val NOT = Keyword("NOT") protected val NULL = Keyword("NULL") protected val ON = Keyword("ON") protected val OR = Keyword("OR") - protected val OVERWRITE = Keyword("OVERWRITE") - protected val LIKE = Keyword("LIKE") - protected val RLIKE = Keyword("RLIKE") - protected val UPPER = Keyword("UPPER") - protected val LOWER = Keyword("LOWER") - protected val REGEXP = Keyword("REGEXP") protected val ORDER = Keyword("ORDER") protected val OUTER = Keyword("OUTER") + protected val OVERWRITE = Keyword("OVERWRITE") + protected val REGEXP = Keyword("REGEXP") protected val RIGHT = Keyword("RIGHT") + protected val RLIKE = Keyword("RLIKE") protected val SELECT = Keyword("SELECT") protected val SEMI = Keyword("SEMI") + protected val SQRT = Keyword("SQRT") protected val STRING = Keyword("STRING") + protected val SUBSTR = Keyword("SUBSTR") + protected val SUBSTRING = Keyword("SUBSTRING") protected val SUM = Keyword("SUM") protected val TABLE = Keyword("TABLE") protected val TIMESTAMP = Keyword("TIMESTAMP") protected val TRUE = Keyword("TRUE") protected val UNCACHE = Keyword("UNCACHE") protected val UNION = Keyword("UNION") + protected val UPPER = Keyword("UPPER") protected val WHERE = Keyword("WHERE") - protected val INTERSECT = Keyword("INTERSECT") - protected val EXCEPT = Keyword("EXCEPT") - protected val SUBSTR = Keyword("SUBSTR") - protected val SUBSTRING = Keyword("SUBSTRING") - protected val SQRT = Keyword("SQRT") - protected val ABS = Keyword("ABS") // Use reflection to find the reserved words defined in this class. protected val reservedWords = @@ -183,17 +184,15 @@ class SqlParser extends StandardTokenParsers with PackratParsers { } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~> select) <~ opt(";") ^^ { - case tableName ~ None => - CacheCommand(tableName, true) - case tableName ~ Some(plan) => - CacheTableAsSelectCommand(tableName, plan) + CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> select) <~ opt(";") ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan, isLazy.isDefined) } - + protected lazy val unCache: Parser[LogicalPlan] = UNCACHE ~ TABLE ~> ident <~ opt(";") ^^ { - case tableName => CacheCommand(tableName, false) - } + case tableName => UncacheTableCommand(tableName) + } protected lazy val projections: Parser[Seq[Expression]] = repsep(projection, ",") @@ -283,7 +282,7 @@ class SqlParser extends StandardTokenParsers with PackratParsers { termExpression ~ ">=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => GreaterThanOrEqual(e1, e2) } | termExpression ~ "!=" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | termExpression ~ "<>" ~ termExpression ^^ { case e1 ~ _ ~ e2 => Not(EqualTo(e1, e2)) } | - termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { + termExpression ~ BETWEEN ~ termExpression ~ AND ~ termExpression ^^ { case e ~ _ ~ el ~ _ ~ eu => And(GreaterThanOrEqual(e, el), LessThanOrEqual(e, eu)) } | termExpression ~ RLIKE ~ termExpression ^^ { case e1 ~ _ ~ e2 => RLike(e1, e2) } | diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 616f1e2ecb..2059a91ba0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -87,7 +87,7 @@ class SimpleCatalog(val caseSensitive: Boolean) extends Catalog { tableName: String, alias: Option[String] = None): LogicalPlan = { val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName) - val table = tables.get(tblName).getOrElse(sys.error(s"Table Not Found: $tableName")) + val table = tables.getOrElse(tblName, sys.error(s"Table Not Found: $tableName")) val tableWithQualifiers = Subquery(tblName, table) // If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala index 8366639fa0..9a3848cfc6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/commands.scala @@ -56,9 +56,15 @@ case class ExplainCommand(plan: LogicalPlan, extended: Boolean = false) extends } /** - * Returned for the "CACHE TABLE tableName" and "UNCACHE TABLE tableName" command. + * Returned for the "CACHE TABLE tableName [AS SELECT ...]" command. */ -case class CacheCommand(tableName: String, doCache: Boolean) extends Command +case class CacheTableCommand(tableName: String, plan: Option[LogicalPlan], isLazy: Boolean) + extends Command + +/** + * Returned for the "UNCACHE TABLE tableName" command. + */ +case class UncacheTableCommand(tableName: String) extends Command /** * Returned for the "DESCRIBE [EXTENDED] [dbName.]tableName" command. @@ -75,8 +81,3 @@ case class DescribeCommand( AttributeReference("data_type", StringType, nullable = false)(), AttributeReference("comment", StringType, nullable = false)()) } - -/** - * Returned for the "CACHE TABLE tableName AS SELECT .." command. - */ -case class CacheTableAsSelectCommand(tableName: String, plan: LogicalPlan) extends Command diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala index aebdbb68e4..3bf7382ac6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala @@ -91,14 +91,10 @@ private[sql] trait CacheManager { } /** Removes the data for the given SchemaRDD from the cache */ - private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock { + private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock { val planToCache = query.queryExecution.optimizedPlan val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache)) - - if (dataIndex < 0) { - throw new IllegalArgumentException(s"Table $query is not cached.") - } - + require(dataIndex >= 0, s"Table $query is not cached.") cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking) cachedData.remove(dataIndex) } @@ -135,5 +131,4 @@ private[sql] trait CacheManager { case _ => } } - } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala index cec82a7f2d..4f79173a26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala @@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation( override def newInstance() = { new InMemoryRelation( - output.map(_.newInstance), + output.map(_.newInstance()), useCompression, batchSize, storageLevel, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index cf93d5ad7b..5c16d0c624 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { Seq(execution.SetCommand(key, value, plan.output)(context)) case logical.ExplainCommand(logicalPlan, extended) => Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context)) - case logical.CacheCommand(tableName, cache) => - Seq(execution.CacheCommand(tableName, cache)(context)) - case logical.CacheTableAsSelectCommand(tableName, plan) => - Seq(execution.CacheTableAsSelectCommand(tableName, plan)) + case logical.CacheTableCommand(tableName, optPlan, isLazy) => + Seq(execution.CacheTableCommand(tableName, optPlan, isLazy)) + case logical.UncacheTableCommand(tableName) => + Seq(execution.UncacheTableCommand(tableName)) case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala index f88099ec07..d49633c24a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala @@ -138,49 +138,54 @@ case class ExplainCommand( * :: DeveloperApi :: */ @DeveloperApi -case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext) +case class CacheTableCommand( + tableName: String, + plan: Option[LogicalPlan], + isLazy: Boolean) extends LeafNode with Command { override protected lazy val sideEffectResult = { - if (doCache) { - context.cacheTable(tableName) - } else { - context.uncacheTable(tableName) + import sqlContext._ + + plan.foreach(_.registerTempTable(tableName)) + val schemaRDD = table(tableName) + schemaRDD.cache() + + if (!isLazy) { + // Performs eager caching + schemaRDD.count() } + Seq.empty[Row] } override def output: Seq[Attribute] = Seq.empty } + /** * :: DeveloperApi :: */ @DeveloperApi -case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( - @transient context: SQLContext) - extends LeafNode with Command { - +case class UncacheTableCommand(tableName: String) extends LeafNode with Command { override protected lazy val sideEffectResult: Seq[Row] = { - Row("# Registered as a temporary table", null, null) +: - child.output.map(field => Row(field.name, field.dataType.toString, null)) + sqlContext.table(tableName).unpersist() + Seq.empty[Row] } + + override def output: Seq[Attribute] = Seq.empty } /** * :: DeveloperApi :: */ @DeveloperApi -case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan) +case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])( + @transient context: SQLContext) extends LeafNode with Command { - - override protected[sql] lazy val sideEffectResult = { - import sqlContext._ - logicalPlan.registerTempTable(tableName) - cacheTable(tableName) - Seq.empty[Row] - } - override def output: Seq[Attribute] = Seq.empty - + override protected lazy val sideEffectResult: Seq[Row] = { + Row("# Registered as a temporary table", null, null) +: + child.output.map(field => Row(field.name, field.dataType.toString, null)) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala index 957388e99b..1e624f9700 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala @@ -18,30 +18,39 @@ package org.apache.spark.sql import org.apache.spark.sql.TestData._ -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} -import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} +import org.apache.spark.sql.test.TestSQLContext._ +import org.apache.spark.storage.RDDBlockId case class BigData(s: String) class CachedTableSuite extends QueryTest { - import TestSQLContext._ TestData // Load test tables. - /** - * Throws a test failed exception when the number of cached tables differs from the expected - * number. - */ def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = { val planWithCaching = query.queryExecution.withCachedData val cachedData = planWithCaching collect { case cached: InMemoryRelation => cached } - if (cachedData.size != numCachedTables) { - fail( - s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + + assert( + cachedData.size == numCachedTables, + s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + planWithCaching) - } + } + + def rddIdOf(tableName: String): Int = { + val executedPlan = table(tableName).queryExecution.executedPlan + executedPlan.collect { + case InMemoryColumnarTableScan(_, _, relation) => + relation.cachedColumnBuffers.id + case _ => + fail(s"Table $tableName is not cached\n" + executedPlan) + }.head + } + + def isMaterialized(rddId: Int): Boolean = { + sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } test("too big for memory") { @@ -52,10 +61,33 @@ class CachedTableSuite extends QueryTest { uncacheTable("bigData") } - test("calling .cache() should use inmemory columnar caching") { + test("calling .cache() should use in-memory columnar caching") { table("testData").cache() + assertCached(table("testData")) + } + + test("calling .unpersist() should drop in-memory columnar cache") { + table("testData").cache() + table("testData").count() + table("testData").unpersist(true) + assertCached(table("testData"), 0) + } + + test("isCached") { + cacheTable("testData") assertCached(table("testData")) + assert(table("testData").queryExecution.withCachedData match { + case _: InMemoryRelation => true + case _ => false + }) + + uncacheTable("testData") + assert(!isCached("testData")) + assert(table("testData").queryExecution.withCachedData match { + case _: InMemoryRelation => false + case _ => true + }) } test("SPARK-1669: cacheTable should be idempotent") { @@ -64,32 +96,27 @@ class CachedTableSuite extends QueryTest { cacheTable("testData") assertCached(table("testData")) - cacheTable("testData") - table("testData").queryExecution.analyzed match { - case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => - fail("cacheTable is not idempotent") + assertResult(1, "InMemoryRelation not found, testData should have been cached") { + table("testData").queryExecution.withCachedData.collect { + case r: InMemoryRelation => r + }.size + } - case _ => + cacheTable("testData") + assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") { + table("testData").queryExecution.withCachedData.collect { + case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r + }.size } } test("read from cached table and uncache") { cacheTable("testData") - - checkAnswer( - table("testData"), - testData.collect().toSeq - ) - + checkAnswer(table("testData"), testData.collect().toSeq) assertCached(table("testData")) uncacheTable("testData") - - checkAnswer( - table("testData"), - testData.collect().toSeq - ) - + checkAnswer(table("testData"), testData.collect().toSeq) assertCached(table("testData"), 0) } @@ -99,10 +126,12 @@ class CachedTableSuite extends QueryTest { } } - test("SELECT Star Cached Table") { + test("SELECT star from cached table") { sql("SELECT * FROM testData").registerTempTable("selectStar") cacheTable("selectStar") - sql("SELECT * FROM selectStar WHERE key = 1").collect() + checkAnswer( + sql("SELECT * FROM selectStar WHERE key = 1"), + Seq(Row(1, "1"))) uncacheTable("selectStar") } @@ -120,23 +149,57 @@ class CachedTableSuite extends QueryTest { sql("CACHE TABLE testData") assertCached(table("testData")) - assert(isCached("testData"), "Table 'testData' should be cached") + val rddId = rddIdOf("testData") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") sql("UNCACHE TABLE testData") - assertCached(table("testData"), 0) assert(!isCached("testData"), "Table 'testData' should not be cached") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } - - test("CACHE TABLE tableName AS SELECT Star Table") { + + test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") - sql("SELECT * FROM testCacheTable WHERE key = 1").collect() - assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } - - test("'CACHE TABLE tableName AS SELECT ..'") { - sql("CACHE TABLE testCacheTable AS SELECT * FROM testData") - assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached") + + test("CACHE TABLE tableName AS SELECT ...") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } + + test("CACHE LAZY TABLE tableName") { + sql("CACHE LAZY TABLE testData") + assertCached(table("testData")) + + val rddId = rddIdOf("testData") + assert( + !isMaterialized(rddId), + "Lazily cached in-memory table shouldn't be materialized eagerly") + + sql("SELECT COUNT(*) FROM testData").collect() + assert( + isMaterialized(rddId), + "Lazily cached in-memory table should have been materialized") + + uncacheTable("testData") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala index e7e1cb980c..c5844e92ea 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/ExtendedHiveQlParser.scala @@ -24,11 +24,11 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.SqlLexical /** - * A parser that recognizes all HiveQL constructs together with several Spark SQL specific + * A parser that recognizes all HiveQL constructs together with several Spark SQL specific * extensions like CACHE TABLE and UNCACHE TABLE. */ -private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { - +private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with PackratParsers { + def apply(input: String): LogicalPlan = { // Special-case out set commands since the value fields can be // complex to handle without RegexParsers. Also this approach @@ -54,16 +54,17 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr protected case class Keyword(str: String) - protected val CACHE = Keyword("CACHE") - protected val SET = Keyword("SET") protected val ADD = Keyword("ADD") - protected val JAR = Keyword("JAR") - protected val TABLE = Keyword("TABLE") protected val AS = Keyword("AS") - protected val UNCACHE = Keyword("UNCACHE") - protected val FILE = Keyword("FILE") + protected val CACHE = Keyword("CACHE") protected val DFS = Keyword("DFS") + protected val FILE = Keyword("FILE") + protected val JAR = Keyword("JAR") + protected val LAZY = Keyword("LAZY") + protected val SET = Keyword("SET") protected val SOURCE = Keyword("SOURCE") + protected val TABLE = Keyword("TABLE") + protected val UNCACHE = Keyword("UNCACHE") protected implicit def asParser(k: Keyword): Parser[String] = lexical.allCaseVersions(k.str).map(x => x : Parser[String]).reduce(_ | _) @@ -79,57 +80,56 @@ private[hive] class ExtendedHiveQlParser extends StandardTokenParsers with Packr override val lexical = new SqlLexical(reservedWords) - protected lazy val query: Parser[LogicalPlan] = + protected lazy val query: Parser[LogicalPlan] = cache | uncache | addJar | addFile | dfs | source | hiveQl protected lazy val hiveQl: Parser[LogicalPlan] = - remainingQuery ^^ { - case r => HiveQl.createPlan(r.trim()) + restInput ^^ { + case statement => HiveQl.createPlan(statement.trim()) } - /** It returns all remaining query */ - protected lazy val remainingQuery: Parser[String] = new Parser[String] { + // Returns the whole input string + protected lazy val wholeInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success( - in.source.subSequence(in.offset, in.source.length).toString, - in.drop(in.source.length())) + Success(in.source.toString, in.drop(in.source.length())) } - /** It returns all query */ - protected lazy val allQuery: Parser[String] = new Parser[String] { + // Returns the rest of the input string that are not parsed yet + protected lazy val restInput: Parser[String] = new Parser[String] { def apply(in: Input) = - Success(in.source.toString, in.drop(in.source.length())) + Success( + in.source.subSequence(in.offset, in.source.length).toString, + in.drop(in.source.length())) } protected lazy val cache: Parser[LogicalPlan] = - CACHE ~ TABLE ~> ident ~ opt(AS ~> hiveQl) ^^ { - case tableName ~ None => CacheCommand(tableName, true) - case tableName ~ Some(plan) => - CacheTableAsSelectCommand(tableName, plan) + CACHE ~> opt(LAZY) ~ (TABLE ~> ident) ~ opt(AS ~> hiveQl) ^^ { + case isLazy ~ tableName ~ plan => + CacheTableCommand(tableName, plan, isLazy.isDefined) } protected lazy val uncache: Parser[LogicalPlan] = UNCACHE ~ TABLE ~> ident ^^ { - case tableName => CacheCommand(tableName, false) + case tableName => UncacheTableCommand(tableName) } protected lazy val addJar: Parser[LogicalPlan] = - ADD ~ JAR ~> remainingQuery ^^ { - case rq => AddJar(rq.trim()) + ADD ~ JAR ~> restInput ^^ { + case jar => AddJar(jar.trim()) } protected lazy val addFile: Parser[LogicalPlan] = - ADD ~ FILE ~> remainingQuery ^^ { - case rq => AddFile(rq.trim()) + ADD ~ FILE ~> restInput ^^ { + case file => AddFile(file.trim()) } protected lazy val dfs: Parser[LogicalPlan] = - DFS ~> allQuery ^^ { - case aq => NativeCommand(aq.trim()) + DFS ~> wholeInput ^^ { + case command => NativeCommand(command.trim()) } protected lazy val source: Parser[LogicalPlan] = - SOURCE ~> remainingQuery ^^ { - case rq => SourceCommand(rq.trim()) + SOURCE ~> restInput ^^ { + case file => SourceCommand(file.trim()) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala index c0e69393cc..a4354c1379 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TestHive.scala @@ -32,7 +32,7 @@ import org.apache.hadoop.hive.serde2.avro.AvroSerDe import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.catalyst.analysis._ -import org.apache.spark.sql.catalyst.plans.logical.{CacheCommand, LogicalPlan, NativeCommand} +import org.apache.spark.sql.catalyst.plans.logical.{CacheTableCommand, LogicalPlan, NativeCommand} import org.apache.spark.sql.catalyst.util._ import org.apache.spark.sql.hive._ import org.apache.spark.sql.SQLConf @@ -67,7 +67,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { lazy val metastorePath = getTempFilePath("sparkHiveMetastore").getCanonicalPath /** Sets up the system initially or after a RESET command */ - protected def configure() { + protected def configure(): Unit = { setConf("javax.jdo.option.ConnectionURL", s"jdbc:derby:;databaseName=$metastorePath;create=true") setConf("hive.metastore.warehouse.dir", warehousePath) @@ -154,7 +154,7 @@ class TestHiveContext(sc: SparkContext) extends HiveContext(sc) { override lazy val analyzed = { val describedTables = logical match { case NativeCommand(describedTable(tbl)) => tbl :: Nil - case CacheCommand(tbl, _) => tbl :: Nil + case CacheTableCommand(tbl, _, _) => tbl :: Nil case _ => Nil } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 158cfb5bbe..2060e1f1a7 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -17,13 +17,13 @@ package org.apache.spark.sql.hive -import org.apache.spark.sql.{QueryTest, SchemaRDD} -import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan} +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.hive.test.TestHive +import org.apache.spark.sql.hive.test.TestHive._ +import org.apache.spark.sql.{QueryTest, SchemaRDD} +import org.apache.spark.storage.RDDBlockId class CachedTableSuite extends QueryTest { - import TestHive._ - /** * Throws a test failed exception when the number of cached tables differs from the expected * number. @@ -34,11 +34,24 @@ class CachedTableSuite extends QueryTest { case cached: InMemoryRelation => cached } - if (cachedData.size != numCachedTables) { - fail( - s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + - planWithCaching) - } + assert( + cachedData.size == numCachedTables, + s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" + + planWithCaching) + } + + def rddIdOf(tableName: String): Int = { + val executedPlan = table(tableName).queryExecution.executedPlan + executedPlan.collect { + case InMemoryColumnarTableScan(_, _, relation) => + relation.cachedColumnBuffers.id + case _ => + fail(s"Table $tableName is not cached\n" + executedPlan) + }.head + } + + def isMaterialized(rddId: Int): Boolean = { + sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty } test("cache table") { @@ -102,16 +115,47 @@ class CachedTableSuite extends QueryTest { assert(!TestHive.isCached("src"), "Table 'src' should not be cached") } - test("CACHE TABLE AS SELECT") { - assertCached(sql("SELECT * FROM src"), 0) - sql("CACHE TABLE test AS SELECT key FROM src") + test("CACHE TABLE tableName AS SELECT * FROM anotherTable") { + sql("CACHE TABLE testCacheTable AS SELECT * FROM src") + assertCached(table("testCacheTable")) - checkAnswer( - sql("SELECT * FROM test"), - sql("SELECT key FROM src").collect().toSeq) + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") - assertCached(sql("SELECT * FROM test")) + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } + + test("CACHE TABLE tableName AS SELECT ...") { + sql("CACHE TABLE testCacheTable AS SELECT key FROM src LIMIT 10") + assertCached(table("testCacheTable")) + + val rddId = rddIdOf("testCacheTable") + assert( + isMaterialized(rddId), + "Eagerly cached in-memory table should have already been materialized") + + uncacheTable("testCacheTable") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") + } - assertCached(sql("SELECT * FROM test JOIN test"), 2) + test("CACHE LAZY TABLE tableName") { + sql("CACHE LAZY TABLE src") + assertCached(table("src")) + + val rddId = rddIdOf("src") + assert( + !isMaterialized(rddId), + "Lazily cached in-memory table shouldn't be materialized eagerly") + + sql("SELECT COUNT(*) FROM src").collect() + assert( + isMaterialized(rddId), + "Lazily cached in-memory table should have been materialized") + + uncacheTable("src") + assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted") } } |