From 8a02410a92429bff50d6ce082f873cea9e9fa91e Mon Sep 17 00:00:00 2001 From: Wenchen Fan Date: Thu, 22 Sep 2016 23:25:32 +0800 Subject: [SQL][MINOR] correct the comment of SortBasedAggregationIterator.safeProj ## What changes were proposed in this pull request? This comment went stale long time ago, this PR fixes it according to my understanding. ## How was this patch tested? N/A Author: Wenchen Fan Closes #15095 from cloud-fan/update-comment. --- .../execution/aggregate/SortBasedAggregationIterator.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala index 3f7f849885..c2b1ef0fe3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortBasedAggregationIterator.scala @@ -86,8 +86,15 @@ class SortBasedAggregationIterator( // The aggregation buffer used by the sort-based aggregation. private[this] val sortBasedAggregationBuffer: MutableRow = newBuffer - // A SafeProjection to turn UnsafeRow into GenericInternalRow, because UnsafeRow can't be - // compared to MutableRow (aggregation buffer) directly. + // This safe projection is used to turn the input row into safe row. This is necessary + // because the input row may be produced by unsafe projection in child operator and all the + // produced rows share one byte array. However, when we update the aggregate buffer according to + // the input row, we may cache some values from input row, e.g. `Max` will keep the max value from + // input row via MutableProjection, `CollectList` will keep all values in an array via + // ImperativeAggregate framework. These values may get changed unexpectedly if the underlying + // unsafe projection update the shared byte array. By applying a safe projection to the input row, + // we can cut down the connection from input row to the shared byte array, and thus it's safe to + // cache values from input row while updating the aggregation buffer. private[this] val safeProj: Projection = FromUnsafeProjection(valueAttributes.map(_.dataType)) protected def initialize(): Unit = { -- cgit v1.2.3