diff options
author | Reynold Xin <rxin@databricks.com> | 2015-08-20 07:53:27 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-08-20 07:53:40 -0700 |
commit | 5be517584be0c78dc4641a4aa14ea9da05ed344d (patch) | |
tree | 9531a7055403d7eed1104323bd77d4b2e1002898 /sql | |
parent | 675e2249472fbadecb5c8f8da6ae8ff7a1f05305 (diff) | |
download | spark-5be517584be0c78dc4641a4aa14ea9da05ed344d.tar.gz spark-5be517584be0c78dc4641a4aa14ea9da05ed344d.tar.bz2 spark-5be517584be0c78dc4641a4aa14ea9da05ed344d.zip |
[SPARK-10100] [SQL] Eliminate hash table lookup if there is no grouping key in aggregation.
This improves performance by ~ 20 - 30% in one of my local test and should fix the performance regression from 1.4 to 1.5 on ss_max.
Author: Reynold Xin <rxin@databricks.com>
Closes #8332 from rxin/SPARK-10100.
(cherry picked from commit b4f4e91c395cb69ced61d9ff1492d1b814f96828)
Signed-off-by: Yin Huai <yhuai@databricks.com>
Diffstat (limited to 'sql')
2 files changed, 22 insertions, 10 deletions
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala index 99f51ba5b6..ba379d358d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregate.scala @@ -104,7 +104,7 @@ case class TungstenAggregate( } else { // This is a grouped aggregate and the input iterator is empty, // so return an empty iterator. - Iterator[UnsafeRow]() + Iterator.empty } } else { aggregationIterator.start(parentIterator) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index af7e0fcedb..26fdbc83ef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -357,18 +357,30 @@ class TungstenAggregationIterator( // sort-based aggregation (by calling switchToSortBasedAggregation). private def processInputs(): Unit = { assert(inputIter != null, "attempted to process input when iterator was null") - while (!sortBased && inputIter.hasNext) { - val newInput = inputIter.next() - numInputRows += 1 - val groupingKey = groupProjection.apply(newInput) + if (groupingExpressions.isEmpty) { + // If there is no grouping expressions, we can just reuse the same buffer over and over again. + // Note that it would be better to eliminate the hash map entirely in the future. + val groupingKey = groupProjection.apply(null) val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey) - if (buffer == null) { - // buffer == null means that we could not allocate more memory. - // Now, we need to spill the map and switch to sort-based aggregation. - switchToSortBasedAggregation(groupingKey, newInput) - } else { + while (inputIter.hasNext) { + val newInput = inputIter.next() + numInputRows += 1 processRow(buffer, newInput) } + } else { + while (!sortBased && inputIter.hasNext) { + val newInput = inputIter.next() + numInputRows += 1 + val groupingKey = groupProjection.apply(newInput) + val buffer: UnsafeRow = hashMap.getAggregationBufferFromUnsafeRow(groupingKey) + if (buffer == null) { + // buffer == null means that we could not allocate more memory. + // Now, we need to spill the map and switch to sort-based aggregation. + switchToSortBasedAggregation(groupingKey, newInput) + } else { + processRow(buffer, newInput) + } + } } } |