diff options
author | Yin Huai <yhuai@databricks.com> | 2015-07-28 19:01:25 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-07-28 19:01:25 -0700 |
commit | 3744b7fd42e52011af60cc205fcb4e4b23b35c68 (patch) | |
tree | cd48bb28d354e3da5550f6b2be56a6dc618742c3 /sql/catalyst | |
parent | e78ec1a8fabfe409c92c4904208f53dbdcfcf139 (diff) | |
download | spark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.tar.gz spark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.tar.bz2 spark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.zip |
[SPARK-9422] [SQL] Remove the placeholder attributes used in the aggregation buffers
https://issues.apache.org/jira/browse/SPARK-9422
Author: Yin Huai <yhuai@databricks.com>
Closes #7737 from yhuai/removePlaceHolder and squashes the following commits:
ec29b44 [Yin Huai] Remove placeholder attributes.
Diffstat (limited to 'sql/catalyst')
-rw-r--r-- | sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala | 27 |
1 files changed, 24 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala index 10bd19c8a8..9fb7623172 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala @@ -103,9 +103,30 @@ abstract class AggregateFunction2 final override def foldable: Boolean = false /** - * The offset of this function's buffer in the underlying buffer shared with other functions. + * The offset of this function's start buffer value in the + * underlying shared mutable aggregation buffer. + * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share + * the same aggregation buffer. In this shared buffer, the position of the first + * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)` + * will be 2. */ - var bufferOffset: Int = 0 + var mutableBufferOffset: Int = 0 + + /** + * The offset of this function's start buffer value in the + * underlying shared input aggregation buffer. An input aggregation buffer is used + * when we merge two aggregation buffers and it is basically the immutable one + * (we merge an input aggregation buffer and a mutable aggregation buffer and + * then store the new buffer values to the mutable aggregation buffer). + * Usually, an input aggregation buffer also contains extra elements like grouping + * keys at the beginning. So, mutableBufferOffset and inputBufferOffset are often + * different. + * For example, we have a grouping expression `key`, and two aggregate functions + * `avg(x)` and `avg(y)`. In this shared input aggregation buffer, the position of the first + * buffer value of `avg(x)` will be 1 and the position of the first buffer value of `avg(y)` + * will be 3 (position 0 is used for the value of `key`). 
+ */ + var inputBufferOffset: Int = 0 /** The schema of the aggregation buffer. */ def bufferSchema: StructType @@ -176,7 +197,7 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable w override def initialize(buffer: MutableRow): Unit = { var i = 0 while (i < bufferAttributes.size) { - buffer(i + bufferOffset) = initialValues(i).eval() + buffer(i + mutableBufferOffset) = initialValues(i).eval() i += 1 } } |