From 83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 Mon Sep 17 00:00:00 2001 From: Andrew Or Date: Wed, 23 Sep 2015 19:34:31 -0700 Subject: [SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array (round 2) This patch reverts most of the changes in a previous fix #8827. The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](https://github.com/apache/spark/blob/a18208047f06a4244703c17023bb20cbe1f59d73/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L88)) Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff. Author: Andrew Or Closes #8888 from andrewor14/dont-track-pointer-array. --- .../org/apache/spark/sql/execution/UnsafeKVExternalSorter.java | 9 ++------- .../sql/execution/UnsafeFixedWidthAggregationMapSuite.scala | 8 ++------ 2 files changed, 4 insertions(+), 13 deletions(-) (limited to 'sql') diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index b81f67a16b..9df5780e4f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -85,7 +85,7 @@ public final class UnsafeKVExternalSorter { // We will use the number of elements in the map as the initialSize of the // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize, // we will use 1 as its initial size if the map is empty. - // TODO: track pointer array memory used by this in-memory sorter! + // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474) final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements())); @@ -124,13 +124,8 @@ public final class UnsafeKVExternalSorter { pageSizeBytes, inMemSorter); - // Note: This spill doesn't actually release any memory, so if we try to allocate a new - // pointer array immediately after the spill then we may fail to acquire sufficient space - // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after* - // we have actually freed memory from our map. - sorter.spill(false /* initialize for writing */); + sorter.spill(); map.free(); - sorter.initializeForWriting(); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala index ada4d42f99..1739798a24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite val sorter = map.destructAndCreateExternalSorter() withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter. - assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === - initialMemoryConsumption + 4096 * 16) + assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) } // Add more keys to the sorter and make sure the results come out sorted. @@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite val sorter = map.destructAndCreateExternalSorter() withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter. - assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === - initialMemoryConsumption + 4096 * 16) + assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) } // Add more keys to the sorter and make sure the results come out sorted. -- cgit v1.2.3