diff options
author | Eric Liang <ekl@databricks.com> | 2016-09-28 13:22:45 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-09-28 13:22:45 -0700 |
commit | a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a (patch) | |
tree | 410eaf12e801ce184ce9a4ec60a526deb0a5f21a | |
parent | 46d1203bf2d01b219c4efc7e0e77a844c0c664da (diff) | |
download | spark-a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a.tar.gz spark-a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a.tar.bz2 spark-a6cfa3f38bcf6ba154d5ed2a53748fbc90c8872a.zip |
[SPARK-17673][SQL] Incorrect exchange reuse with RowDataSourceScan
## What changes were proposed in this pull request?
It seems the equality check for reuse of `RowDataSourceScanExec` nodes doesn't respect the output schema. This can cause self-joins or unions over the same underlying data source to return incorrect results if they select different fields.
## How was this patch tested?
New unit test passes after the fix.
Author: Eric Liang <ekl@databricks.com>
Closes #15273 from ericl/spark-17673.
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala | 4 | ||||
-rw-r--r-- | sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 8 |
2 files changed, 12 insertions, 0 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 63f01c5bb9..693b4c4d0e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -340,6 +340,8 @@ object DataSourceStrategy extends Strategy with Logging { // `Filter`s or cannot be handled by `relation`. val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And) + // These metadata values make scan plans uniquely identifiable for equality checking. + // TODO(SPARK-17701) using strings for equality checking is brittle val metadata: Map[String, String] = { val pairs = ArrayBuffer.empty[(String, String)] @@ -350,6 +352,8 @@ object DataSourceStrategy extends Strategy with Logging { } pairs += ("PushedFilters" -> markedFilters.mkString("[", ", ", "]")) } + pairs += ("ReadSchema" -> + StructType.fromAttributes(projects.map(_.toAttribute)).catalogString) pairs.toMap } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 10f15ca280..c94cb3b69d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -791,4 +791,12 @@ class JDBCSuite extends SparkFunSuite val schema = JdbcUtils.schemaString(df, "jdbc:mysql://localhost:3306/temp") assert(schema.contains("`order` TEXT")) } + + test("SPARK-17673: Exchange reuse respects differences in output schema") { + val df = sql("SELECT * FROM inttypes WHERE a IS NOT NULL") + val df1 = df.groupBy("a").agg("c" -> "min") + val df2 = df.groupBy("a").agg("d" -> "min") + val res = df1.union(df2) + assert(res.distinct().count() == 2) // would be 1 if the exchange was incorrectly reused + } } |