diff options
author | Andrew Or <andrew@databricks.com> | 2015-09-18 23:58:25 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-09-18 23:58:25 -0700 |
commit | 7ff8d68cc19299e16dedfd819b9e96480fa6cf44 (patch) | |
tree | bf4851837bc99d35cdc6ac2c00ad46511835affb /core | |
parent | 22be2ae147a111e88896f6fb42ed46bbf108a99b (diff) | |
download | spark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.tar.gz spark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.tar.bz2 spark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.zip |
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This lead to the following exception:
```
java.io.IOException: Could not acquire 65536 bytes of memory
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169)
at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220)
at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126)
at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257)
at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435)
```
Author: Andrew Or <andrew@databricks.com>
Closes #8827 from andrewor14/allocate-pointer-array.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java | 14 |
1 files changed, 12 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index fc364e0a89..14b6aafdea 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -159,7 +159,7 @@ public final class UnsafeExternalSorter { /** * Allocates new sort data structures. Called when creating the sorter and after each spill. */ - private void initializeForWriting() throws IOException { + public void initializeForWriting() throws IOException { this.writeMetrics = new ShuffleWriteMetrics(); final long pointerArrayMemory = UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize); @@ -187,6 +187,14 @@ public final class UnsafeExternalSorter { * Sort and spill the current records in response to memory pressure. */ public void spill() throws IOException { + spill(true); + } + + /** + * Sort and spill the current records in response to memory pressure. + * @param shouldInitializeForWriting whether to allocate memory for writing after the spill + */ + public void spill(boolean shouldInitializeForWriting) throws IOException { assert(inMemSorter != null); logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), @@ -217,7 +225,9 @@ public final class UnsafeExternalSorter { // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - initializeForWriting(); + if (shouldInitializeForWriting) { + initializeForWriting(); + } } /** |