author    Takuya UESHIN <ueshin@happy-camper.st>     2016-06-14 10:52:13 -0700
committer Michael Armbrust <michael@databricks.com>  2016-06-14 10:52:13 -0700
commit    c5b735581922c52a1b1cc6cd8c7b5878d3cf8f20 (patch)
tree      f9f39e06bec1c65ff416befe9bdd38eb978330ee
parent    bc02d011294fcd1ab07b9baf1011c3f2bdf749d9 (diff)
[SPARK-15915][SQL] Logical plans should use canonicalized plan when overriding sameResult.
## What changes were proposed in this pull request?

A `DataFrame` whose plan overrides `sameResult` but does not compare against the canonicalized plan cannot be cached with `cacheTable`. For example:

```
val localRelation = Seq(1, 2, 3).toDF()
localRelation.createOrReplaceTempView("localRelation")
spark.catalog.cacheTable("localRelation")
assert(
  localRelation.queryExecution.withCachedData.collect {
    case i: InMemoryRelation => i
  }.size == 1)
```

This fails with:

```
ArrayBuffer() had size 0 instead of expected size 1
```

The reason is that `spark.catalog.cacheTable("localRelation")` makes `CacheManager` cache the plan wrapped in `SubqueryAlias`, but when planning the DataFrame `localRelation`, `CacheManager` looks up the cached table using the unwrapped plan, because the plan for the DataFrame `localRelation` is not wrapped. Some plans, such as `LocalRelation` and `LogicalRDD`, override the `sameResult` method but do not compare against the canonicalized plan, so `CacheManager` cannot detect that the two plans are the same. This PR modifies them to use the canonicalized plan when overriding `sameResult`.

## How was this patch tested?

Added a test to check that a DataFrame whose plan overrides `sameResult` without using the canonicalized plan for comparison can be cached with `cacheTable`.

Author: Takuya UESHIN <ueshin@happy-camper.st>

Closes #13638 from ueshin/issues/SPARK-15915.
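To make the mechanism concrete, here is a minimal, self-contained Scala sketch of the idea. It uses a toy plan hierarchy (`Plan`, `Alias`, and `Relation` are hypothetical names, not Spark's Catalyst classes) to show how matching on the canonicalized plan lets a leaf node's `sameResult` see through a result-preserving wrapper, which is the role `SubqueryAlias` plays in the report above:

```
// A toy model of the pattern in this patch; Plan, Alias, and Relation are
// hypothetical stand-ins, not Spark's Catalyst classes.
sealed trait Plan {
  // Canonical form used for comparisons; result-preserving wrappers are removed.
  def canonicalized: Plan = this

  // Default comparison: compare canonical forms structurally.
  def sameResult(other: Plan): Boolean = canonicalized == other.canonicalized
}

// A wrapper that changes naming but not the result, playing the role of SubqueryAlias.
case class Alias(name: String, child: Plan) extends Plan {
  override def canonicalized: Plan = child.canonicalized
}

// A leaf that overrides sameResult, playing the role of LocalRelation.
case class Relation(data: Seq[Int]) extends Plan {
  // Matching on `other` directly fails when `other` is still wrapped in Alias;
  // matching on `other.canonicalized` sees through the wrapper, as this patch does.
  override def sameResult(other: Plan): Boolean = other.canonicalized match {
    case Relation(otherData) => otherData == data
    case _                   => false
  }
}

object CanonicalizedSameResultDemo extends App {
  val queryPlan  = Relation(Seq(1, 2, 3))             // plan built for the DataFrame
  val cachedPlan = Alias("localRelation", queryPlan)  // plan registered by cacheTable
  println(queryPlan.sameResult(cachedPlan))           // prints: true
}
```

The same pattern appears in each file touched by this patch: every override keeps its structural comparison but applies it to `plan.canonicalized` instead of the raw `plan`.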
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala  10
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala                      8
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala      8
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala                          11
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala                     2
5 files changed, 28 insertions(+), 11 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index 87b8647655..9d64f35efc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -65,10 +65,12 @@ case class LocalRelation(output: Seq[Attribute], data: Seq[InternalRow] = Nil)
}
}
- override def sameResult(plan: LogicalPlan): Boolean = plan match {
- case LocalRelation(otherOutput, otherData) =>
- otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
- case _ => false
+ override def sameResult(plan: LogicalPlan): Boolean = {
+ plan.canonicalized match {
+ case LocalRelation(otherOutput, otherData) =>
+ otherOutput.map(_.dataType) == output.map(_.dataType) && otherData == data
+ case _ => false
+ }
}
override lazy val statistics =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index ee72a70cce..e2c23a4ba8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -87,9 +87,11 @@ private[sql] case class LogicalRDD(
override def newInstance(): LogicalRDD.this.type =
LogicalRDD(output.map(_.newInstance()), rdd)(session).asInstanceOf[this.type]
- override def sameResult(plan: LogicalPlan): Boolean = plan match {
- case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
- case _ => false
+ override def sameResult(plan: LogicalPlan): Boolean = {
+ plan.canonicalized match {
+ case LogicalRDD(_, otherRDD) => rdd.id == otherRDD.id
+ case _ => false
+ }
}
override protected def stringArgs: Iterator[Any] = Iterator(output)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index a418d02983..39c8606fd1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -60,9 +60,11 @@ case class LogicalRelation(
com.google.common.base.Objects.hashCode(relation, output)
}
- override def sameResult(otherPlan: LogicalPlan): Boolean = otherPlan match {
- case LogicalRelation(otherRelation, _, _) => relation == otherRelation
- case _ => false
+ override def sameResult(otherPlan: LogicalPlan): Boolean = {
+ otherPlan.canonicalized match {
+ case LogicalRelation(otherRelation, _, _) => relation == otherRelation
+ case _ => false
+ }
}
// When comparing two LogicalRelations from within LogicalPlan.sameResult, we only need
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 44bafa55bc..3306ac42a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -552,4 +552,15 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
selectStar,
Seq(Row(1, "1")))
}
+
+ test("SPARK-15915 Logical plans should use canonicalized plan when override sameResult") {
+ val localRelation = Seq(1, 2, 3).toDF()
+ localRelation.createOrReplaceTempView("localRelation")
+
+ spark.catalog.cacheTable("localRelation")
+ assert(
+ localRelation.queryExecution.withCachedData.collect {
+ case i: InMemoryRelation => i
+ }.size == 1)
+ }
}
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 5596a4470f..58bca2059c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -185,7 +185,7 @@ private[hive] case class MetastoreRelation(
/** Only compare database and tablename, not alias. */
override def sameResult(plan: LogicalPlan): Boolean = {
- plan match {
+ plan.canonicalized match {
case mr: MetastoreRelation =>
mr.databaseName == databaseName && mr.tableName == tableName
case _ => false