diff options
author | xin Wu <xinwu@us.ibm.com> | 2015-10-29 07:42:46 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-10-29 07:42:46 -0700 |
commit | f7a51deebad1b4c3b970a051f25d286110b94438 (patch) | |
tree | 058f32433dba5e7ef791c9301fc15c37391f798e | |
parent | 3bb2a8d7508b507edfcc21bd20912b0ff4a0a248 (diff) | |
download | spark-f7a51deebad1b4c3b970a051f25d286110b94438.tar.gz spark-f7a51deebad1b4c3b970a051f25d286110b94438.tar.bz2 spark-f7a51deebad1b4c3b970a051f25d286110b94438.zip |
[SPARK-11246] [SQL] Table cache for Parquet broken in 1.5
The root cause is that when spark.sql.hive.convertMetastoreParquet=true by default, the cached InMemoryRelation of the ParquetRelation can not be looked up from the cachedData of CacheManager because the key comparison fails even though it is the same LogicalPlan representing the Subquery that wraps the ParquetRelation.
The solution in this PR is overriding the LogicalPlan.sameResult function in Subquery case class to eliminate subquery node first before directly comparing the child (ParquetRelation), which will find the key to the cached InMemoryRelation.
Author: xin Wu <xinwu@us.ibm.com>
Closes #9326 from xwu0226/spark-11246-commit.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala | 5 | ||||
-rw-r--r-- | sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala | 11 |
2 files changed, 16 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 783252e0a2..219dae88e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -62,6 +62,11 @@ case class LogicalRelation( case _ => false } + // When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need + // LogicalRelation.cleanArgs to return Seq(relation), since expectedOutputAttribute's + // expId can be different but the relation is still the same. + override lazy val cleanArgs: Seq[Any] = Seq(relation) + @transient override lazy val statistics: Statistics = Statistics( sizeInBytes = BigInt(relation.sizeInBytes) ) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala index 9adb3780a2..5c2fc7d82f 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.hive import java.io.File import org.apache.spark.sql.columnar.InMemoryColumnarTableScan +import org.apache.spark.sql.execution.datasources.parquet.ParquetRelation import org.apache.spark.sql.hive.test.TestHiveSingleton import org.apache.spark.sql.{AnalysisException, QueryTest, SaveMode} import org.apache.spark.storage.RDDBlockId @@ -203,4 +204,14 @@ class CachedTableSuite extends QueryTest with TestHiveSingleton { sql("DROP TABLE refreshTable") Utils.deleteRecursively(tempPath) } + + test("SPARK-11246 cache parquet table") { + sql("CREATE TABLE cachedTable STORED AS PARQUET AS SELECT 1") + + cacheTable("cachedTable") + val sparkPlan = sql("SELECT * FROM cachedTable").queryExecution.sparkPlan + assert(sparkPlan.collect { case e: InMemoryColumnarTableScan => e }.size === 1) + + sql("DROP TABLE cachedTable") + } } |