diff options
author | Cheng Hao <hao.cheng@intel.com> | 2014-05-15 22:12:34 -0700 |
---|---|---|
committer | Reynold Xin <rxin@apache.org> | 2014-05-15 22:12:34 -0700 |
commit | a20fea98811d98958567780815fcf0d4fb4e28d4 (patch) | |
tree | e9c3ea380653ca6168016f079309d5caefdd17da /sql/hive | |
parent | bb98ecafce196ecc5bc3a1e4cc9264df7b752c6a (diff) | |
download | spark-a20fea98811d98958567780815fcf0d4fb4e28d4.tar.gz spark-a20fea98811d98958567780815fcf0d4fb4e28d4.tar.bz2 spark-a20fea98811d98958567780815fcf0d4fb4e28d4.zip |
[Spark-1461] Deferred Expression Evaluation (short-circuit evaluation)
This patch unify the foldable & nullable interface for Expression.
1) Deterministic-less UDF (like Rand()) can not be folded.
2) Short-circut will significantly improves the performance in Expression Evaluation, however, the stateful UDF should not be ignored in a short-circuit evaluation(e.g. in expression: col1 > 0 and row_sequence() < 1000, row_sequence() can not be ignored even if col1 > 0 is false)
I brought an concept of DeferredObject from Hive, which has 2 kinds of children classes (EagerResult / DeferredResult), the former requires triggering the evaluation before it's created, while the later trigger the evaluation when first called its get() method.
Author: Cheng Hao <hao.cheng@intel.com>
Closes #446 from chenghao-intel/expression_deferred_evaluation and squashes the following commits:
d2729de [Cheng Hao] Fix the codestyle issues
a08f09c [Cheng Hao] fix bug in or/and short-circuit evaluation
af2236b [Cheng Hao] revert the short-circuit expression evaluation for IF
b7861d2 [Cheng Hao] Add Support for Deferred Expression Evaluation
Diffstat (limited to 'sql/hive')
-rw-r--r-- | sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala | 28 |
1 files changed, 21 insertions, 7 deletions
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala index d50e2c65b7..5729020423 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala @@ -248,17 +248,31 @@ private[hive] case class HiveGenericUdf(name: String, children: Seq[Expression]) isUDFDeterministic && children.foldLeft(true)((prev, n) => prev && n.foldable) } + protected lazy val deferedObjects = Array.fill[DeferredObject](children.length)({ + new DeferredObjectAdapter + }) + + // Adapter from Catalyst ExpressionResult to Hive DeferredObject + class DeferredObjectAdapter extends DeferredObject { + private var func: () => Any = _ + def set(func: () => Any) { + this.func = func + } + override def prepare(i: Int) = {} + override def get(): AnyRef = wrap(func()) + } + val dataType: DataType = inspectorToDataType(returnInspector) override def eval(input: Row): Any = { returnInspector // Make sure initialized. - val args = children.map { v => - new DeferredObject { - override def prepare(i: Int) = {} - override def get(): AnyRef = wrap(v.eval(input)) - } - }.toArray - unwrap(function.evaluate(args)) + var i = 0 + while (i < children.length) { + val idx = i + deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)}) + i += 1 + } + unwrap(function.evaluate(deferedObjects)) } } |