path: root/sql/core/src/main
author     Cheng Lian <lian.cs.zju@gmail.com>          2014-10-05 17:51:59 -0700
committer  Michael Armbrust <michael@databricks.com>   2014-10-05 17:51:59 -0700
commit     34b97a067d1b370fbed8ecafab2f48501a35d783 (patch)
tree       dc035a45d08a2b7b9d4a5cb5e527b880a5125402 /sql/core/src/main
parent     58f5361caaa2f898e38ae4b3794167881e20a818 (diff)
[SPARK-3645][SQL] Makes table caching eager by default and adds syntax for lazy caching
Although lazy caching for in-memory tables seems consistent with the `RDD.cache()` API, it is relatively confusing for users who mainly work with SQL and are not familiar with Spark internals. The `CACHE TABLE t; SELECT COUNT(*) FROM t;` pattern is also commonly seen, merely to ensure predictable performance.

This PR makes both the `CACHE TABLE t [AS SELECT ...]` statement and the `SQLContext.cacheTable()` API eager by default, and adds a new `CACHE LAZY TABLE t [AS SELECT ...]` syntax to provide lazy in-memory table caching.

It also takes the chance to do some refactoring: `CacheCommand` and `CacheTableAsSelectCommand` are merged and renamed to `CacheTableCommand`, since the former is strictly a special case of the latter. A new `UncacheTableCommand` is added for the `UNCACHE TABLE t` statement.

Author: Cheng Lian <lian.cs.zju@gmail.com>

Closes #2513 from liancheng/eager-caching and squashes the following commits:

fe92287 [Cheng Lian] Makes table caching eager by default and adds syntax for lazy caching
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala                        9
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala  2
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala           8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala                  47
4 files changed, 33 insertions(+), 33 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
index aebdbb68e4..3bf7382ac6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/CacheManager.scala
@@ -91,14 +91,10 @@ private[sql] trait CacheManager {
}
/** Removes the data for the given SchemaRDD from the cache */
- private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = false): Unit = writeLock {
+ private[sql] def uncacheQuery(query: SchemaRDD, blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.optimizedPlan
val dataIndex = cachedData.indexWhere(_.plan.sameResult(planToCache))
-
- if (dataIndex < 0) {
- throw new IllegalArgumentException(s"Table $query is not cached.")
- }
-
+ require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.cachedColumnBuffers.unpersist(blocking)
cachedData.remove(dataIndex)
}
@@ -135,5 +131,4 @@ private[sql] trait CacheManager {
case _ =>
}
}
-
}
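Two behavioral changes are worth noting in the hunk above: `uncacheQuery` now fails fast via `require` instead of an explicit throw, and its `blocking` parameter defaults to `true`, so unpersisting waits until the cached column buffers are actually dropped. A minimal sketch of what the new default means for (internal) callers, with a hypothetical cached SchemaRDD `cached`:

    // Hypothetical SchemaRDD that was previously cached.
    sqlContext.uncacheQuery(cached)                    // now blocks until the buffers are gone
    sqlContext.uncacheQuery(cached, blocking = false)  // opts back into asynchronous unpersist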
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index cec82a7f2d..4f79173a26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -111,7 +111,7 @@ private[sql] case class InMemoryRelation(
override def newInstance() = {
new InMemoryRelation(
- output.map(_.newInstance),
+ output.map(_.newInstance()),
useCompression,
batchSize,
storageLevel,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index cf93d5ad7b..5c16d0c624 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -304,10 +304,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
Seq(execution.SetCommand(key, value, plan.output)(context))
case logical.ExplainCommand(logicalPlan, extended) =>
Seq(execution.ExplainCommand(logicalPlan, plan.output, extended)(context))
- case logical.CacheCommand(tableName, cache) =>
- Seq(execution.CacheCommand(tableName, cache)(context))
- case logical.CacheTableAsSelectCommand(tableName, plan) =>
- Seq(execution.CacheTableAsSelectCommand(tableName, plan))
+ case logical.CacheTableCommand(tableName, optPlan, isLazy) =>
+ Seq(execution.CacheTableCommand(tableName, optPlan, isLazy))
+ case logical.UncacheTableCommand(tableName) =>
+ Seq(execution.UncacheTableCommand(tableName))
case _ => Nil
}
}
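For context, a sketch of how the new plan nodes are expected to flow through this strategy (assuming the SQL parser emits the corresponding logical command nodes):

    // Hypothetical parse results; the strategy above wraps each logical
    // command in its execution.* counterpart.
    //   "CACHE TABLE t"                    => logical.CacheTableCommand("t", None, isLazy = false)
    //   "CACHE LAZY TABLE t AS SELECT ..." => logical.CacheTableCommand("t", Some(plan), isLazy = true)
    //   "UNCACHE TABLE t"                  => logical.UncacheTableCommand("t")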
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
index f88099ec07..d49633c24a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/commands.scala
@@ -138,49 +138,54 @@ case class ExplainCommand(
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheCommand(tableName: String, doCache: Boolean)(@transient context: SQLContext)
+case class CacheTableCommand(
+ tableName: String,
+ plan: Option[LogicalPlan],
+ isLazy: Boolean)
extends LeafNode with Command {
override protected lazy val sideEffectResult = {
- if (doCache) {
- context.cacheTable(tableName)
- } else {
- context.uncacheTable(tableName)
+ import sqlContext._
+
+ plan.foreach(_.registerTempTable(tableName))
+ val schemaRDD = table(tableName)
+ schemaRDD.cache()
+
+ if (!isLazy) {
+ // Performs eager caching
+ schemaRDD.count()
}
+
Seq.empty[Row]
}
override def output: Seq[Attribute] = Seq.empty
}
+
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
- @transient context: SQLContext)
- extends LeafNode with Command {
-
+case class UncacheTableCommand(tableName: String) extends LeafNode with Command {
override protected lazy val sideEffectResult: Seq[Row] = {
- Row("# Registered as a temporary table", null, null) +:
- child.output.map(field => Row(field.name, field.dataType.toString, null))
+ sqlContext.table(tableName).unpersist()
+ Seq.empty[Row]
}
+
+ override def output: Seq[Attribute] = Seq.empty
}
/**
* :: DeveloperApi ::
*/
@DeveloperApi
-case class CacheTableAsSelectCommand(tableName: String, logicalPlan: LogicalPlan)
+case class DescribeCommand(child: SparkPlan, output: Seq[Attribute])(
+ @transient context: SQLContext)
extends LeafNode with Command {
-
- override protected[sql] lazy val sideEffectResult = {
- import sqlContext._
- logicalPlan.registerTempTable(tableName)
- cacheTable(tableName)
- Seq.empty[Row]
- }
- override def output: Seq[Attribute] = Seq.empty
-
+ override protected lazy val sideEffectResult: Seq[Row] = {
+ Row("# Registered as a temporary table", null, null) +:
+ child.output.map(field => Row(field.name, field.dataType.toString, null))
+ }
}
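Note how `CacheTableCommand` implements eager caching above: it marks the table's SchemaRDD as cached, then runs `count()` to force a full scan so the in-memory columnar buffers are populated before the statement returns. The same trick works in user code; a minimal sketch with a hypothetical table name:

    // Force materialization of a cached table by running a cheap action.
    val schemaRDD = sqlContext.table("logs")  // hypothetical table name
    schemaRDD.cache()                         // marks for in-memory caching (lazy by itself)
    schemaRDD.count()                         // scans once, populating the cache eagerly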