diff options
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala | 7 +++++--
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
2 files changed, 11 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index c4594f0480..447dbe7018 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
     case plan: CodegenSupport if plan.supportCodegen =>
       val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
       // the generated code will be huge if there are too many columns
-      val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
-      !willFallback && !haveTooManyFields
+      val hasTooManyOutputFields =
+        numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+      val hasTooManyInputFields =
+        plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
+      !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
     case _ => false
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e8e801084f..47251681e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
     val df = streaming.join(static, Seq("b"))
     assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
   }
+
+  test("SPARK-14554: Dataset.map may generate wrong java code for wide table") {
+    val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+    // Make sure the generated code for this plan can compile and execute.
+    wideDF.map(_.getLong(0)).collect()
+  }
 }
 
 case class OtherTuple(_1: String, _2: Int)