aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheng Lian <lian.cs.zju@gmail.com>2014-06-23 13:24:33 -0700
committerMichael Armbrust <michael@databricks.com>2014-06-23 13:24:33 -0700
commita4bc442ca2c35444de8a33376b6f27c6c2a9003d (patch)
tree8c4b63eb33d10d5165ba13d66c0cd81f7a60e307
parent853a2b951d4c7f6c6c37f53b465b3c7b77691b7c (diff)
downloadspark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.tar.gz
spark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.tar.bz2
spark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.zip
[SPARK-1669][SQL] Made cacheTable idempotent
JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669) Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table. Before: ``` scala> loadTestTable("src") ... scala> cacheTable("src") ... scala> cacheTable("src") ... scala> table("src") ... == Query Plan == InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None)))) ``` After: ``` scala> loadTestTable("src") ... scala> cacheTable("src") ... scala> cacheTable("src") ... scala> table("src") ... == Query Plan == InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None)) ``` Author: Cheng Lian <lian.cs.zju@gmail.com> Closes #1183 from liancheng/spark-1669 and squashes the following commits: 68f8a20 [Cheng Lian] Removed an unused import 51bae90 [Cheng Lian] Made cacheTable idempotent
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala13
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala20
2 files changed, 29 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index c60af28b2a..0bcfbf6f84 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext)
/** Caches the specified table in-memory. */
def cacheTable(tableName: String): Unit = {
val currentTable = catalog.lookupRelation(None, tableName)
- val useCompression =
- sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
- val asInMemoryRelation =
- InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+ val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match {
+ case _: InMemoryRelation =>
+ currentTable.logicalPlan
+
+ case _ =>
+ val useCompression =
+ sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false)
+ InMemoryRelation(useCompression, executePlan(currentTable).executedPlan)
+ }
catalog.registerTable(None, tableName, asInMemoryRelation)
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index e9360b0fc7..cca58c0063 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -17,7 +17,9 @@
package org.apache.spark.sql
+import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators
import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
import org.apache.spark.sql.test._
/* Implicits */
@@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest {
clear()
}
+ test("SPARK-1669: cacheTable should be idempotent") {
+ assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation])
+
+ cacheTable("testData")
+ EliminateAnalysisOperators(table("testData").logicalPlan) match {
+ case _: InMemoryRelation =>
+ case _ =>
+ fail("testData should be cached")
+ }
+
+ cacheTable("testData")
+ EliminateAnalysisOperators(table("testData").logicalPlan) match {
+ case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) =>
+ fail("cacheTable is not idempotent")
+
+ case _ =>
+ }
+ }
}