about summary refs log tree commit diff
path: root/sql/core/src/main
diff options
context:
space:
mode:
author: Cheng Lian <lian@databricks.com> 2016-04-01 22:00:24 -0700
committer: Yin Huai <yhuai@databricks.com> 2016-04-01 22:00:24 -0700
commit27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (patch)
tree8170697ae9631a2f9b307a785ef221bd3fd7b60f /sql/core/src/main
parent4fc35e6f5c590feb47cbcb5b1136f2e985677b3f (diff)
downloadspark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.gz
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.bz2
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.zip
[SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor side when evaluating window functions
## What changes were proposed in this pull request?

`SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute gets different expression IDs when created on both driver side and executor side. This PR adds `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on executor side.

## How was this patch tested?

A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters.

Author: Cheng Lian <lian@databricks.com>

Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
Diffstat (limited to 'sql/core/src/main')
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala22
1 file changed, 16 insertions, 6 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
index 7acf020b28..7d0567842c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Window.scala
@@ -874,7 +874,8 @@ private[execution] final class UnboundedFollowingWindowFunctionFrame(
* processor class.
*/
private[execution] object AggregateProcessor {
- def apply(functions: Array[Expression],
+ def apply(
+ functions: Array[Expression],
ordinal: Int,
inputAttributes: Seq[Attribute],
newMutableProjection: (Seq[Expression], Seq[Attribute]) => () => MutableProjection):
@@ -885,11 +886,20 @@ private[execution] object AggregateProcessor {
val evaluateExpressions = mutable.Buffer.fill[Expression](ordinal)(NoOp)
val imperatives = mutable.Buffer.empty[ImperativeAggregate]
+ // SPARK-14244: `SizeBasedWindowFunction`s are firstly created on driver side and then
+ // serialized to executor side. These functions all reference a global singleton window
+ // partition size attribute reference, i.e., `SizeBasedWindowFunction.n`. Here we must collect
+ // the singleton instance created on driver side instead of using executor side
+ // `SizeBasedWindowFunction.n` to avoid binding failure caused by mismatching expression ID.
+ val partitionSize: Option[AttributeReference] = {
+ val aggs = functions.flatMap(_.collectFirst { case f: SizeBasedWindowFunction => f })
+ aggs.headOption.map(_.n)
+ }
+
// Check if there are any SizeBasedWindowFunctions. If there are, we add the partition size to
// the aggregation buffer. Note that the ordinal of the partition size value will always be 0.
- val trackPartitionSize = functions.exists(_.isInstanceOf[SizeBasedWindowFunction])
- if (trackPartitionSize) {
- aggBufferAttributes += SizeBasedWindowFunction.n
+ partitionSize.foreach { n =>
+ aggBufferAttributes += n
initialValues += NoOp
updateExpressions += NoOp
}
@@ -920,7 +930,7 @@ private[execution] object AggregateProcessor {
// Create the projections.
val initialProjection = newMutableProjection(
initialValues,
- Seq(SizeBasedWindowFunction.n))()
+ partitionSize.toSeq)()
val updateProjection = newMutableProjection(
updateExpressions,
aggBufferAttributes ++ inputAttributes)()
@@ -935,7 +945,7 @@ private[execution] object AggregateProcessor {
updateProjection,
evaluateProjection,
imperatives.toArray,
- trackPartitionSize)
+ partitionSize.isDefined)
}
}