Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java                           | 23
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java                 | 10
-rw-r--r--  core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java                 |  2
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java   | 10
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java   |  2
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java            |  6
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java |  3
7 files changed, 35 insertions, 21 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index d2a88864f7..8757dff36f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -112,6 +112,11 @@ public class TaskMemoryManager {
private final HashSet<MemoryConsumer> consumers;
/**
+ * The amount of memory that is acquired but not used.
+ */
+ private long acquiredButNotUsed = 0L;
+
+ /**
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
@@ -256,7 +261,20 @@ public class TaskMemoryManager {
}
allocatedPages.set(pageNumber);
}
- final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+ MemoryBlock page = null;
+ try {
+ page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+ } catch (OutOfMemoryError e) {
+ logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
+ // There is not enough memory actually: the actual free memory is smaller than the
+ // MemoryManager thought, so we should keep the acquired memory.
+ acquiredButNotUsed += acquired;
+ synchronized (this) {
+ allocatedPages.clear(pageNumber);
+ }
+ // this could trigger spilling to free some pages.
+ return allocatePage(size, consumer);
+ }
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
@@ -378,6 +396,9 @@ public class TaskMemoryManager {
}
Arrays.fill(pageTable, null);
+ // release the memory that is not used by any consumer.
+ memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
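
The hunks above make allocatePage() resilient to the Tungsten allocator throwing OutOfMemoryError: the bytes already reserved from the MemoryManager are remembered in acquiredButNotUsed instead of being silently lost, the page number is released, and the allocation is retried (which, as the added comment notes, can trigger spilling). Below is a minimal, self-contained sketch of that bookkeeping pattern; Allocator and Page are hypothetical stand-ins, not Spark's MemoryAllocator/MemoryBlock API, and the real method also re-acquires execution memory and manages page numbers on the retry.

// Sketch only: hypothetical types, illustrating the retry-and-track pattern.
final class AllocatePageSketch {
  interface Allocator {
    Page allocate(long bytes);            // may throw OutOfMemoryError
  }
  static final class Page {
    final long size;
    Page(long size) { this.size = size; }
  }

  private final Allocator allocator;
  // Bytes reserved from the memory manager that could not be backed by a real page.
  private long acquiredButNotUsed = 0L;

  AllocatePageSketch(Allocator allocator) {
    this.allocator = allocator;
  }

  Page allocatePage(long acquired) {
    try {
      return allocator.allocate(acquired);
    } catch (OutOfMemoryError e) {
      // Keep the reservation so the accounting stays consistent with what the
      // memory manager believes is in use, then retry; in Spark the retry goes
      // back through the acquire path and may spill consumers to free pages.
      acquiredButNotUsed += acquired;
      return allocatePage(acquired);
    }
  }

  // In Spark the tracked amount is handed back to the memory manager when the
  // task cleans up all of its memory, as the second hunk above shows.
  long reservedButUnused() {
    return acquiredButNotUsed;
  }
}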
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 2c84de5bf2..f97e76d7ed 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -320,15 +320,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
- LongArray array;
- try {
- // could trigger spilling
- array = allocateArray(used / 8 * 2);
- } catch (OutOfMemoryError e) {
- // should have trigger spilling
- assert(inMemSorter.hasSpaceForAnotherRecord());
- return;
- }
+ LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
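
This hunk (and the matching one in UnsafeExternalSorter further down) can drop the try/catch because allocateArray(), going through MemoryConsumer and TaskMemoryManager, now handles the OutOfMemoryError internally and spills instead of throwing. Below is a self-contained sketch of the resulting grow-then-check flow; Sorter and the long[]-based allocate/free helpers are stand-ins for Spark's in-memory sorter and LongArray handling, and the else branch is the continuation of the method beyond the context shown in the diff.

// Sketch of the growPointerArrayIfNecessary() flow after this change.
final class GrowPointerArraySketch {
  interface Sorter {
    boolean hasSpaceForAnotherRecord();
    long getMemoryUsage();
    void expandPointerArray(long[] newArray);
  }

  private final Sorter inMemSorter;

  GrowPointerArraySketch(Sorter inMemSorter) {
    this.inMemSorter = inMemSorter;
  }

  // In Spark this is MemoryConsumer.allocateArray(), which may spill other
  // consumers (including this sorter) rather than throw OutOfMemoryError.
  private long[] allocateArray(long numEntries) {
    return new long[(int) numEntries];
  }

  private void freeArray(long[] array) {
    // In Spark the array's memory is returned to the task memory manager.
  }

  void growPointerArrayIfNecessary() {
    if (!inMemSorter.hasSpaceForAnotherRecord()) {
      long used = inMemSorter.getMemoryUsage();
      long[] array = allocateArray(used / 8 * 2);  // could trigger spilling
      if (inMemSorter.hasSpaceForAnotherRecord()) {
        // Spilling during the allocation already made room, so the new array
        // is not needed.
        freeArray(array);
      } else {
        inMemSorter.expandPointerArray(array);
      }
    }
  }
}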
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 58ad88e1ed..d74602cd20 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -104,7 +104,7 @@ final class ShuffleInMemorySorter {
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
- expandPointerArray(consumer.allocateArray(array.size() * 2));
+ throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
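
insertRecord() now fails fast when the pointer array is full instead of growing it from inside the sorter, so the grow step becomes the caller's responsibility (the external sorter does this in growPointerArrayIfNecessary(), and the test hunks below do it inline). A small hypothetical helper showing the new contract; ShuffleInMemorySorter and MemoryConsumer are the Spark types already named in this diff, but the helper itself is not part of the patch.

// Hypothetical caller-side helper: grow first, then insert.
static void insertWithGrow(ShuffleInMemorySorter sorter, MemoryConsumer consumer,
                           long recordPointer, int partitionId) {
  if (!sorter.hasSpaceForAnotherRecord()) {
    // One pointer-array entry per record (pointer and partition id are packed
    // into a single long), so numRecords() * 2 leaves room for twice as many.
    sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
  }
  sorter.insertRecord(recordPointer, partitionId);
}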
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6edc1ad3f..296bf722fc 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -293,15 +293,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
- LongArray array;
- try {
- // could trigger spilling
- array = allocateArray(used / 8 * 2);
- } catch (OutOfMemoryError e) {
- // should have trigger spilling
- assert(inMemSorter.hasSpaceForAnotherRecord());
- return;
- }
+ LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index d1b0bc5d11..cea0f0a0c6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -164,7 +164,7 @@ public final class UnsafeInMemorySorter {
*/
public void insertRecord(long recordPointer, long keyPrefix) {
if (!hasSpaceForAnotherRecord()) {
- expandPointerArray(consumer.allocateArray(array.size() * 2));
+ throw new IllegalStateException("There is no space for new record");
}
array.set(pos, recordPointer);
pos++;
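
The same caller-side contract applies here, with one difference: UnsafeInMemorySorter stores two entries per record (the record pointer and its key prefix), which is why the UnsafeInMemorySorterSuite hunk below grows the array by numRecords() * 2 * 2. A hypothetical helper mirroring that test, not part of the patch:

// Hypothetical caller-side helper for UnsafeInMemorySorter.
static void insertWithGrow(UnsafeInMemorySorter sorter, MemoryConsumer consumer,
                           long recordPointer, long keyPrefix) {
  if (!sorter.hasSpaceForAnotherRecord()) {
    // Two entries per record (pointer + prefix), doubled again for headroom.
    sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
  }
  sorter.insertRecord(recordPointer, keyPrefix);
}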
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 0328e63e45..eb1da8e1b4 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -75,6 +75,9 @@ public class ShuffleInMemorySorterSuite {
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ }
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes("utf-8");
Platform.putInt(baseObject, position, strBytes.length);
@@ -114,6 +117,9 @@ public class ShuffleInMemorySorterSuite {
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ }
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 93efd033eb..8e557ec0ab 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -111,6 +111,9 @@ public class UnsafeInMemorySorterSuite {
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
+ }
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);