diff options
author | Cheng Lian <lian.cs.zju@gmail.com> | 2014-06-23 13:24:33 -0700 |
---|---|---|
committer | Michael Armbrust <michael@databricks.com> | 2014-06-23 13:24:33 -0700 |
commit | a4bc442ca2c35444de8a33376b6f27c6c2a9003d (patch) | |
tree | 8c4b63eb33d10d5165ba13d66c0cd81f7a60e307 /sql | |
parent | 853a2b951d4c7f6c6c37f53b465b3c7b77691b7c (diff) | |
download | spark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.tar.gz spark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.tar.bz2 spark-a4bc442ca2c35444de8a33376b6f27c6c2a9003d.zip |
[SPARK-1669][SQL] Made cacheTable idempotent
JIRA issue: [SPARK-1669](https://issues.apache.org/jira/browse/SPARK-1669)
Caching the same table multiple times should end up with only 1 in-memory columnar representation of this table.
Before:
```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))))
```
After:
```
scala> loadTestTable("src")
...
scala> cacheTable("src")
...
scala> cacheTable("src")
...
scala> table("src")
...
== Query Plan ==
InMemoryColumnarTableScan [key#2,value#3], (InMemoryRelation [key#2,value#3], false, (HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None))
```
Author: Cheng Lian <lian.cs.zju@gmail.com>
Closes #1183 from liancheng/spark-1669 and squashes the following commits:
68f8a20 [Cheng Lian] Removed an unused import
51bae90 [Cheng Lian] Made cacheTable idempotent
Diffstat (limited to 'sql')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 13 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala | 20 |
2 files changed, 29 insertions, 4 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index c60af28b2a..0bcfbf6f84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -187,10 +187,15 @@ class SQLContext(@transient val sparkContext: SparkContext) /** Caches the specified table in-memory. */ def cacheTable(tableName: String): Unit = { val currentTable = catalog.lookupRelation(None, tableName) - val useCompression = - sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false) - val asInMemoryRelation = - InMemoryRelation(useCompression, executePlan(currentTable).executedPlan) + val asInMemoryRelation = EliminateAnalysisOperators(currentTable.logicalPlan) match { + case _: InMemoryRelation => + currentTable.logicalPlan + + case _ => + val useCompression = + sparkContext.conf.getBoolean("spark.sql.inMemoryColumnarStorage.compressed", false) + InMemoryRelation(useCompression, executePlan(currentTable).executedPlan) + } catalog.registerTable(None, tableName, asInMemoryRelation) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index e9360b0fc7..cca58c0063 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql +import org.apache.spark.sql.catalyst.analysis.EliminateAnalysisOperators import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.test._ /* Implicits */ @@ -405,4 +407,22 @@ class SQLQuerySuite extends QueryTest { clear() } + test("SPARK-1669: cacheTable should be idempotent") { + assume(!table("testData").logicalPlan.isInstanceOf[InMemoryRelation]) + + cacheTable("testData") + EliminateAnalysisOperators(table("testData").logicalPlan) match { + case _: InMemoryRelation => + case _ => + fail("testData should be cached") + } + + cacheTable("testData") + EliminateAnalysisOperators(table("testData").logicalPlan) match { + case InMemoryRelation(_, _, _: InMemoryColumnarTableScan) => + fail("cacheTable is not idempotent") + + case _ => + } + } } |