aboutsummaryrefslogtreecommitdiff
path: root/core/src
diff options
context:
space:
mode:
authorSital Kedia <skedia@fb.com>2016-08-19 11:27:30 -0700
committerDavies Liu <davies.liu@gmail.com>2016-08-19 11:27:30 -0700
commitcf0cce90364d17afe780ff9a5426dfcefa298535 (patch)
tree4826d14efc1b7c57242b9a62f8f7c097c6b514c7 /core/src
parent071eaaf9d2b63589f2e66e5279a16a5a484de6f5 (diff)
downloadspark-cf0cce90364d17afe780ff9a5426dfcefa298535.tar.gz
spark-cf0cce90364d17afe780ff9a5426dfcefa298535.tar.bz2
spark-cf0cce90364d17afe780ff9a5426dfcefa298535.zip
[SPARK-17113] [SHUFFLE] Job failure due to Executor OOM in offheap mode
## What changes were proposed in this pull request? This PR fixes executor OOM in offheap mode due to bug in Cooperative Memory Management for UnsafeExternSorter. UnsafeExternalSorter was checking if memory page is being used by upstream by comparing the base object address of the current page with the base object address of upstream. However, in case of offheap memory allocation, the base object addresses are always null, so there was no spilling happening and eventually the operator would OOM. Following is the stack trace this issue addresses - java.lang.OutOfMemoryError: Unable to acquire 1220 bytes of memory, got 0 at org.apache.spark.memory.MemoryConsumer.allocatePage(MemoryConsumer.java:120) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.acquireNewPageIfNecessary(UnsafeExternalSorter.java:341) at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertRecord(UnsafeExternalSorter.java:362) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.insertRow(UnsafeExternalRowSorter.java:93) at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:170) ## How was this patch tested? Tested by running the failing job. Author: Sital Kedia <skedia@fb.com> Closes #14693 from sitalkedia/fix_offheap_oom.
Diffstat (limited to 'core/src')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java2
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java7
2 files changed, 8 insertions, 1 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 8d596f87d2..ccf76643db 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -522,7 +522,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
// is accessing the current record. We free this page in that caller's next loadNext()
// call.
for (MemoryBlock page : allocatedPages) {
- if (!loaded || page.getBaseObject() != upstream.getBaseObject()) {
+ if (!loaded || page.pageNumber != ((UnsafeInMemorySorter.SortedIterator)upstream).getCurrentPageNumber()) {
released += page.size();
freePage(page);
} else {
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 78da389278..30d0f3006a 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -248,6 +248,7 @@ public final class UnsafeInMemorySorter {
private long baseOffset;
private long keyPrefix;
private int recordLength;
+ private long currentPageNumber;
private SortedIterator(int numRecords, int offset) {
this.numRecords = numRecords;
@@ -262,6 +263,7 @@ public final class UnsafeInMemorySorter {
iter.baseOffset = baseOffset;
iter.keyPrefix = keyPrefix;
iter.recordLength = recordLength;
+ iter.currentPageNumber = currentPageNumber;
return iter;
}
@@ -279,6 +281,7 @@ public final class UnsafeInMemorySorter {
public void loadNext() {
// This pointer points to a 4-byte record length, followed by the record's bytes
final long recordPointer = array.get(offset + position);
+ currentPageNumber = memoryManager.decodePageNumber(recordPointer);
baseObject = memoryManager.getPage(recordPointer);
baseOffset = memoryManager.getOffsetInPage(recordPointer) + 4; // Skip over record length
recordLength = Platform.getInt(baseObject, baseOffset - 4);
@@ -292,6 +295,10 @@ public final class UnsafeInMemorySorter {
@Override
public long getBaseOffset() { return baseOffset; }
+ public long getCurrentPageNumber() {
+ return currentPageNumber;
+ }
+
@Override
public int getRecordLength() { return recordLength; }