aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorCheng Lian <lian@databricks.com>2016-04-01 22:00:24 -0700
committerYin Huai <yhuai@databricks.com>2016-04-01 22:00:24 -0700
commit27e71a2cd930ae28c82c9c3ee6476a12ea165fdf (patch)
tree8170697ae9631a2f9b307a785ef221bd3fd7b60f /sql/catalyst
parent4fc35e6f5c590feb47cbcb5b1136f2e985677b3f (diff)
downloadspark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.gz
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.tar.bz2
spark-27e71a2cd930ae28c82c9c3ee6476a12ea165fdf.zip
[SPARK-14244][SQL] Don't use SizeBasedWindowFunction.n created on executor side when evaluating window functions
## What changes were proposed in this pull request? `SizeBasedWindowFunction.n` is a global singleton attribute created for evaluating size based aggregate window functions like `CUME_DIST`. However, this attribute gets different expression IDs when created on both driver side and executor side. This PR adds `withPartitionSize` method to `SizeBasedWindowFunction` so that we can easily rewrite `SizeBasedWindowFunction.n` on executor side. ## How was this patch tested? A test case is added in `HiveSparkSubmitSuite`, which supports launching multi-process clusters. Author: Cheng Lian <lian@databricks.com> Closes #12040 from liancheng/spark-14244-fix-sized-window-function.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala6
1 files changed, 5 insertions, 1 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
index b8679474cf..c0b453dccf 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/windowExpressions.scala
@@ -451,7 +451,11 @@ abstract class RowNumberLike extends AggregateWindowFunction {
* A [[SizeBasedWindowFunction]] needs the size of the current window for its calculation.
*/
trait SizeBasedWindowFunction extends AggregateWindowFunction {
- protected def n: AttributeReference = SizeBasedWindowFunction.n
+ // It's made a val so that the attribute created on driver side is serialized to executor side.
+ // Otherwise, if it's defined as a function, when it's called on executor side, it actually
+ // returns the singleton value instantiated on executor side, which has different expression ID
+ // from the one created on driver side.
+ val n: AttributeReference = SizeBasedWindowFunction.n
}
object SizeBasedWindowFunction {