aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-02-08 12:08:58 -0800
committerJosh Rosen <joshrosen@databricks.com>2016-02-08 12:09:20 -0800
commit37bc203c8dd5022cb11d53b697c28a737ee85bcc (patch)
treefa6613dad1fd01764db879299142e7c9ffb8c595
parent8e4d15f70713e1aaaa96dfb3ea4ccc5bb08eb2ce (diff)
downloadspark-37bc203c8dd5022cb11d53b697c28a737ee85bcc.tar.gz
spark-37bc203c8dd5022cb11d53b697c28a737ee85bcc.tar.bz2
spark-37bc203c8dd5022cb11d53b697c28a737ee85bcc.zip
[SPARK-13210][SQL] Catch OOM when allocating memory and expanding the array
There is a bug when we try to grow the buffer: the OOM is wrongly ignored (the assert is also skipped by the JVM), and when we then try to grow the array again, it triggers spilling that frees the current page, so the record we just inserted becomes invalid. The root cause is that the JVM has less free memory than the MemoryManager thought, so it OOMs when allocating a page without triggering spilling. We should catch the OOM and acquire the memory again to trigger spilling. Also, we should not grow the array in `insertRecord` of `InMemorySorter` (it was there just for easy testing). Author: Davies Liu <davies@databricks.com> Closes #11095 from davies/fix_expand.
-rw-r--r--core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java23
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java10
-rw-r--r--core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java2
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java10
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java2
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java6
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java3
7 files changed, 35 insertions, 21 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index d2a88864f7..8757dff36f 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -112,6 +112,11 @@ public class TaskMemoryManager {
private final HashSet<MemoryConsumer> consumers;
/**
+ * The amount of memory that is acquired but not used.
+ */
+ private long acquiredButNotUsed = 0L;
+
+ /**
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
@@ -256,7 +261,20 @@ public class TaskMemoryManager {
}
allocatedPages.set(pageNumber);
}
- final MemoryBlock page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+ MemoryBlock page = null;
+ try {
+ page = memoryManager.tungstenMemoryAllocator().allocate(acquired);
+ } catch (OutOfMemoryError e) {
+ logger.warn("Failed to allocate a page ({} bytes), try again.", acquired);
+ // there is no enough memory actually, it means the actual free memory is smaller than
+ // MemoryManager thought, we should keep the acquired memory.
+ acquiredButNotUsed += acquired;
+ synchronized (this) {
+ allocatedPages.clear(pageNumber);
+ }
+ // this could trigger spilling to free some pages.
+ return allocatePage(size, consumer);
+ }
page.pageNumber = pageNumber;
pageTable[pageNumber] = page;
if (logger.isTraceEnabled()) {
@@ -378,6 +396,9 @@ public class TaskMemoryManager {
}
Arrays.fill(pageTable, null);
+ // release the memory that is not used by any consumer.
+ memoryManager.releaseExecutionMemory(acquiredButNotUsed, taskAttemptId, tungstenMemoryMode);
+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 2c84de5bf2..f97e76d7ed 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -320,15 +320,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
- LongArray array;
- try {
- // could trigger spilling
- array = allocateArray(used / 8 * 2);
- } catch (OutOfMemoryError e) {
- // should have trigger spilling
- assert(inMemSorter.hasSpaceForAnotherRecord());
- return;
- }
+ LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
index 58ad88e1ed..d74602cd20 100644
--- a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
@@ -104,7 +104,7 @@ final class ShuffleInMemorySorter {
*/
public void insertRecord(long recordPointer, int partitionId) {
if (!hasSpaceForAnotherRecord()) {
- expandPointerArray(consumer.allocateArray(array.size() * 2));
+ throw new IllegalStateException("There is no space for new record");
}
array.set(pos, PackedRecordPointer.packPointer(recordPointer, partitionId));
pos++;
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index a6edc1ad3f..296bf722fc 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -293,15 +293,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
assert(inMemSorter != null);
if (!inMemSorter.hasSpaceForAnotherRecord()) {
long used = inMemSorter.getMemoryUsage();
- LongArray array;
- try {
- // could trigger spilling
- array = allocateArray(used / 8 * 2);
- } catch (OutOfMemoryError e) {
- // should have trigger spilling
- assert(inMemSorter.hasSpaceForAnotherRecord());
- return;
- }
+ LongArray array = allocateArray(used / 8 * 2);
// check if spilling is triggered or not
if (inMemSorter.hasSpaceForAnotherRecord()) {
freeArray(array);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index d1b0bc5d11..cea0f0a0c6 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -164,7 +164,7 @@ public final class UnsafeInMemorySorter {
*/
public void insertRecord(long recordPointer, long keyPrefix) {
if (!hasSpaceForAnotherRecord()) {
- expandPointerArray(consumer.allocateArray(array.size() * 2));
+ throw new IllegalStateException("There is no space for new record");
}
array.set(pos, recordPointer);
pos++;
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 0328e63e45..eb1da8e1b4 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -75,6 +75,9 @@ public class ShuffleInMemorySorterSuite {
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ }
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes("utf-8");
Platform.putInt(baseObject, position, strBytes.length);
@@ -114,6 +117,9 @@ public class ShuffleInMemorySorterSuite {
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ }
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 93efd033eb..8e557ec0ab 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -111,6 +111,9 @@ public class UnsafeInMemorySorterSuite {
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
+ if (!sorter.hasSpaceForAnotherRecord()) {
+ sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
+ }
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);
final long address = memoryManager.encodePageNumberAndOffset(dataPage, position);