diff options
author | Cheng Lian <lian@databricks.com> | 2016-04-01 22:00:24 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2016-04-01 22:00:24 -0700 |
commit | 27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (patch) | |
tree | 8170697ae9631a2f9b307a785ef221bd3fd7b60f /sql/catalyst | |
parent | 4fc35e6f5c590feb47cbcb5b1136f2e985677b3f (diff) | |
download | spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.gz spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.bz2 spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.zip |
[SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor side when evaluating window functions
## What changes were proposed in this pull request?
`SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute gets different expression IDs when created on both driver side and executor side. This PR adds `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on executor side.
## How was this patch tested?
A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters.
Author: Cheng Lian <lian@databricks.com>
Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala | 6 |
1 files changed, 5 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala index b8679474cf..c0b453dccf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala @@ -451,7 +451,11 @@ abstract class RowNumberLike extends AggregateWindowFunction { * A [[SizeBasedWindowFunction]] needs the size of the current window for its calculation. */ trait SizeBasedWindowFunction extends AggregateWindowFunction { - protected def n: AttributeReference = SizeBasedWindowFunction.n + // It's made a val so that the attribute created on driver side is serialized to executor side. + // Otherwise, if it's defined as a function, when it's called on executor side, it actually + // returns the singleton value instantiated on executor side, which has different expression ID + // from the one created on driver side. + val n: AttributeReference = SizeBasedWindowFunction.n } object SizeBasedWindowFunction { |