aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-09-23 19:34:31 -0700
committerAndrew Or <andrew@databricks.com>2015-09-23 19:34:31 -0700
commit83f6f54d12a418f5158ee7ee985b54eef8cc1cf0 (patch)
tree2267010df342a512c68a11ee63f5dd2bf655d7f0 /core
parent084e4e126211d74a79e8dbd2d0e604dd3c650822 (diff)
downloadspark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.gz
spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.tar.bz2
spark-83f6f54d12a418f5158ee7ee985b54eef8cc1cf0.zip
[SPARK-10474] [SQL] Aggregation fails to allocate memory for pointer array (round 2)
This patch reverts most of the changes in a previous fix #8827. The real cause of the issue is that in `TungstenAggregate`'s prepare method we only reserve 1 page, but later when we switch to sort-based aggregation we try to acquire 1 page AND a pointer array. The longer-term fix should be to reserve also the pointer array, but for now ***we will simply not track the pointer array***. (Note that elsewhere we already don't track the pointer array, e.g. [here](https://github.com/apache/spark/blob/a18208047f06a4244703c17023bb20cbe1f59d73/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java#L88)) Note: This patch reuses the unit test added in #8827 so it doesn't show up in the diff. Author: Andrew Or <andrew@databricks.com> Closes #8888 from andrewor14/dont-track-pointer-array.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java51
1 files changed, 12 insertions, 39 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 14b6aafdea..0a311d2d93 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -159,16 +159,15 @@ public final class UnsafeExternalSorter {
/**
* Allocates new sort data structures. Called when creating the sorter and after each spill.
*/
- public void initializeForWriting() throws IOException {
+ private void initializeForWriting() throws IOException {
+ // Note: Do not track memory for the pointer array for now because of SPARK-10474.
+ // In more detail, in TungstenAggregate we only reserve a page, but when we fall back to
+ // sort-based aggregation we try to acquire a page AND a pointer array, which inevitably
+ // fails if all other memory is already occupied. It should be safe to not track the array
+ // because its memory footprint is frequently much smaller than that of a page. This is a
+ // temporary hack that we should address in 1.6.0.
+ // TODO: track the pointer array memory!
this.writeMetrics = new ShuffleWriteMetrics();
- final long pointerArrayMemory =
- UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory);
- if (memoryAcquired != pointerArrayMemory) {
- shuffleMemoryManager.release(memoryAcquired);
- throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory");
- }
-
this.inMemSorter =
new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
this.isInMemSorterExternal = false;
@@ -187,14 +186,6 @@ public final class UnsafeExternalSorter {
* Sort and spill the current records in response to memory pressure.
*/
public void spill() throws IOException {
- spill(true);
- }
-
- /**
- * Sort and spill the current records in response to memory pressure.
- * @param shouldInitializeForWriting whether to allocate memory for writing after the spill
- */
- public void spill(boolean shouldInitializeForWriting) throws IOException {
assert(inMemSorter != null);
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -225,9 +216,7 @@ public final class UnsafeExternalSorter {
// written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
- if (shouldInitializeForWriting) {
- initializeForWriting();
- }
+ initializeForWriting();
}
/**
@@ -275,14 +264,7 @@ public final class UnsafeExternalSorter {
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
- if (inMemSorter != null) {
- if (!isInMemSorterExternal) {
- long sorterMemoryUsage = inMemSorter.getMemoryUsage();
- memoryFreed += sorterMemoryUsage;
- shuffleMemoryManager.release(sorterMemoryUsage);
- }
- inMemSorter = null;
- }
+ // TODO: track in-memory sorter memory usage (SPARK-10474)
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
@@ -320,17 +302,8 @@ public final class UnsafeExternalSorter {
private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
- logger.debug("Attempting to expand sort pointer array");
- final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
- final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
- if (memoryAcquired < memoryToGrowPointerArray) {
- shuffleMemoryManager.release(memoryAcquired);
- spill();
- } else {
- inMemSorter.expandPointerArray();
- shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
- }
+ // TODO: track the pointer array memory! (SPARK-10474)
+ inMemSorter.expandPointerArray();
}
}