aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-08-12 10:08:35 -0700
committerReynold Xin <rxin@databricks.com>2015-08-12 10:08:35 -0700
commite0110792ef71ebfd3727b970346a2e13695990a4 (patch)
treebf2a56847391ed0e2ead0a589d85871176b1ac3c /core
parent66d87c1d76bea2b81993156ac1fa7dad6c312ebf (diff)
downloadspark-e0110792ef71ebfd3727b970346a2e13695990a4.tar.gz
spark-e0110792ef71ebfd3727b970346a2e13695990a4.tar.bz2
spark-e0110792ef71ebfd3727b970346a2e13695990a4.zip
[SPARK-9747] [SQL] Avoid starving an unsafe operator in aggregation
This is the sister patch to #8011, but for aggregation. In a nutshell: create the `TungstenAggregationIterator` before computing the parent partition. Internally this creates a `BytesToBytesMap` which acquires a page in the constructor as of this patch. This ensures that the aggregation operator is not starved since we reserve at least 1 page in advance. rxin yhuai Author: Andrew Or <andrew@databricks.com> Closes #8038 from andrewor14/unsafe-starve-memory-agg.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java34
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java9
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java11
3 files changed, 39 insertions, 15 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 85b46ec8bf..87ed47e88c 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -193,6 +193,11 @@ public final class BytesToBytesMap {
TaskMemoryManager.MAXIMUM_PAGE_SIZE_BYTES);
}
allocate(initialCapacity);
+
+ // Acquire a new page as soon as we construct the map to ensure that we have at least
+ // one page to work with. Otherwise, other operators in the same task may starve this
+ // map (SPARK-9747).
+ acquireNewPage();
}
public BytesToBytesMap(
@@ -574,16 +579,9 @@ public final class BytesToBytesMap {
final long lengthOffsetInPage = currentDataPage.getBaseOffset() + pageCursor;
Platform.putInt(pageBaseObject, lengthOffsetInPage, END_OF_PAGE_MARKER);
}
- final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
- if (memoryGranted != pageSizeBytes) {
- shuffleMemoryManager.release(memoryGranted);
- logger.debug("Failed to acquire {} bytes of memory", pageSizeBytes);
+ if (!acquireNewPage()) {
return false;
}
- MemoryBlock newPage = taskMemoryManager.allocatePage(pageSizeBytes);
- dataPages.add(newPage);
- pageCursor = 0;
- currentDataPage = newPage;
dataPage = currentDataPage;
dataPageBaseObject = currentDataPage.getBaseObject();
dataPageInsertOffset = currentDataPage.getBaseOffset();
@@ -643,6 +641,24 @@ public final class BytesToBytesMap {
}
/**
+ * Acquire a new page from the {@link ShuffleMemoryManager}.
+ * @return whether there is enough space to allocate the new page.
+ */
+ private boolean acquireNewPage() {
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryGranted != pageSizeBytes) {
+ shuffleMemoryManager.release(memoryGranted);
+ logger.debug("Failed to acquire {} bytes of memory", pageSizeBytes);
+ return false;
+ }
+ MemoryBlock newPage = taskMemoryManager.allocatePage(pageSizeBytes);
+ dataPages.add(newPage);
+ pageCursor = 0;
+ currentDataPage = newPage;
+ return true;
+ }
+
+ /**
* Allocate new data structures for this map. When calling this outside of the constructor,
* make sure to keep references to the old data structures so that you can free them.
*
@@ -748,7 +764,7 @@ public final class BytesToBytesMap {
}
@VisibleForTesting
- int getNumDataPages() {
+ public int getNumDataPages() {
return dataPages.size();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 9601aafe55..fc364e0a89 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -132,16 +132,15 @@ public final class UnsafeExternalSorter {
if (existingInMemorySorter == null) {
initializeForWriting();
+ // Acquire a new page as soon as we construct the sorter to ensure that we have at
+ // least one page to work with. Otherwise, other operators in the same task may starve
+ // this sorter (SPARK-9709). We don't need to do this if we already have an existing sorter.
+ acquireNewPage();
} else {
this.isInMemSorterExternal = true;
this.inMemSorter = existingInMemorySorter;
}
- // Acquire a new page as soon as we construct the sorter to ensure that we have at
- // least one page to work with. Otherwise, other operators in the same task may starve
- // this sorter (SPARK-9709).
- acquireNewPage();
-
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
// the end of the task. This is necessary to avoid memory leaks in when the downstream operator
// does not fully consume the sorter's output (e.g. sort followed by limit).
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 1a79c20c35..ab480b60ad 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -543,7 +543,7 @@ public abstract class AbstractBytesToBytesMapSuite {
Platform.LONG_ARRAY_OFFSET,
8);
newPeakMemory = map.getPeakMemoryUsedBytes();
- if (i % numRecordsPerPage == 0) {
+ if (i % numRecordsPerPage == 0 && i > 0) {
// We allocated a new page for this record, so peak memory should change
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
@@ -561,4 +561,13 @@ public abstract class AbstractBytesToBytesMapSuite {
map.free();
}
}
+
+ @Test
+ public void testAcquirePageInConstructor() {
+ final BytesToBytesMap map = new BytesToBytesMap(
+ taskMemoryManager, shuffleMemoryManager, 1, PAGE_SIZE_BYTES);
+ assertEquals(1, map.getNumDataPages());
+ map.free();
+ }
+
}