aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-18 23:58:25 -0700
committerAndrew Or <andrew@databricks.com>2015-09-18 23:58:25 -0700
commit7ff8d68cc19299e16dedfd819b9e96480fa6cf44 (patch)
treebf4851837bc99d35cdc6ac2c00ad46511835affb /core
parent22be2ae147a111e88896f6fb42ed46bbf108a99b (diff)
downloadspark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.tar.gz
spark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.tar.bz2
spark-7ff8d68cc19299e16dedfd819b9e96480fa6cf44.zip
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array
When `TungstenAggregation` hits memory pressure, it switches from hash-based to sort-based aggregation in-place. However, in the process we try to allocate the pointer array for writing to the new `UnsafeExternalSorter` *before* actually freeing the memory from the hash map. This led to the following exception: ``` java.io.IOException: Could not acquire 65536 bytes of memory at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.initializeForWriting(UnsafeExternalSorter.java:169) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.spill(UnsafeExternalSorter.java:220) at org.apache.spark.sql.execution.UnsafeKVExternalSorter.<init>(UnsafeKVExternalSorter.java:126) at org.apache.spark.sql.execution.UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter(UnsafeFixedWidthAggregationMap.java:257) at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.switchToSortBasedAggregation(TungstenAggregationIterator.scala:435) ``` Author: Andrew Or <andrew@databricks.com> Closes #8827 from andrewor14/allocate-pointer-array.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java14
1 file changed, 12 insertions, 2 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index fc364e0a89..14b6aafdea 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,7 +159,7 @@ public final class UnsafeExternalSorter {
/**
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
- private void initializeForWriting() throws IOException {
+ public void initializeForWriting() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
final long pointerArrayMemory =
UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
@@ -187,6 +187,14 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
+ spill(true);
+ }
+
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ * @param shouldInitializeForWriting whether to allocate memory for writing after the spill
+ */
+ public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -217,7 +225,9 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- initializeForWriting();
+ if (shouldInitializeForWriting) {
+ initializeForWriting();
+ }
}
/**