about | summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala | 7
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6
2 files changed, 11 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index c4594f0480..447dbe7018 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
- val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
- !willFallback && !haveTooManyFields
+ val hasTooManyOutputFields =
+ numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+ val hasTooManyInputFields =
+ plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
+ !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
case _ => false
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e8e801084f..47251681e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val df = streaming.join(static, Seq("b"))
assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
}
+
+ test("SPARK-14554: Dataset.map may generate wrong java code for wide table") {
+ val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+ // Make sure the generated code for this plan can compile and execute.
+ wideDF.map(_.getLong(0)).collect()
+ }
}
case class OtherTuple(_1: String, _2: Int)