aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-07-09 09:20:16 -0700
committerDavies Liu <davies.liu@gmail.com>2015-07-09 09:20:16 -0700
commit23448a9e988a1b92bd05ee8c6c1a096c83375a12 (patch)
treee890dcdf7317b0c06204d88ce6d4aa40d63eca27
parentf88b12537ee81d914ef7c51a08f80cb28d93c8ed (diff)
downloadspark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.tar.gz
spark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.tar.bz2
spark-23448a9e988a1b92bd05ee8c6c1a096c83375a12.zip
[SPARK-8931] [SQL] Fallback to interpreted evaluation if failed to compile in codegen
Exception will not be catched during tests. cc marmbrus rxin Author: Davies Liu <davies@databricks.com> Closes #7309 from davies/fallback and squashes the following commits: 969a612 [Davies Liu] throw exception during tests f844f77 [Davies Liu] fallback a3091bc [Davies Liu] Merge branch 'master' of github.com:apache/spark into fallback 364a0d6 [Davies Liu] fallback to interpret mode if failed to compile
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala51
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala13
2 files changed, 58 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index ca53186383..4d7d8626a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -153,12 +153,24 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
buf.toArray.map(converter(_).asInstanceOf[Row])
}
+ private[this] def isTesting: Boolean = sys.props.contains("spark.testing")
+
protected def newProjection(
expressions: Seq[Expression], inputSchema: Seq[Attribute]): Projection = {
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
- GenerateProjection.generate(expressions, inputSchema)
+ try {
+ GenerateProjection.generate(expressions, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate projection, fallback to interpret", e)
+ new InterpretedProjection(expressions, inputSchema)
+ }
+ }
} else {
new InterpretedProjection(expressions, inputSchema)
}
@@ -170,17 +182,36 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if(codegenEnabled) {
- GenerateMutableProjection.generate(expressions, inputSchema)
+ try {
+ GenerateMutableProjection.generate(expressions, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate mutable projection, fallback to interpreted", e)
+ () => new InterpretedMutableProjection(expressions, inputSchema)
+ }
+ }
} else {
() => new InterpretedMutableProjection(expressions, inputSchema)
}
}
-
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
if (codegenEnabled) {
- GeneratePredicate.generate(expression, inputSchema)
+ try {
+ GeneratePredicate.generate(expression, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate predicate, fallback to interpreted", e)
+ InterpretedPredicate.create(expression, inputSchema)
+ }
+ }
} else {
InterpretedPredicate.create(expression, inputSchema)
}
@@ -190,7 +221,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
order: Seq[SortOrder],
inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
if (codegenEnabled) {
- GenerateOrdering.generate(order, inputSchema)
+ try {
+ GenerateOrdering.generate(order, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (isTesting) {
+ throw e
+ } else {
+ log.error("Failed to generate ordering, fallback to interpreted", e)
+ new RowOrdering(order, inputSchema)
+ }
+ }
} else {
new RowOrdering(order, inputSchema)
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index ecbc889770..9189d17611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -276,7 +276,18 @@ private[sql] case class InsertIntoHadoopFsRelation(
log.debug(
s"Creating Projection: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
if (codegenEnabled) {
- GenerateProjection.generate(expressions, inputSchema)
+
+ try {
+ GenerateProjection.generate(expressions, inputSchema)
+ } catch {
+ case e: Exception =>
+ if (sys.props.contains("spark.testing")) {
+ throw e
+ } else {
+ log.error("failed to generate projection, fallback to interpreted", e)
+ new InterpretedProjection(expressions, inputSchema)
+ }
+ }
} else {
new InterpretedProjection(expressions, inputSchema)
}