author     Cheng Hao <hao.cheng@intel.com>    2014-05-15 22:12:34 -0700
committer  Reynold Xin <rxin@apache.org>      2014-05-15 22:12:34 -0700
commit     a20fea98811d98958567780815fcf0d4fb4e28d4 (patch)
tree       e9c3ea380653ca6168016f079309d5caefdd17da /sql
parent     bb98ecafce196ecc5bc3a1e4cc9264df7b752c6a (diff)
[Spark-1461] Deferred Expression Evaluation (short-circuit evaluation)
This patch unifies the foldable & nullable interface for Expression.

1) Non-deterministic UDFs (like Rand()) cannot be folded.
2) Short-circuiting significantly improves expression evaluation performance; however, stateful UDFs must not be skipped during short-circuit evaluation (e.g. in the expression col1 > 0 AND row_sequence() < 1000, row_sequence() cannot be ignored even if col1 > 0 is false).

This borrows the DeferredObject concept from Hive, which has two kinds of child classes (EagerResult / DeferredResult): the former requires the evaluation to be triggered before it is created, while the latter triggers the evaluation the first time its get() method is called.

Author: Cheng Hao <hao.cheng@intel.com>

Closes #446 from chenghao-intel/expression_deferred_evaluation and squashes the following commits:

d2729de [Cheng Hao] Fix the code style issues
a08f09c [Cheng Hao] Fix a bug in Or/And short-circuit evaluation
af2236b [Cheng Hao] Revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add support for Deferred Expression Evaluation
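As a rough illustration of the two DeferredObject flavours described in the message above: EagerResult and DeferredResult are the names the message uses; the sketch below is illustrative only and not the actual Hive or Catalyst classes.

    // Both holders hand the value back through get(); they differ only in
    // *when* the wrapped expression is evaluated.
    trait Result {
      def get(): Any
    }

    // Eager: the expression is evaluated before the holder is constructed,
    // so get() simply returns the cached value.
    class EagerResult(value: Any) extends Result {
      override def get(): Any = value
    }

    // Deferred: evaluation is postponed until get() is first called, so an
    // argument in a short-circuited branch that never calls get() is never
    // evaluated, and it is evaluated at most once thereafter.
    class DeferredResult(eval: () => Any) extends Result {
      private lazy val value = eval()
      override def get(): Any = value
    }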
Diffstat (limited to 'sql')
-rw-r--r--  sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala  47
-rw-r--r--  sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala                         28
2 files changed, 53 insertions, 22 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
index 6ee479939d..d111578530 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala
@@ -98,13 +98,19 @@ case class And(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == false || r == false) {
-      false
-    } else if (l == null || r == null ) {
-      null
+    if (l == false) {
+      false
     } else {
-      true
+      val r = right.eval(input)
+      if (r == false) {
+        false
+      } else {
+        if (l != null && r != null) {
+          true
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -114,13 +120,19 @@ case class Or(left: Expression, right: Expression) extends BinaryPredicate {
 
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == true || r == true) {
+    if (l == true) {
       true
-    } else if (l == null || r == null) {
-      null
     } else {
-      false
+      val r = right.eval(input)
+      if (r == true) {
+        true
+      } else {
+        if (l != null && r != null) {
+          false
+        } else {
+          null
+        }
+      }
     }
   }
 }
@@ -133,8 +145,12 @@ case class Equals(left: Expression, right: Expression) extends BinaryComparison
   def symbol = "="
   override def eval(input: Row): Any = {
     val l = left.eval(input)
-    val r = right.eval(input)
-    if (l == null || r == null) null else l == r
+    if (l == null) {
+      null
+    } else {
+      val r = right.eval(input)
+      if (r == null) null else l == r
+    }
   }
 }
 
@@ -162,7 +178,7 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
     extends Expression {
 
   def children = predicate :: trueValue :: falseValue :: Nil
-  def nullable = trueValue.nullable || falseValue.nullable
+  override def nullable = trueValue.nullable || falseValue.nullable
   def references = children.flatMap(_.references).toSet
   override lazy val resolved = childrenResolved && trueValue.dataType == falseValue.dataType
   def dataType = {
@@ -175,8 +191,9 @@ case class If(predicate: Expression, trueValue: Expression, falseValue: Expression)
   }
 
   type EvaluatedType = Any
+
   override def eval(input: Row): Any = {
-    if (predicate.eval(input).asInstanceOf[Boolean]) {
+    if (true == predicate.eval(input)) {
       trueValue.eval(input)
     } else {
       falseValue.eval(input)
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
index d50e2c65b7..5729020423 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala
@@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression])
     isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable)
   }
 
+  protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({
+    new DeferredObjectAdapter
+  })
+
+  // Adapter from Catalyst ExpressionResult to Hive DeferredObject
+  class DeferredObjectAdapter extends DeferredObject {
+    private var func: () => Any = _
+    def set(func: () => Any) {
+      this.func = func
+    }
+    override def prepare(i: Int) = {}
+    override def get(): AnyRef = wrap(func())
+  }
+
   val dataType: DataType = inspectorToDataType(returnInspector)
 
   override def eval(input: Row): Any = {
     returnInspector // Make sure initialized.
-    val args = children.map { v =>
-      new DeferredObject {
-        override def prepare(i: Int) = {}
-        override def get(): AnyRef = wrap(v.eval(input))
-      }
-    }.toArray
-    unwrap(function.evaluate(args))
+    var i = 0
+    while (i < children.length) {
+      val idx = i
+      deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)})
+      i += 1
+    }
+    unwrap(function.evaluate(deferedObjects))
   }
 }
 
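The hiveUdfs.scala hunk above also replaces the per-row allocation of anonymous DeferredObject instances with a pre-allocated array of adapters that are merely rebound to a new closure for each input row. A minimal sketch of that reuse pattern follows (hypothetical names, not the actual Hive DeferredObject API), assuming the consumer decides which arguments to force by calling get():

    // Holders are allocated once; evaluating a row only swaps each holder's
    // thunk, so no new argument objects are created per row.
    class LazyHolder {
      private var thunk: () => Any = _
      def set(f: () => Any): Unit = { thunk = f }
      def get(): Any = thunk()   // the wrapped expression runs only if get() is called
    }

    class RowEvaluator(exprs: Array[Int => Any]) {
      private val holders = Array.fill(exprs.length)(new LazyHolder)

      def bind(row: Int): Array[LazyHolder] = {
        var i = 0
        while (i < exprs.length) {
          val idx = i   // stable copy: a closure over the mutable `i` itself
                        // would observe its final value after the loop
          holders(i).set(() => exprs(idx)(row))
          i += 1
        }
        holders
      }
    }

In the actual change, Hive's GenericUDF.evaluate receives the adapter array and pulls only the arguments it needs through get(), which is how a stateful argument can still be evaluated when the UDF requires it.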