aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java30
1 files changed, 16 insertions, 14 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 80b03d7e99..c21990f4e4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
- private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
- @VisibleForTesting
- static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
-
+ private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
@@ -91,6 +88,7 @@ public final class UnsafeExternalSorter {
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
}
@@ -147,7 +145,11 @@ public final class UnsafeExternalSorter {
}
private long getMemoryUsage() {
- return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+ long totalPageSize = 0;
+ for (MemoryBlock page : allocatedPages) {
+ totalPageSize += page.size();
+ }
+ return sorter.getMemoryUsage() + totalPageSize;
}
@VisibleForTesting
@@ -214,23 +216,23 @@ public final class UnsafeExternalSorter {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
- if (requiredSpace > PAGE_SIZE) {
+ if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- PAGE_SIZE + ")");
+ pageSizeBytes + ")");
} else {
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquired < PAGE_SIZE) {
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
- final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
- throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+ throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
- currentPage = memoryManager.allocatePage(PAGE_SIZE);
+ currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = PAGE_SIZE;
+ freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}