aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst
diff options
context:
space:
mode:
authorYin Huai <yhuai@databricks.com>2015-07-28 19:01:25 -0700
committerYin Huai <yhuai@databricks.com>2015-07-28 19:01:25 -0700
commit3744b7fd42e52011af60cc205fcb4e4b23b35c68 (patch)
treecd48bb28d354e3da5550f6b2be56a6dc618742c3 /sql/catalyst
parente78ec1a8fabfe409c92c4904208f53dbdcfcf139 (diff)
downloadspark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.tar.gz
spark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.tar.bz2
spark-3744b7fd42e52011af60cc205fcb4e4b23b35c68.zip
[SPARK-9422] [SQL] Remove the placeholder attributes used in the aggregation buffers
https://issues.apache.org/jira/browse/SPARK-9422 Author: Yin Huai <yhuai@databricks.com> Closes #7737 from yhuai/removePlaceHolder and squashes the following commits: ec29b44 [Yin Huai] Remove placeholder attributes.
Diffstat (limited to 'sql/catalyst')
-rw-r--r--sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala27
1 files changed, 24 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
index 10bd19c8a8..9fb7623172 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/interfaces.scala
@@ -103,9 +103,30 @@ abstract class AggregateFunction2
final override def foldable: Boolean = false
/**
- * The offset of this function's buffer in the underlying buffer shared with other functions.
+ * The offset of this function's start buffer value in the
+ * underlying shared mutable aggregation buffer.
+ * For example, we have two aggregate functions `avg(x)` and `avg(y)`, which share
+ * the same aggregation buffer. In this shared buffer, the position of the first
+ * buffer value of `avg(x)` will be 0 and the position of the first buffer value of `avg(y)`
+ * will be 2.
*/
- var bufferOffset: Int = 0
+ var mutableBufferOffset: Int = 0
+
+ /**
+ * The offset of this function's start buffer value in the
+ * underlying shared input aggregation buffer. An input aggregation buffer is used
+ * when we merge two aggregation buffers and it is basically the immutable one
+ * (we merge an input aggregation buffer and a mutable aggregation buffer and
+ * then store the new buffer values to the mutable aggregation buffer).
+ * Usually, an input aggregation buffer also contain extra elements like grouping
+ * keys at the beginning. So, mutableBufferOffset and inputBufferOffset are often
+ * different.
+ * For example, we have a grouping expression `key``, and two aggregate functions
+ * `avg(x)` and `avg(y)`. In this shared input aggregation buffer, the position of the first
+ * buffer value of `avg(x)` will be 1 and the position of the first buffer value of `avg(y)`
+ * will be 3 (position 0 is used for the value of key`).
+ */
+ var inputBufferOffset: Int = 0
/** The schema of the aggregation buffer. */
def bufferSchema: StructType
@@ -176,7 +197,7 @@ abstract class AlgebraicAggregate extends AggregateFunction2 with Serializable w
override def initialize(buffer: MutableRow): Unit = {
var i = 0
while (i < bufferAttributes.size) {
- buffer(i + bufferOffset) = initialValues(i).eval()
+ buffer(i + mutableBufferOffset) = initialValues(i).eval()
i += 1
}
}