aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala36
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala12
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala26
3 files changed, 8 insertions, 66 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index b19b772409..3cc99d3c7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -200,47 +200,17 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
inputSchema: Seq[Attribute],
useSubexprElimination: Boolean = false): () => MutableProjection = {
log.debug(s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- try {
- GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
+ GenerateMutableProjection.generate(expressions, inputSchema, useSubexprElimination)
}
protected def newPredicate(
expression: Expression, inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
+ GeneratePredicate.generate(expression, inputSchema)
}
protected def newOrdering(
order: Seq[SortOrder], inputSchema: Seq[Attribute]): Ordering[InternalRow] = {
- try {
- GenerateOrdering.generate(order, inputSchema)
- } catch {
- case e: Exception =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate ordering, fallback to interpreted", e)
- new InterpretedOrdering(order, inputSchema)
- }
- }
+ GenerateOrdering.generate(order, inputSchema)
}
/**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
index 5a19920add..812e696338 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/udaf.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.aggregate
import org.apache.spark.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
-import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, InterpretedMutableProjection, MutableRow}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateFunction, ImperativeAggregate}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, MutableRow}
+import org.apache.spark.sql.catalyst.expressions.aggregate.ImperativeAggregate
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateMutableProjection
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
@@ -361,13 +361,7 @@ private[sql] case class ScalaUDAF(
val inputAttributes = childrenSchema.toAttributes
log.debug(
s"Creating MutableProj: $children, inputSchema: $inputAttributes.")
- try {
- GenerateMutableProjection.generate(children, inputAttributes)()
- } catch {
- case e: Exception =>
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- new InterpretedMutableProjection(children, inputAttributes)
- }
+ GenerateMutableProjection.generate(children, inputAttributes)()
}
private[this] lazy val inputToScalaConverters: Any => Any =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
index a0dfe996cc..8726e48781 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/local/LocalNode.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql.execution.local
-import scala.util.control.NonFatal
-
import org.apache.spark.Logging
import org.apache.spark.sql.{Row, SQLConf}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
@@ -96,33 +94,13 @@ abstract class LocalNode(conf: SQLConf) extends QueryPlan[LocalNode] with Loggin
inputSchema: Seq[Attribute]): () => MutableProjection = {
log.debug(
s"Creating MutableProj: $expressions, inputSchema: $inputSchema")
- try {
- GenerateMutableProjection.generate(expressions, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate mutable projection, fallback to interpreted", e)
- () => new InterpretedMutableProjection(expressions, inputSchema)
- }
- }
+ GenerateMutableProjection.generate(expressions, inputSchema)
}
protected def newPredicate(
expression: Expression,
inputSchema: Seq[Attribute]): (InternalRow) => Boolean = {
- try {
- GeneratePredicate.generate(expression, inputSchema)
- } catch {
- case NonFatal(e) =>
- if (isTesting) {
- throw e
- } else {
- log.error("Failed to generate predicate, fallback to interpreted", e)
- InterpretedPredicate.create(expression, inputSchema)
- }
- }
+ GeneratePredicate.generate(expression, inputSchema)
}
}