diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2017-01-18 01:57:12 +0800 |
---|---|---|
committer | Wenchen Fan <wenchen@databricks.com> | 2017-01-18 01:57:12 +0800 |
commit | a83accfcfd6a92afac5040c50577258ab83d10dd (patch) | |
tree | 123b8b01ad17639c72829d4e75217fa6b414129d /sql | |
parent | 20e6280626fe243b170a2e7c5e018c67f3dac1db (diff) | |
download | spark-a83accfcfd6a92afac5040c50577258ab83d10dd.tar.gz spark-a83accfcfd6a92afac5040c50577258ab83d10dd.tar.bz2 spark-a83accfcfd6a92afac5040c50577258ab83d10dd.zip |
[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates
## What changes were proposed in this pull request?
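`dropDuplicates` creates an `Alias` that reuses the `exprId` of the attribute it wraps, so distinct expressions end up sharing the same id, which affects streaming queries run by `StreamExecution`. This patch makes `dropDuplicates` create each `Alias` with a fresh `exprId` instead of inheriting the attribute's, and removes the `DatasetSuite` test that depended on the old behavior.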
## How was this patch tested?
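Added `test("SPARK-19065: dropDuplicates should not create expressions using the same id")` to `StreamSuite`, which runs a streaming query using `dropDuplicates` and rethrows any exception reported by the query.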
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #16564 from zsxwing/SPARK-19065.
Diffstat (limited to 'sql')
3 files changed, 27 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala index 1a7a5ba798..24b9b810fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala @@ -2003,10 +2003,7 @@ class Dataset[T] private[sql]( if (groupColExprIds.contains(attr.exprId)) { attr } else { - // Removing duplicate rows should not change output attributes. We should keep - // the original exprId of the attribute. Otherwise, to select a column in original - // dataset will cause analysis exception due to unresolved attribute. - Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId) + Alias(new First(attr).toAggregateExpression(), attr.name)() } } Aggregate(groupCols, aggCols, logicalPlan) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 731a28c237..b37bf131e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext { (1, 2), (1, 1), (2, 1), (2, 2)) } - test("dropDuplicates should not change child plan output") { - val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS() - checkDataset( - ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]), - ("a", 1), ("b", 1)) - } - test("SPARK-16097: Encoders.tuple should handle null object correctly") { val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING) val data = Seq((("a", "b"), "c"), (null, "d")) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index e964e646d2..f31dc8add4 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -304,6 +304,32 @@ class StreamSuite extends StreamTest { q.stop() } } + + test("SPARK-19065: dropDuplicates should not create expressions using the same id") { + withTempPath { testPath => + val data = Seq((1, 2), (2, 3), (3, 4)) + data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath) + val schema = spark.read.json(testPath.getCanonicalPath).schema + val query = spark + .readStream + .schema(schema) + .json(testPath.getCanonicalPath) + .dropDuplicates("_1") + .writeStream + .format("memory") + .queryName("testquery") + .outputMode("complete") + .start() + try { + query.processAllAvailable() + if (query.exception.isDefined) { + throw query.exception.get + } + } finally { + query.stop() + } + } + } } /** |