path: root/sql/core
author     Cheng Lian <lian.cs.zju@gmail.com>         2014-10-05 17:51:59 -0700
committer  Michael Armbrust <michael@databricks.com>  2014-10-05 17:51:59 -0700
commit     34b97a067d1b370fbed8ecafab2f48501a35d783 (patch)
tree       dc035a45d08a2b7b9d4a5cb5e527b880a5125402 /sql/core
parent     58f5361caaa2f898e38ae4b3794167881e20a818 (diff)
[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory tables seems consistent with the `RDD.cache()` API, it's relatively confusing for users who mainly work with SQL and are not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen, just to ensure predictable performance.

This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.

Also took the chance to do some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are now merged and renamed to `CacheTableCommand`, since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
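For readers skimming the patch, a minimal sketch of the user-facing behavior described above (assuming an in-scope `SQLContext` named `sqlContext` and registered tables `t` and `t2`; the table names here are illustrative, not taken from this patch):

    import sqlContext._

    sql("CACHE TABLE t")                      // eager by default: t is materialized immediately
    sql("SELECT * FROM t").collect()          // served from the in-memory columnar cache
    sql("UNCACHE TABLE t")                    // unpersists t's cached data

    sql("CACHE LAZY TABLE t")                 // lazy: behaves like RDD.cache()
    sql("SELECT COUNT(*) FROM t").collect()   // first access materializes the cache

    cacheTable("t")                           // the programmatic API is now eager as well
    sql("CACHE TABLE t2 AS SELECT * FROM t")  // caches a query result under a new name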
Diffstat (limited to 'sql/core')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                          9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala    2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala             8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                   47
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                    145
5 files changed, 137 insertions(+), 74 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index aebdbb68e4..3bf7382ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,14 +91,10 @@ private[sql] trait CacheManager {
}
/** Removes the data for the given SchemaRDD from the cache */
- private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock {
+ private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.optimizedPlan
val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
-
- if (dataIndex < 0) {
- throw new IllegalArgumentException(s"Table $query is not cached.")
- }
-
+ require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
}
@@ -135,5 +131,4 @@ private[sql] trait CacheManager {
case _ =>
}
}
-
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index cec82a7f2d..4f79173a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation(
override def newInstance() = {
new InMemoryRelation(
- output.map(_.newInstance),
+ output.map(_.newInstance()),
useCompression,
batchSize,
storageLevel,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cf93d5ad7b..5c16d0c624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
- case logical.CacheCommand(tableName, cache) =>
- Seq(execution.CacheCommand(tableName, cache)(context))
- case logical.CacheTableAsSelectCommand(tableName, plan) =>
- Seq(execution.CacheTableAsSelectCommand(tableName, plan))
+ case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
+ Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+ case logical.UncacheTableCommand(tableName) =>
+ Seq(execution.UncacheTableCommand(tableName))
case _ => Nil
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f88099ec07..d49633c24a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -138,49 +138,54 @@ case class ExplainCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+case class CacheTableCommand(
+ tableName: String,
+ plan: Option[LogicalPlan],
+ isLazy: Boolean)
extends LeafNode with Command {
override protected lazy val sideEffectResult = {
- if (doCache) {
- context.cacheTable(tableName)
- } else {
- context.uncacheTable(tableName)
+ import sqlContext._
+
+ plan.foreach(_.registerTempTable(tableName))
+ val schemaRDD = table(tableName)
+ schemaRDD.cache()
+
+ if (!isLazy) {
+ // Performs eager caching
+ schemaRDD.count()
}
+
Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
}
+
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command {
-
+case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
override protected lazy val sideEffectResult: Seq[Row] = {
- Row("# Registered as a temporary table", null, null) +:
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ sqlContext.table(tableName).unpersist()
+ Seq.empty[Row]
}
+
+ override def output: Seq[Attribute] = Seq.empty
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+ @transient context: SQLContext)
extends LeafNode with Command {
-
- override protected[sql] lazy val sideEffectResult = {
- import sqlContext._
- logicalPlan.registerTempTable(tableName)
- cacheTable(tableName)
- Seq.empty[Row]
- }
- override def output: Seq[Attribute] = Seq.empty
-
+ override protected lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 957388e99b..1e624f9700 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -18,30 +18,39 @@
package org.apache.spark.sql
import org.apache.spark.sql.TestData._
-import org.apache.spark.sql.columnar.{InMemoryRelation, InMemoryColumnarTableScan}
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
+import org.apache.spark.sql.test.TestSQLContext._
+import org.apache.spark.storage.RDDBlockId
case class BigData(s: String)
class CachedTableSuite extends QueryTest {
- import TestSQLContext._
TestData // Load test tables.
- /**
- * Throws a test failed exception when the number of cached tables differs from the expected
- * number.
- */
def assertCached(query: SchemaRDD, numCachedTables: Int = 1): Unit = {
val planWithCaching = query.queryExecution.withCachedData
val cachedData = planWithCaching collect {
case cached: InMemoryRelation => cached
}
- if (cachedData.size != numCachedTables) {
- fail(
- s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+ assert(
+ cachedData.size == numCachedTables,
+ s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
planWithCaching)
- }
+ }
+
+ def rddIdOf(tableName: String): Int = {
+ val executedPlan = table(tableName).queryExecution.executedPlan
+ executedPlan.collect {
+ case InMemoryColumnarTableScan(_, _, relation) =>
+ relation.cachedColumnBuffers.id
+ case _ =>
+ fail(s"Table $tableName is not cached\n" + executedPlan)
+ }.head
+ }
+
+ def isMaterialized(rddId: Int): Boolean = {
+ sparkContext.env.blockManager.get(RDDBlockId(rddId, 0)).nonEmpty
}
test("too big for memory") {
@@ -52,10 +61,33 @@ class CachedTableSuite extends QueryTest {
uncacheTable("bigData")
}
- test("calling .cache() should use inmemory columnar caching") {
+ test("calling .cache() should use in-memory columnar caching") {
table("testData").cache()
+ assertCached(table("testData"))
+ }
+
+ test("calling .unpersist() should drop in-memory columnar cache") {
+ table("testData").cache()
+ table("testData").count()
+ table("testData").unpersist(true)
+ assertCached(table("testData"), 0)
+ }
+
+ test("isCached") {
+ cacheTable("testData")
assertCached(table("testData"))
+ assert(table("testData").queryExecution.withCachedData match {
+ case _: InMemoryRelation => true
+ case _ => false
+ })
+
+ uncacheTable("testData")
+ assert(!isCached("testData"))
+ assert(table("testData").queryExecution.withCachedData match {
+ case _: InMemoryRelation => false
+ case _ => true
+ })
}
test("SPARK-1669: cacheTable should be idempotent") {
@@ -64,32 +96,27 @@ class CachedTableSuite extends QueryTest {
cacheTable("testData")
assertCached(table("testData"))
- cacheTable("testData")
- table("testData").queryExecution.analyzed match {
- case InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) =>
- fail("cacheTable is not idempotent")
+ assertResult(1, "InMemoryRelation not found, testData should have been cached") {
+ table("testData").queryExecution.withCachedData.collect {
+ case r: InMemoryRelation => r
+ }.size
+ }
- case _ =>
+ cacheTable("testData")
+ assertResult(0, "Double InMemoryRelations found, cacheTable() is not idempotent") {
+ table("testData").queryExecution.withCachedData.collect {
+ case r @ InMemoryRelation(_, _, _, _, _: InMemoryColumnarTableScan) => r
+ }.size
}
}
test("read from cached table and uncache") {
cacheTable("testData")
-
- checkAnswer(
- table("testData"),
- testData.collect().toSeq
- )
-
+ checkAnswer(table("testData"), testData.collect().toSeq)
assertCached(table("testData"))
uncacheTable("testData")
-
- checkAnswer(
- table("testData"),
- testData.collect().toSeq
- )
-
+ checkAnswer(table("testData"), testData.collect().toSeq)
assertCached(table("testData"), 0)
}
@@ -99,10 +126,12 @@ class CachedTableSuite extends QueryTest {
}
}
- test("SELECT Star Cached Table") {
+ test("SELECT star from cached table") {
sql("SELECT * FROM testData").registerTempTable("selectStar")
cacheTable("selectStar")
- sql("SELECT * FROM selectStar WHERE key = 1").collect()
+ checkAnswer(
+ sql("SELECT * FROM selectStar WHERE key = 1"),
+ Seq(Row(1, "1")))
uncacheTable("selectStar")
}
@@ -120,23 +149,57 @@ class CachedTableSuite extends QueryTest {
sql("CACHE TABLE testData")
assertCached(table("testData"))
- assert(isCached("testData"), "Table 'testData' should be cached")
+ val rddId = rddIdOf("testData")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
sql("UNCACHE TABLE testData")
- assertCached(table("testData"), 0)
assert(!isCached("testData"), "Table 'testData' should not be cached")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
-
- test("CACHE TABLE tableName AS SELECT Star Table") {
+
+ test("CACHE TABLE tableName AS SELECT * FROM anotherTable") {
sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- sql("SELECT * FROM testCacheTable WHERE key = 1").collect()
- assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
-
- test("'CACHE TABLE tableName AS SELECT ..'") {
- sql("CACHE TABLE testCacheTable AS SELECT * FROM testData")
- assert(isCached("testCacheTable"), "Table 'testCacheTable' should be cached")
+
+ test("CACHE TABLE tableName AS SELECT ...") {
+ sql("CACHE TABLE testCacheTable AS SELECT key FROM testData LIMIT 10")
+ assertCached(table("testCacheTable"))
+
+ val rddId = rddIdOf("testCacheTable")
+ assert(
+ isMaterialized(rddId),
+ "Eagerly cached in-memory table should have already been materialized")
+
uncacheTable("testCacheTable")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
+ }
+
+ test("CACHE LAZY TABLE tableName") {
+ sql("CACHE LAZY TABLE testData")
+ assertCached(table("testData"))
+
+ val rddId = rddIdOf("testData")
+ assert(
+ !isMaterialized(rddId),
+ "Lazily cached in-memory table shouldn't be materialized eagerly")
+
+ sql("SELECT COUNT(*) FROM testData").collect()
+ assert(
+ isMaterialized(rddId),
+ "Lazily cached in-memory table should have been materialized")
+
+ uncacheTable("testData")
+ assert(!isMaterialized(rddId), "Uncached in-memory table should have been unpersisted")
}
}