aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-04-11 22:58:35 -0700
committerYin Huai <yhuai@databricks.com>2016-04-11 22:58:35 -0700
commit52a801124f429ab133f9a3867c1da6ebd8fa7d4e (patch)
treeec09e7b7bfa2c86207d65d0068251fa6e7852513
parent2d81ba542e12db65c2bd67357093244be9403102 (diff)
downloadspark-52a801124f429ab133f9a3867c1da6ebd8fa7d4e.tar.gz
spark-52a801124f429ab133f9a3867c1da6ebd8fa7d4e.tar.bz2
spark-52a801124f429ab133f9a3867c1da6ebd8fa7d4e.zip
[SPARK-14554][SQL] disable whole stage codegen if there are too many input columns
## What changes were proposed in this pull request? In https://github.com/apache/spark/pull/12047/files#diff-94a1f59bcc9b6758c4ca874652437634R529, we may split field expressions codes in `CreateExternalRow` to support wide table. However, the whole stage codegen framework doesn't support it, because the input for expressions is not always the input row, but can be `CodeGenContext.currentVars`, which doesn't work well with `CodeGenContext.splitExpressions`. Actually we do have a check to guard against these cases, but it's incomplete: it only checks output fields. This PR improves the whole stage codegen support check, to disable it if there are too many input fields, so that we can avoid splitting field expressions codes in `CreateExternalRow` for whole stage codegen. TODO: Is it a better solution if we can make `CodeGenContext.currentVars` work well with `CodeGenContext.splitExpressions`? ## How was this patch tested? new test in DatasetSuite. Author: Wenchen Fan <wenchen@databricks.com> Closes #12322 from cloud-fan/codegen.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala7
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala6
2 files changed, 11 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
index c4594f0480..447dbe7018 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala
@@ -446,8 +446,11 @@ case class CollapseCodegenStages(conf: SQLConf) extends Rule[SparkPlan] {
case plan: CodegenSupport if plan.supportCodegen =>
val willFallback = plan.expressions.exists(_.find(e => !supportCodegen(e)).isDefined)
// the generated code will be huge if there are too many columns
- val haveTooManyFields = numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
- !willFallback && !haveTooManyFields
+ val hasTooManyOutputFields =
+ numOfNestedFields(plan.schema) > conf.wholeStageMaxNumFields
+ val hasTooManyInputFields =
+ plan.children.map(p => numOfNestedFields(p.schema)).exists(_ > conf.wholeStageMaxNumFields)
+ !willFallback && !hasTooManyOutputFields && !hasTooManyInputFields
case _ => false
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e8e801084f..47251681e3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -620,6 +620,12 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
val df = streaming.join(static, Seq("b"))
assert(df.isStreaming, "streaming Dataset returned false for 'isStreaming'.")
}
+
+ test("SPARK-14554: Dataset.map may generate wrong java code for wide table") {
+ val wideDF = sqlContext.range(10).select(Seq.tabulate(1000) {i => ('id + i).as(s"c$i")} : _*)
+ // Make sure the generated code for this plan can compile and execute.
+ wideDF.map(_.getLong(0)).collect()
+ }
}
case class OtherTuple(_1: String, _2: Int)