aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenchen Fan <wenchen@databricks.com>2016-09-22 23:25:32 +0800
committerCheng Lian <lian@databricks.com>2016-09-22 23:25:32 +0800
commit8a02410a92429bff50d6ce082f873cea9e9fa91e (patch)
tree738122924eacf57307d94423550459acb8155845
parent72d9fba26c19aae73116fd0d00b566967934c6fc (diff)
downloadspark-8a02410a92429bff50d6ce082f873cea9e9fa91e.tar.gz
spark-8a02410a92429bff50d6ce082f873cea9e9fa91e.tar.bz2
spark-8a02410a92429bff50d6ce082f873cea9e9fa91e.zip
[SQL][MINOR] correct the comment of SortBasedAggregationIterator.safeProj
## What changes were proposed in this pull request? This comment went stale long time ago, this PR fixes it according to my understanding. ## How was this patch tested? N/A Author: Wenchen Fan <wenchen@databricks.com> Closes #15095 from cloud-fan/update-comment.
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala11
1 files changed, 9 insertions, 2 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
index 3f7f849885..c2b1ef0fe3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala
@@ -86,8 +86,15 @@ class SortBasedAggregationIterator(
// The aggregation buffer used by the sort-based aggregation.
private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer
- // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be
- // compared to MutableRow (aggregation buffer) directly.
+ // This safe projection is used to turn the input row into safe row. This is necessary
+ // because the input row may be produced by unsafe projection in child operator and all the
+ // produced rows share one byte array. However, when we update the aggregate buffer according to
+ // the input row, we may cache some values from input row, e.g. `Max` will keep the max value from
+ // input row via MutableProjection, `CollectList` will keep all values in an array via
+ // ImperativeAggregate framework. These values may get changed unexpectedly if the underlying
+ // unsafe projection update the shared byte array. By applying a safe projection to the input row,
+ // we can cut down the connection from input row to the shared byte array, and thus it's safe to
+ // cache values from input row while updating the aggregation buffer.
private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType))
protected def initialize(): Unit = {