diff options
author | Andrew Or <andrew@databricks.com> | 2015-09-23 19:34:31 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-09-23 19:34:31 -0700 |
commit | 83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (patch) | |
tree | 2267010df342a512c68a11ee63f5dd2bf655d7f0 | |
parent | 084e4e126211d74a79e8dbd2d0e604dd3c650822 (diff) | |
download | spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.gz spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.bz2 spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.zip |
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array (round 2)
This patch reverts most of the changes in a previous fix #8827.
The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later, when we switch to sort-based aggregation, we try to acquire 1 page AND a pointer array. The longer-term fix should be to also reserve the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](https://github.com/apache/spark/blob/a18208047f06a4244703c17023bb20cbe1f59d73/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L88))
Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff.
Author: Andrew Or <andrew@databricks.com>
Closes #8888 from andrewor14/dont-track-pointer-array.
3 files changed, 16 insertions, 52 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 14b6aafdea..0a311d2d93 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -159,16 +159,15 @@ public final class UnsafeExternalSorter { /** * Allocates new sort data structures. Called when creating the sorter and after each spill. */ - public void initializeForWriting() throws IOException { + private void initializeForWriting() throws IOException { + // Note: Do not track memory for the pointer array for now because of SPARK-10474. + // In more detail, in TungstenAggregate we only reserve a page, but when we fall back to + // sort-based aggregation we try to acquire a page AND a pointer array, which inevitably + // fails if all other memory is already occupied. It should be safe to not track the array + // because its memory footprint is frequently much smaller than that of a page. This is a + // temporary hack that we should address in 1.6.0. + // TODO: track the pointer array memory! this.writeMetrics = new ShuffleWriteMetrics(); - final long pointerArrayMemory = - UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize); - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory); - if (memoryAcquired != pointerArrayMemory) { - shuffleMemoryManager.release(memoryAcquired); - throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory"); - } - this.inMemSorter = new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize); this.isInMemSorterExternal = false; @@ -187,14 +186,6 @@ public final class UnsafeExternalSorter { * Sort and spill the current records in response to memory pressure. 
*/ public void spill() throws IOException { - spill(true); - } - - /** - * Sort and spill the current records in response to memory pressure. - * @param shouldInitializeForWriting whether to allocate memory for writing after the spill - */ - public void spill(boolean shouldInitializeForWriting) throws IOException { assert(inMemSorter != null); logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)", Thread.currentThread().getId(), @@ -225,9 +216,7 @@ public final class UnsafeExternalSorter { // written to disk. This also counts the space needed to store the sorter's pointer array. taskContext.taskMetrics().incMemoryBytesSpilled(spillSize); - if (shouldInitializeForWriting) { - initializeForWriting(); - } + initializeForWriting(); } /** @@ -275,14 +264,7 @@ public final class UnsafeExternalSorter { shuffleMemoryManager.release(block.size()); memoryFreed += block.size(); } - if (inMemSorter != null) { - if (!isInMemSorterExternal) { - long sorterMemoryUsage = inMemSorter.getMemoryUsage(); - memoryFreed += sorterMemoryUsage; - shuffleMemoryManager.release(sorterMemoryUsage); - } - inMemSorter = null; - } + // TODO: track in-memory sorter memory usage (SPARK-10474) allocatedPages.clear(); currentPage = null; currentPagePosition = -1; @@ -320,17 +302,8 @@ public final class UnsafeExternalSorter { private void growPointerArrayIfNecessary() throws IOException { assert(inMemSorter != null); if (!inMemSorter.hasSpaceForAnotherRecord()) { - logger.debug("Attempting to expand sort pointer array"); - final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage(); - final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2; - final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray); - if (memoryAcquired < memoryToGrowPointerArray) { - shuffleMemoryManager.release(memoryAcquired); - spill(); - } else { - inMemSorter.expandPointerArray(); - shuffleMemoryManager.release(oldPointerArrayMemoryUsage); - } + // TODO: 
track the pointer array memory! (SPARK-10474) + inMemSorter.expandPointerArray(); } } diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index b81f67a16b..9df5780e4f 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -85,7 +85,7 @@ public final class UnsafeKVExternalSorter { // We will use the number of elements in the map as the initialSize of the // UnsafeInMemorySorter. Because UnsafeInMemorySorter does not accept 0 as the initialSize, // we will use 1 as its initial size if the map is empty. - // TODO: track pointer array memory used by this in-memory sorter! + // TODO: track pointer array memory used by this in-memory sorter! (SPARK-10474) final UnsafeInMemorySorter inMemSorter = new UnsafeInMemorySorter( taskMemoryManager, recordComparator, prefixComparator, Math.max(1, map.numElements())); @@ -124,13 +124,8 @@ public final class UnsafeKVExternalSorter { pageSizeBytes, inMemSorter); - // Note: This spill doesn't actually release any memory, so if we try to allocate a new - // pointer array immediately after the spill then we may fail to acquire sufficient space - // for it (SPARK-10474). For this reason, we must initialize for writing explicitly *after* - // we have actually freed memory from our map. 
- sorter.spill(false /* initialize for writing */); + sorter.spill(); map.free(); - sorter.initializeForWriting(); } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala index ada4d42f99..1739798a24 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala @@ -200,9 +200,7 @@ class UnsafeFixedWidthAggregationMapSuite val sorter = map.destructAndCreateExternalSorter() withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter. - assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === - initialMemoryConsumption + 4096 * 16) + assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) } // Add more keys to the sorter and make sure the results come out sorted. @@ -305,9 +303,7 @@ class UnsafeFixedWidthAggregationMapSuite val sorter = map.destructAndCreateExternalSorter() withClue(s"destructAndCreateExternalSorter should release memory used by the map") { - // 4096 * 16 is the initial size allocated for the pointer/prefix array in the in-mem sorter. - assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === - initialMemoryConsumption + 4096 * 16) + assert(shuffleMemoryManager.getMemoryConsumptionForThisTask() === initialMemoryConsumption) } // Add more keys to the sorter and make sure the results come out sorted. |