aboutsummaryrefslogtreecommitdiff
path: root/sql/core/src/test
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-07-30 22:48:09 -0700
committerReynold Xin <rxin@databricks.com>2016-07-30 22:48:09 -0700
commit957a8ab3743521850fb1c0106c37c5d3997b9e56 (patch)
treeabbcd0676d67326624d37e47d93abc9b9bfe7942 /sql/core/src/test
parenta6290e51e402e8434d6207d553db1f551e714fde (diff)
downloadspark-957a8ab3743521850fb1c0106c37c5d3997b9e56.tar.gz
spark-957a8ab3743521850fb1c0106c37c5d3997b9e56.tar.bz2
spark-957a8ab3743521850fb1c0106c37c5d3997b9e56.zip
[SPARK-16818] Exchange reuse incorrectly reuses scans over different sets of partitions
## What changes were proposed in this pull request? This fixes a bug where the file scan operator does not take into account partition pruning in its implementation of `sameResult()`. As a result, executions may be incorrect on self-joins over the same base file relation. The patch here is minimal, but we should reconsider relying on `metadata` for implementing sameResult() in the future, as string representations may not be uniquely identifying. cc rxin ## How was this patch tested? Unit tests. Author: Eric Liang <ekl@databricks.com> Closes #14425 from ericl/spark-16818.
Diffstat (limited to 'sql/core/src/test')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala35
1 files changed, 34 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
index 2f551b1a01..18246500f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionSet, PredicateHelper}
import org.apache.spark.sql.catalyst.util
-import org.apache.spark.sql.execution.DataSourceScanExec
+import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources._
@@ -408,6 +408,39 @@ class FileSourceStrategySuite extends QueryTest with SharedSQLContext with Predi
}
}
+ test("[SPARK-16818] partition pruned file scans implement sameResult correctly") {
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ spark.range(100)
+ .selectExpr("id", "id as b")
+ .write
+ .partitionBy("id")
+ .parquet(tempDir)
+ val df = spark.read.parquet(tempDir)
+ def getPlan(df: DataFrame): SparkPlan = {
+ df.queryExecution.executedPlan
+ }
+ assert(getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 2"))))
+ assert(!getPlan(df.where("id = 2")).sameResult(getPlan(df.where("id = 3"))))
+ }
+ }
+
+ test("[SPARK-16818] exchange reuse respects differences in partition pruning") {
+ spark.conf.set("spark.sql.exchange.reuse", true)
+ withTempPath { path =>
+ val tempDir = path.getCanonicalPath
+ spark.range(10)
+ .selectExpr("id % 2 as a", "id % 3 as b", "id as c")
+ .write
+ .partitionBy("a")
+ .parquet(tempDir)
+ val df = spark.read.parquet(tempDir)
+ val df1 = df.where("a = 0").groupBy("b").agg("c" -> "sum")
+ val df2 = df.where("a = 1").groupBy("b").agg("c" -> "sum")
+ checkAnswer(df1.join(df2, "b"), Row(0, 6, 12) :: Row(1, 4, 8) :: Row(2, 10, 5) :: Nil)
+ }
+ }
+
// Helpers for checking the arguments passed to the FileFormat.
protected val checkPartitionSchema =