author    Davies Liu <davies@databricks.com>   2016-06-10 21:12:06 -0700
committer Davies Liu <davies.liu@gmail.com>    2016-06-10 21:12:06 -0700
commit    7504bc73f20fe0e6546a019ed91c3fd3804287ba
tree      c6dd39e4a922bce5747d1d333add2e4be6e304e5
parent    468da03e23a01e02718608f05d778386cbb8416b
[SPARK-15759] [SQL] Fallback to non-codegen when fail to compile generated code
## What changes were proposed in this pull request?

In case of any bugs in whole-stage codegen, the generated code may fail to compile; we should fall back to the non-codegen path so the query can still run. The batch mode of the new Parquet reader depends on codegen and cannot easily be switched to non-batch mode, so we still use codegen for the batched scan (for Parquet). Because the batched scan supports only primitive types and its number of columns is smaller than spark.sql.codegen.maxFields (100), its compilation should not fail. The fallback behavior is configurable via `spark.sql.codegen.fallback`.

## How was this patch tested?

Manually tested with a buggy operator; it worked well.

Author: Davies Liu <davies@databricks.com>

Closes #13501 from davies/codegen_fallback.
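For illustration, the flag can be toggled per session. A minimal sketch, assuming a SparkSession named `spark` (the config key and its default come from this patch; the surrounding calls are the standard Spark 2.x runtime-config API):

    // Illustrative toggles for the flag introduced by this patch.
    // Default is true, i.e. fall back to the interpreted path on compile errors.
    spark.conf.set("spark.sql.codegen.fallback", "false") // fail hard if codegen breaks
    spark.conf.set("spark.sql.codegen.fallback", "true")  // restore the fallback behavior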
Diffstat (limited to 'sql')
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala           |  5
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala | 11
-rw-r--r-- sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala                | 11
3 files changed, 24 insertions(+), 3 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
index 9ab98fd124..ee72a70cce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
@@ -234,7 +234,10 @@ private[sql] case class BatchedDataSourceScanExec(
"scanTime" -> SQLMetrics.createTimingMetric(sparkContext, "scan time"))
protected override def doExecute(): RDD[InternalRow] = {
- throw new UnsupportedOperationException
+ // In the case of fallback, this batched scan should never fail because:
+ // 1) it supports only primitive types, and
+ // 2) the number of columns is smaller than spark.sql.codegen.maxFields
+ WholeStageCodegenExec(this).execute()
}
override def simpleString: String = {
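As a side note, the two invariants in the comment above can be read as a simple predicate over the scan's schema. A hypothetical sketch, not Spark API (the primitive-type set is a stand-in, since the exact types the vectorized reader supports vary by version):

    import org.apache.spark.sql.types._

    // Illustrative check mirroring the two conditions above; NOT part of the patch.
    // `maxFields` stands in for spark.sql.codegen.maxFields (default 100 after this change).
    def batchedScanShouldCompile(schema: StructType, maxFields: Int = 100): Boolean = {
      val primitives: Set[DataType] = Set(BooleanType, ByteType, ShortType,
        IntegerType, LongType, FloatType, DoubleType)
      schema.fields.forall(f => primitives.contains(f.dataType)) &&
        schema.fields.length < maxFields
    }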
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
index e0d8e35713..ac4c3aae5f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoi
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
/**
* An interface for those physical operators that support codegen.
@@ -339,12 +340,20 @@ case class WholeStageCodegenExec(child: SparkPlan) extends UnaryExecNode with Co
new CodeAndComment(CodeFormatter.stripExtraNewLines(source), ctx.getPlaceHolderToComments()))
logDebug(s"\n${CodeFormatter.format(cleanedSource)}")
- CodeGenerator.compile(cleanedSource)
(ctx, cleanedSource)
}
override def doExecute(): RDD[InternalRow] = {
val (ctx, cleanedSource) = doCodeGen()
+ // Try to compile the generated code, and fall back if compilation fails
+ try {
+ CodeGenerator.compile(cleanedSource)
+ } catch {
+ case e: Exception if !Utils.isTesting && sqlContext.conf.wholeStageFallback =>
+ // We should have already seen the error message
+ logWarning(s"Whole-stage codegen disabled for this plan:\n $treeString")
+ return child.execute()
+ }
val references = ctx.references.toArray
val durationMs = longMetric("pipelineTime")
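Stripped of Spark internals, the control flow added to doExecute() is a plain "try the compiled path, fall back to the safe path" pattern. A minimal self-contained sketch (all names here are illustrative, not Spark's API):

    // Generic try-then-fallback helper; `optimized` and `fallback` are by-name
    // parameters, so neither path runs unless needed.
    def runWithFallback[T](fallbackEnabled: Boolean)(optimized: => T)(fallback: => T): T =
      try {
        optimized
      } catch {
        case e: Exception if fallbackEnabled =>
          // Degrade gracefully instead of failing the whole query.
          Console.err.println(s"Optimized path failed (${e.getMessage}); falling back.")
          fallback
      }

As in the patch, the expensive step (compilation) is attempted eagerly, so a failure surfaces on the driver before any fallback work starts.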
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 437e093825..27b1fffe27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -433,7 +433,14 @@ object SQLConf {
.doc("The maximum number of fields (including nested fields) that will be supported before" +
" deactivating whole-stage codegen.")
.intConf
- .createWithDefault(200)
+ .createWithDefault(100)
+
+ val WHOLESTAGE_FALLBACK = SQLConfigBuilder("spark.sql.codegen.fallback")
+ .internal()
+ .doc("When true, whole stage codegen could be temporary disabled for the part of query that" +
+ " fail to compile generated code")
+ .booleanConf
+ .createWithDefault(true)
val MAX_CASES_BRANCHES = SQLConfigBuilder("spark.sql.codegen.maxCaseBranches")
.internal()
@@ -605,6 +612,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
def wholeStageMaxNumFields: Int = getConf(WHOLESTAGE_MAX_NUM_FIELDS)
+ def wholeStageFallback: Boolean = getConf(WHOLESTAGE_FALLBACK)
+
def maxCaseBranchesForCodegen: Int = getConf(MAX_CASES_BRANCHES)
def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
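The SQLConf change follows the file's recurring pattern: a typed config entry with a default, paired with a typed accessor. A simplified standalone sketch of that pairing (ConfigEntry and Conf here are illustrative stand-ins, not Spark's internal classes):

    // Simplified stand-in for the SQLConfigBuilder/getConf pairing.
    object ConfSketch {
      final case class ConfigEntry[T](key: String, default: T, parse: String => T)

      class Conf(settings: Map[String, String] = Map.empty) {
        def getConf[T](entry: ConfigEntry[T]): T =
          settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
      }

      val WHOLESTAGE_FALLBACK =
        ConfigEntry[Boolean]("spark.sql.codegen.fallback", true, _.toBoolean)

      def main(args: Array[String]): Unit = {
        val conf = new Conf()
        assert(conf.getConf(WHOLESTAGE_FALLBACK)) // defaults to true, as in the patch
      }
    }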