diff options
author | Cheng Lian <lian@databricks.com> | 2016-04-01 22:00:24 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-01 22:00:24 -0700 |
commit | 27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (patch) | |
tree | 8170697ae9631a2f9b307a785ef221bd3fd7b60f /sql/core/src/main | |
parent | 4fc35e6f5c590feb47cbcb5b1136f2e985677b3f (diff) | |
download | spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.gz spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.bz2 spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.zip |
[SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor side when evaluating window functions
## What changes were proposed in this pull request?
`SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute is assigned a different expression ID when instantiated on the executor side than the one it received on the driver side. This PR adds a `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on the executor side.
## How was this patch tested?
A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters.
Author: Cheng Lian <lian@databricks.com>
Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r-- | sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala | 22 |
1 file changed, 16 insertions(+), 6 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala index 7acf020b28..7d0567842c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala @@ -874,7 +874,8 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame( * processor class. */ private[execution] object AggregateProcessor { - def apply(functions: Array[Expression], + def apply( + functions: Array[Expression], ordinal: Int, inputAttributes: Seq[Attribute], newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => MutableProjection): @@ -885,11 +886,20 @@ private[execution] object AggregateProcessor { val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp) val imperatives = mutable.Buffer.empty[ImperativeAggregate] + // SPARK-14244: `SizeBasedWindowFunction`s are firstly created on driver side and then + // serialized to executor side. These functions all reference a global singleton window + // partition size attribute reference, i.e., `SizeBasedWindowFunction.n`. Here we must collect + // the singleton instance created on driver side instead of using executor side + // `SizeBasedWindowFunction.n` to avoid binding failure caused by mismatching expression ID. + val partitionSize: Option[AttributeReference] = { + val aggs = functions.flatMap(_.collectFirst { case f: SizeBasedWindowFunction => f }) + aggs.headOption.map(_.n) + } + // Check if there are any SizeBasedWindowFunctions. If there are, we add the partition size to // the aggregation buffer. Note that the ordinal of the partition size value will always be 0. 
- val trackPartitionSize = functions.exists(_.isInstanceOf[SizeBasedWindowFunction]) - if (trackPartitionSize) { - aggBufferAttributes += SizeBasedWindowFunction.n + partitionSize.foreach { n => + aggBufferAttributes += n initialValues += NoOp updateExpressions += NoOp } @@ -920,7 +930,7 @@ private[execution] object AggregateProcessor { // Create the projections. val initialProjection = newMutableProjection( initialValues, - Seq(SizeBasedWindowFunction.n))() + partitionSize.toSeq)() val updateProjection = newMutableProjection( updateExpressions, aggBufferAttributes ++ inputAttributes)() @@ -935,7 +945,7 @@ private[execution] object AggregateProcessor { updateProjection, evaluateProjection, imperatives.toArray, - trackPartitionSize) + partitionSize.isDefined) } } |