diff options
author | Davies Liu <davies@databricks.com> | 2015-07-09 09:20:16 -0700 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2015-07-09 09:20:16 -0700 |
commit | 23448a9e988a1b92bd05ee8c6c1a096c83375a12 (patch) | |
tree | e890dcdf7317b0c06204d88ce6d4aa40d63eca27 | |
parent | f88b12537ee81d914ef7c51a08f80cb28d93c8ed (diff) | |
download | spark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.tar.gz spark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.tar.bz2 spark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.zip |
[SPARK-8931] [SQL] Fallback to interpreted evaluation if failed to compile in codegen
Exception will not be catched during tests.
cc marmbrus rxin
Author: Davies Liu <davies@databricks.com>
Closes #7309 from davies/fallback and squashes the following commits:
969a612 [Davies Liu] throw exception during tests
f844f77 [Davies Liu] fallback
a3091bc [Davies Liu] Merge branch 'master' of github.com:apache/spark into fallback
364a0d6 [Davies Liu] fallback to interpret mode if failed to compile
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala | 51 | ||||
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala | 13 |
2 files changed, 58 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ca53186383..4d7d8626a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -153,12 +153,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ buf.toArray.map(converter(_).asInstanceOf[Row]) } + private[this] def isTesting: Boolean = sys.props.contains("spark.testing") + protected def newProjection( expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = { log.debug( s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") if (codegenEnabled) { - GenerateProjection.generate(expressions, inputSchema) + try { + GenerateProjection.generate(expressions, inputSchema) + } catch { + case e: Exception => + if (isTesting) { + throw e + } else { + log.error("Failed to generate projection, fallback to interpret", e) + new InterpretedProjection(expressions, inputSchema) + } + } } else { new InterpretedProjection(expressions, inputSchema) } @@ -170,17 +182,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ log.debug( s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") if(codegenEnabled) { - GenerateMutableProjection.generate(expressions, inputSchema) + try { + GenerateMutableProjection.generate(expressions, inputSchema) + } catch { + case e: Exception => + if (isTesting) { + throw e + } else { + log.error("Failed to generate mutable projection, fallback to interpreted", e) + () => new InterpretedMutableProjection(expressions, inputSchema) + } + } } else { () => new InterpretedMutableProjection(expressions, inputSchema) } } - protected def newPredicate( expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = { if (codegenEnabled) { - GeneratePredicate.generate(expression, inputSchema) + try { + GeneratePredicate.generate(expression, inputSchema) + } catch { + case e: Exception => + if (isTesting) { + throw e + } else { + log.error("Failed to generate predicate, fallback to interpreted", e) + InterpretedPredicate.create(expression, inputSchema) + } + } } else { InterpretedPredicate.create(expression, inputSchema) } @@ -190,7 +221,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = { if (codegenEnabled) { - GenerateOrdering.generate(order, inputSchema) + try { + GenerateOrdering.generate(order, inputSchema) + } catch { + case e: Exception => + if (isTesting) { + throw e + } else { + log.error("Failed to generate ordering, fallback to interpreted", e) + new RowOrdering(order, inputSchema) + } + } } else { new RowOrdering(order, inputSchema) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala index ecbc889770..9189d17611 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala @@ -276,7 +276,18 @@ private[sql] case class InsertIntoHadoopFsRelation( log.debug( s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled") if (codegenEnabled) { - GenerateProjection.generate(expressions, inputSchema) + + try { + GenerateProjection.generate(expressions, inputSchema) + } catch { + case e: Exception => + if (sys.props.contains("spark.testing")) { + throw e + } else { + log.error("failed to generate projection, fallback to interpreted", e) + new InterpretedProjection(expressions, inputSchema) + } + } } else { new InterpretedProjection(expressions, inputSchema) } |