aboutsummaryrefslogtreecommitdiff
path: root/sql
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2017-01-18 01:57:12 +0800
committerWenchen Fan <wenchen@databricks.com>2017-01-18 01:57:12 +0800
commita83accfcfd6a92afac5040c50577258ab83d10dd (patch)
tree123b8b01ad17639c72829d4e75217fa6b414129d /sql
parent20e6280626fe243b170a2e7c5e018c67f3dac1db (diff)
downloadspark-a83accfcfd6a92afac5040c50577258ab83d10dd.tar.gz
spark-a83accfcfd6a92afac5040c50577258ab83d10dd.tar.bz2
spark-a83accfcfd6a92afac5040c50577258ab83d10dd.zip
[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates
## What changes were proposed in this pull request? `dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary. ## How was this patch tested? test("SPARK-19065: dropDuplicates should not create expressions using the same id") Author: Shixiong Zhu <shixiong@databricks.com> Closes #16564 from zsxwing/SPARK-19065.
Diffstat (limited to 'sql')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala5
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala26
3 files changed, 27 insertions, 11 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 1a7a5ba798..24b9b810fc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
if (groupColExprIds.contains(attr.exprId)) {
attr
} else {
- // Removing duplicate rows should not change output attributes. We should keep
- // the original exprId of the attribute. Otherwise, to select a column in original
- // dataset will cause analysis exception due to unresolved attribute.
- Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
+ Alias(new First(attr).toAggregateExpression(), attr.name)()
}
}
Aggregate(groupCols, aggCols, logicalPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 731a28c237..b37bf131e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
(1, 2), (1, 1), (2, 1), (2, 2))
}
- test("dropDuplicates should not change child plan output") {
- val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
- checkDataset(
- ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
- ("a", 1), ("b", 1))
- }
-
test("SPARK-16097: Encoders.tuple should handle null object correctly") {
val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
val data = Seq((("a", "b"), "c"), (null, "d"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e964e646d2..f31dc8add4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
q.stop()
}
}
+
+ test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
+ withTempPath { testPath =>
+ val data = Seq((1, 2), (2, 3), (3, 4))
+ data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
+ val schema = spark.read.json(testPath.getCanonicalPath).schema
+ val query = spark
+ .readStream
+ .schema(schema)
+ .json(testPath.getCanonicalPath)
+ .dropDuplicates("_1")
+ .writeStream
+ .format("memory")
+ .queryName("testquery")
+ .outputMode("complete")
+ .start()
+ try {
+ query.processAllAvailable()
+ if (query.exception.isDefined) {
+ throw query.exception.get
+ }
+ } finally {
+ query.stop()
+ }
+ }
+ }
}
/**