author     Andrew Or <andrew@databricks.com>    2015-08-07 14:20:13 -0700
committer  Yin Huai <yhuai@databricks.com>      2015-08-07 14:20:13 -0700
commit     881548ab20fa4c4b635c51d956b14bd13981e2f4 (patch)
tree       c29a335f8f90d664a77bab926d866468922c762b /sql
parent     05d04e10a8ea030bea840c3c5ba93ecac479a039 (diff)
[SPARK-9674] Re-enable ignored test in SQLQuerySuite
The original code that this test tests was removed in
https://github.com/apache/spark/commit/9270bd06fd0b16892e3f37213b5bc7813ea11fdd.
The test itself was ignored shortly before that, so the regression was never caught.
This patch re-enables the test and adds the code necessary to make it pass.

JoshRosen yhuai

Author: Andrew Or <andrew@databricks.com>

Closes #8015 from andrewor14/SPARK-9674 and squashes the following commits:

225eac2 [Andrew Or] Merge branch 'master' of github.com:apache/spark into SPARK-9674
8c24209 [Andrew Or] Fix NPE
e541d64 [Andrew Or] Track aggregation memory for both sort and hash
0be3a42 [Andrew Or] Fix test
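A minimal sketch of the accounting idea behind "Track aggregation memory for both sort and hash" (hypothetical names, not Spark's API): the hash map is destroyed before the external sorter is created, so the two structures never hold memory at the same time, and the task-level peak is the max of their individual peaks rather than their sum.

    object PeakAccountingSketch {
      // mapPeakBytes: peak of the hash aggregation map (always present).
      // sorterPeakBytes: peak of the spill sorter, None if no fallback occurred.
      def taskPeakBytes(mapPeakBytes: Long, sorterPeakBytes: Option[Long]): Long =
        math.max(mapPeakBytes, sorterPeakBytes.getOrElse(0L))

      def main(args: Array[String]): Unit = {
        println(taskPeakBytes(64L << 20, None))             // 67108864: hash-only query
        println(taskPeakBytes(64L << 20, Some(128L << 20))) // 134217728: fell back to sort
      }
    }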
Diffstat (limited to 'sql')
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java          |  7
-rw-r--r--  sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java                  |  7
-rw-r--r--  sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala | 32
-rw-r--r--  sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala                                   |  8
4 files changed, 39 insertions(+), 15 deletions(-)
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
index efb33530da..b08a4a13a2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMap.java
@@ -210,11 +210,10 @@ public final class UnsafeFixedWidthAggregationMap {
}
/**
- * The memory used by this map's managed structures, in bytes.
- * Note that this is also the peak memory used by this map, since the map is append-only.
+ * Return the peak memory used so far, in bytes.
*/
- public long getMemoryUsage() {
- return map.getTotalMemoryConsumption();
+ public long getPeakMemoryUsedBytes() {
+ return map.getPeakMemoryUsedBytes();
}
/**
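The rename above is a behavioral no-op turned into an explicit contract: an append-only map never frees anything, so its current consumption is also its peak. A hypothetical illustration of that invariant (not Spark code):

    class AppendOnlyMapSketch {
      private var totalBytes: Long = 0L

      // Nothing is ever freed, so total consumption only grows...
      def append(recordBytes: Long): Unit = { totalBytes += recordBytes }

      // ...which makes the current total identical to the peak. The patch
      // states this guarantee in the method name instead of a comment.
      def getPeakMemoryUsedBytes: Long = totalBytes
    }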
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
index 9a65c9d3a4..69d6784713 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java
@@ -160,6 +160,13 @@ public final class UnsafeKVExternalSorter {
}
/**
+ * Return the peak memory used so far, in bytes.
+ */
+ public long getPeakMemoryUsedBytes() {
+ return sorter.getPeakMemoryUsedBytes();
+ }
+
+ /**
* Marks the current page as no-more-space-available, and as a result, either allocate a
* new page or spill when we see the next record.
*/
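Unlike the append-only map, a spilling sorter can shrink, so its peak must be tracked separately from its current footprint; the new method simply exposes that high-water mark (the underlying sorter does the real bookkeeping). A hypothetical sketch of the distinction:

    class SpillingSorterSketch {
      private var currentBytes: Long = 0L
      private var peakBytes: Long = 0L

      def insert(recordBytes: Long): Unit = {
        currentBytes += recordBytes
        peakBytes = math.max(peakBytes, currentBytes)
      }

      // A spill frees in-memory pages: the current footprint drops, but the
      // high-water mark that getPeakMemoryUsedBytes reports must not.
      def spill(): Unit = { currentBytes = 0L }

      def getPeakMemoryUsedBytes: Long = peakBytes
    }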
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
index 4d5e98a3e9..440bef32f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/TungstenAggregationIterator.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.execution.aggregate
import org.apache.spark.unsafe.KVIterator
-import org.apache.spark.{Logging, SparkEnv, TaskContext}
+import org.apache.spark.{InternalAccumulator, Logging, SparkEnv, TaskContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.expressions.codegen.GenerateUnsafeRowJoiner
@@ -397,14 +397,20 @@ class TungstenAggregationIterator(
private[this] var mapIteratorHasNext: Boolean = false
///////////////////////////////////////////////////////////////////////////
- // Part 4: The function used to switch this iterator from hash-based
- // aggregation to sort-based aggregation.
+ // Part 3: Methods and fields used by sort-based aggregation.
///////////////////////////////////////////////////////////////////////////
+ // This sorter is used for sort-based aggregation. It is initialized as soon as
+ // we switch from hash-based to sort-based aggregation. Otherwise, it is not used.
+ private[this] var externalSorter: UnsafeKVExternalSorter = null
+
+ /**
+ * Switch to sort-based aggregation when the hash-based approach is unable to acquire memory.
+ */
private def switchToSortBasedAggregation(firstKey: UnsafeRow, firstInput: UnsafeRow): Unit = {
logInfo("falling back to sort based aggregation.")
// Step 1: Get the ExternalSorter containing sorted entries of the map.
- val externalSorter: UnsafeKVExternalSorter = hashMap.destructAndCreateExternalSorter()
+ externalSorter = hashMap.destructAndCreateExternalSorter()
// Step 2: Free the memory used by the map.
hashMap.free()
@@ -601,7 +607,7 @@ class TungstenAggregationIterator(
}
///////////////////////////////////////////////////////////////////////////
- // Par 7: Iterator's public methods.
+ // Part 7: Iterator's public methods.
///////////////////////////////////////////////////////////////////////////
override final def hasNext: Boolean = {
@@ -610,7 +616,7 @@ class TungstenAggregationIterator(
override final def next(): UnsafeRow = {
if (hasNext) {
- if (sortBased) {
+ val res = if (sortBased) {
// Process the current group.
processCurrentSortedGroup()
// Generate output row for the current group.
@@ -641,6 +647,19 @@ class TungstenAggregationIterator(
result
}
}
+
+ // If this is the last record, update the task's peak memory usage. Since we destroy
+ // the map to create the sorter, their memory usages should not overlap, so it is safe
+ // to just use the max of the two.
+ if (!hasNext) {
+ val mapMemory = hashMap.getPeakMemoryUsedBytes
+ val sorterMemory = Option(externalSorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)
+ val peakMemory = Math.max(mapMemory, sorterMemory)
+ TaskContext.get().internalMetricsToAccumulators(
+ InternalAccumulator.PEAK_EXECUTION_MEMORY).add(peakMemory)
+ }
+
+ res
} else {
// no more result
throw new NoSuchElementException
@@ -651,6 +670,7 @@ class TungstenAggregationIterator(
// Part 8: A utility function used to generate a output row when there is no
// input and there is no grouping expression.
///////////////////////////////////////////////////////////////////////////
+
def outputForEmptyGroupingKeyWithoutInput(): UnsafeRow = {
if (groupingExpressions.isEmpty) {
sortBasedAggregationBuffer.copyFrom(initialAggregationBuffer)
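One of the squashed commits is "Fix NPE": externalSorter stays null for queries that never fall back to sort-based aggregation, and the Option(...) wrapper in the hunk above turns that null into a zero-byte contribution. A self-contained illustration of the guard (hypothetical Sorter class):

    object NullSorterGuardSketch {
      final class Sorter { def getPeakMemoryUsedBytes: Long = 42L }

      def sorterPeakBytes(sorter: Sorter): Long =
        Option(sorter).map(_.getPeakMemoryUsedBytes).getOrElse(0L)

      def main(args: Array[String]): Unit = {
        println(sorterPeakBytes(null))       // 0  -- hash-only query, no sorter created
        println(sorterPeakBytes(new Sorter)) // 42 -- fell back to sort-based aggregation
      }
    }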
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index c64aa7a07d..b14ef9bab9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -267,7 +267,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
if (!hasGeneratedAgg) {
fail(
s"""
- |Codegen is enabled, but query $sqlText does not have GeneratedAggregate in the plan.
+ |Codegen is enabled, but query $sqlText does not have TungstenAggregate in the plan.
|${df.queryExecution.simpleString}
""".stripMargin)
}
@@ -1602,10 +1602,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
Row(new CalendarInterval(-(12 * 3 - 3), -(7L * MICROS_PER_WEEK + 123))))
}
- ignore("aggregation with codegen updates peak execution memory") {
- withSQLConf(
- (SQLConf.CODEGEN_ENABLED.key, "true"),
- (SQLConf.USE_SQL_AGGREGATE2.key, "false")) {
+ test("aggregation with codegen updates peak execution memory") {
+ withSQLConf((SQLConf.CODEGEN_ENABLED.key, "true")) {
val sc = sqlContext.sparkContext
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "aggregation with codegen") {
testCodeGen(
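The re-enabled test now sets only CODEGEN_ENABLED and wraps the query in AccumulatorSuite.verifyPeakExecutionMemorySet, which (judging by its name and use here) asserts that the peak-execution-memory metric was actually populated by the job. A hypothetical sketch of that assertion style, not the real helper:

    object VerifyPeakSketch {
      // Run the job body, then check the peak-memory metric was set to a
      // positive value -- the property the re-enabled test exercises.
      def verifyPeakSet(readPeakBytes: () => Long)(job: => Unit): Unit = {
        job
        val peak = readPeakBytes()
        assert(peak > 0, s"peak execution memory was never updated (got $peak)")
      }
    }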