aboutsummaryrefslogtreecommitdiff
path: root/sql/core
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-05-29 23:19:12 -0700
committerCheng Lian <lian@databricks.com>2016-05-29 23:19:12 -0700
commit1360a6d636dd812a27955fc85df8e0255db60dfa (patch)
treeb43895b8da141122252ec7edcb0491e6a9622b79 /sql/core
parentce1572d16f03d383071bcc1f30ede551e8ded49f (diff)
downloadspark-1360a6d636dd812a27955fc85df8e0255db60dfa.tar.gz
spark-1360a6d636dd812a27955fc85df8e0255db60dfa.tar.bz2
spark-1360a6d636dd812a27955fc85df8e0255db60dfa.zip
[SPARK-15112][SQL] Disables EmbedSerializerInFilter for plan fragments that change schema
## What changes were proposed in this pull request? `EmbedSerializerInFilter` implicitly assumes that the plan fragment being optimized doesn't change plan schema, which is reasonable because `Dataset.filter` should never change the schema. However, due to another issue involving `DeserializeToObject` and `SerializeFromObject`, typed filter *does* change plan schema (see [SPARK-15632][1]). This breaks `EmbedSerializerInFilter` and causes corrupted data. This PR disables `EmbedSerializerInFilter` when there's a schema change to avoid data corruption. The schema change issue should be addressed in follow-up PRs. ## How was this patch tested? New test case added in `DatasetSuite`. [1]: https://issues.apache.org/jira/browse/SPARK-15632 Author: Cheng Lian <lian@databricks.com> Closes #13362 from liancheng/spark-15112-corrupted-filter.
Diffstat (limited to 'sql/core')
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala16
1 files changed, 15 insertions, 1 deletions
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e395007999..8fc4dc9f17 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -706,7 +706,7 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val dataset = Seq(1, 2, 3).toDS()
dataset.createOrReplaceTempView("tempView")
- // Overrrides the existing temporary view with same name
+ // Overrides the existing temporary view with same name
// No exception should be thrown here.
dataset.createOrReplaceTempView("tempView")
@@ -769,6 +769,20 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
checkShowString(ds, expected)
}
+
+ test(
+ "SPARK-15112: EmbedDeserializerInFilter should not optimize plan fragment that changes schema"
+ ) {
+ val ds = Seq(1 -> "foo", 2 -> "bar").toDF("b", "a").as[ClassData]
+
+ assertResult(Seq(ClassData("foo", 1), ClassData("bar", 2))) {
+ ds.collect().toSeq
+ }
+
+ assertResult(Seq(ClassData("bar", 2))) {
+ ds.filter(_.b > 1).collect().toSeq
+ }
+ }
}
case class Generic[T](id: T, value: Double)