diff options
author | Andrew Or <andrew@databricks.com> | 2015-08-07 14:20:13 -0700 |
---|---|---|
committer | Yin Huai <yhuai@databricks.com> | 2015-08-07 14:20:13 -0700 |
commit | 881548ab20fa4c4b635c51d956b14bd13981e2f4 (patch) | |
tree | c29a335f8f90d664a77bab926d866468922c762b /sql | |
parent | 05d04e10a8ea030bea840c3c5ba93ecac479a039 (diff) | |
download | spark-881548ab20fa4c4b635c51d956b14bd13981e2f4.tar.gz spark-881548ab20fa4c4b635c51d956b14bd13981e2f4.tar.bz2 spark-881548ab20fa4c4b635c51d956b14bd13981e2f4.zip |
[SPARK-9674] Re-enable ignored test in SQLQuerySuite
The original code that this test tests is removed in https://github.com/apache/spark/commit/9270bd06fd0b16892e3f37213b5bc7813ea11fdd. It was ignored shortly before that so we never caught it. This patch re-enables the test and adds the code necessary to make it pass.
JoshRosen yhuai
Author: Andrew Or <andrew@databricks.com>
Closes #8015 from andrewor14/SPARK-9674 and squashes the following commits:
225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674
8c24209 [Andrew Or] Fix NPE
e541d64 [Andrew Or] Track aggregation memory for both sort and hash
0be3a42 [Andrew Or] Fix test
Diffstat (limited to 'sql')
4 files changed, 39 insertions, 15 deletions
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java index efb33530da..b08a4a13a2 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java @@ -210,11 +210,10 @@ public final class UnsafeFixedWidthAggregationMap { } /** - * The memory used by this map's managed structures, in bytes. - * Note that this is also the peak memory used by this map, since the map is append-only. + * Return the peak memory used so far, in bytes. */ - public long getMemoryUsage() { - return map.getTotalMemoryConsumption(); + public long getPeakMemoryUsedBytes() { + return map.getPeakMemoryUsedBytes(); } /** diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 9a65c9d3a4..69d6784713 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -160,6 +160,13 @@ public final class UnsafeKVExternalSorter { } /** + * Return the peak memory used so far, in bytes. + */ + public long getPeakMemoryUsedBytes() { + return sorter.getPeakMemoryUsedBytes(); + } + + /** * Marks the current page as no-more-space-available, and as a result, either allocate a * new page or spill when we see the next record. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala index 4d5e98a3e9..440bef32f4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.execution.aggregate import org.apache.spark.unsafe.KVIterator -import org.apache.spark.{Logging, SparkEnv, TaskContext} +import org.apache.spark.{InternalAccumulator, Logging, SparkEnv, TaskContext} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner @@ -397,14 +397,20 @@ class TungstenAggregationIterator( private[this] var mapIteratorHasNext: Boolean = false /////////////////////////////////////////////////////////////////////////// - // Part 4: The function used to switch this iterator from hash-based - // aggregation to sort-based aggregation. + // Part 3: Methods and fields used by sort-based aggregation. /////////////////////////////////////////////////////////////////////////// + // This sorter is used for sort-based aggregation. It is initialized as soon as + // we switch from hash-based to sort-based aggregation. Otherwise, it is not used. + private[this] var externalSorter: UnsafeKVExternalSorter = null + + /** + * Switch to sort-based aggregation when the hash-based approach is unable to acquire memory. + */ private def switchToSortBasedAggregation(firstKey: UnsafeRow, firstInput: UnsafeRow): Unit = { logInfo("falling back to sort based aggregation.") // Step 1: Get the ExternalSorter containing sorted entries of the map. - val externalSorter: UnsafeKVExternalSorter = hashMap.destructAndCreateExternalSorter() + externalSorter = hashMap.destructAndCreateExternalSorter() // Step 2: Free the memory used by the map. hashMap.free() @@ -601,7 +607,7 @@ class TungstenAggregationIterator( } /////////////////////////////////////////////////////////////////////////// - // Par 7: Iterator's public methods. + // Part 7: Iterator's public methods. /////////////////////////////////////////////////////////////////////////// override final def hasNext: Boolean = { @@ -610,7 +616,7 @@ class TungstenAggregationIterator( override final def next(): UnsafeRow = { if (hasNext) { - if (sortBased) { + val res = if (sortBased) { // Process the current group. processCurrentSortedGroup() // Generate output row for the current group. @@ -641,6 +647,19 @@ class TungstenAggregationIterator( result } } + + // If this is the last record, update the task's peak memory usage. Since we destroy + // the map to create the sorter, their memory usages should not overlap, so it is safe + // to just use the max of the two. + if (!hasNext) { + val mapMemory = hashMap.getPeakMemoryUsedBytes + val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L) + val peakMemory = Math.max(mapMemory, sorterMemory) + TaskContext.get().internalMetricsToAccumulators( + InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory) + } + + res } else { // no more result throw new NoSuchElementException @@ -651,6 +670,7 @@ class TungstenAggregationIterator( // Part 8: A utility function used to generate a output row when there is no // input and there is no grouping expression. /////////////////////////////////////////////////////////////////////////// + def outputForEmptyGroupingKeyWithoutInput(): UnsafeRow = { if (groupingExpressions.isEmpty) { sortBasedAggregationBuffer.copyFrom(initialAggregationBuffer) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index c64aa7a07d..b14ef9bab9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -267,7 +267,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { if (!hasGeneratedAgg) { fail( s""" - |Codegen is enabled, but query $sqlText does not have GeneratedAggregate in the plan. + |Codegen is enabled, but query $sqlText does not have TungstenAggregate in the plan. |${df.queryExecution.simpleString} """.stripMargin) } @@ -1602,10 +1602,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils { Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123)))) } - ignore("aggregation with codegen updates peak execution memory") { - withSQLConf( - (SQLConf.CODEGEN_ENABLED.key, "true"), - (SQLConf.USE_SQL_AGGREGATE2.key, "false")) { + test("aggregation with codegen updates peak execution memory") { + withSQLConf((SQLConf.CODEGEN_ENABLED.key, "true")) { val sc = sqlContext.sparkContext AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "aggregation with codegen") { testCodeGen( |