author     Davies Liu <davies@databricks.com>       2016-02-05 15:07:43 -0800
committer  Davies Liu <davies.liu@gmail.com>        2016-02-05 15:07:43 -0800
commit     875f5079290a06c12d44de657dfeabc913e4a303 (patch)
tree       4389e54322a8e0161d84e78f0724fc04113fc713
parent     0bb5b73387e60ef007b415fba69a3e1e89a4f013 (diff)
[SPARK-13215] [SQL] remove fallback in codegen
Since we removed the configuration for codegen, we now rely heavily on codegen (and TungstenAggregate requires the generated MutableProjection to update UnsafeRow), so the fallback should be removed as well; silently falling back could confuse users. See the discussion in SPARK-13116.

Author: Davies Liu <davies@databricks.com>

Closes #11097 from davies/remove_fallback.
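The snippet below is a minimal sketch (not part of the patch) contrasting the removed fallback path with the new behavior. The calls to GenerateMutableProjection and InterpretedMutableProjection are taken from the diff below; the enclosing object and the isTesting flag are simplified stand-ins for SparkPlan internals.

import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, InterpretedMutableProjection, MutableProjection}
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection

object FallbackSketch {
  // Simplified stand-in for the isTesting check used in SparkPlan.
  val isTesting: Boolean = sys.props.contains("spark.testing")

  // Before this patch: swallow codegen failures and silently fall back to
  // the interpreted projection (except when running tests).
  def newMutableProjectionWithFallback(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute]): () => MutableProjection = {
    try {
      GenerateMutableProjection.generate(expressions, inputSchema)
    } catch {
      case e: Exception =>
        if (isTesting) throw e
        else () => new InterpretedMutableProjection(expressions, inputSchema)
    }
  }

  // After this patch: call codegen directly, so a failure surfaces
  // immediately instead of silently degrading to the interpreted path.
  def newMutableProjection(
      expressions: Seq[Expression],
      inputSchema: Seq[Attribute]): () => MutableProjection = {
    GenerateMutableProjection.generate(expressions, inputSchema)
  }
}

The patch keeps only the second form everywhere: with codegen always enabled, the interpreted path is effectively dead code, and TungstenAggregate cannot use it anyway because it needs the generated MutableProjection to update UnsafeRow.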
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala         36
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala    12
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala   26
3 files changed, 8 insertions, 66 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b19b772409..3cc99d3c7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- try {
- GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
+ GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
+ GeneratePredicate.generate(expression, inputSchema)
}
protected def newOrdering(
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
- try {
- GenerateOrdering.generate(order, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate ordering, fallback to interpreted", e)
- new InterpretedOrdering(order, inputSchema)
- }
- }
+ GenerateOrdering.generate(order, inputSchema)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 5a19920add..812e696338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
@@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
- try {
- GenerateMutableProjection.generate(children, inputAttributes)()
- } catch {
- case e: Exception =>
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- new InterpretedMutableProjection(children, inputAttributes)
- }
+ GenerateMutableProjection.generate(children, inputAttributes)()
}
private[this] lazy val inputToScalaConverters: Any => Any =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index a0dfe996cc..8726e48781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.local
-import scala.util.control.NonFatal
-
import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- try {
- GenerateMutableProjection.generate(expressions, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
+ GenerateMutableProjection.generate(expressions, inputSchema)
}
protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
+ GeneratePredicate.generate(expression, inputSchema)
}
}