aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@databricks.com> 2015-07-29 16:00:30 -0700
committer Reynold Xin <rxin@databricks.com> 2015-07-29 16:00:30 -0700
commit 1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536 (patch)
tree 900a7c0fb8296e20d5d10914ab63d6a61805a6da /core
parent b715933fc69a49653abdb2fba0818dfc4f35d358 (diff)
download spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.gz
spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.tar.bz2
spark-1b0099fc62d02ff6216a76fbfe17a4ec5b2f3536.zip
[SPARK-9411] [SQL] Make Tungsten page sizes configurable
We need to make page sizes configurable so we can reduce them in unit tests and increase them in real production workloads. These sizes are now controlled by a new configuration, `spark.buffer.pageSize`. The new default is 64 megabytes.

Author: Josh Rosen <joshrosen@databricks.com>

Closes #7741 from JoshRosen/SPARK-9411 and squashes the following commits:

a43c4db [Josh Rosen] Fix pow
2c0eefc [Josh Rosen] Fix MAXIMUM_PAGE_SIZE_BYTES comment + value
bccfb51 [Josh Rosen] Lower page size to 4MB in TestHive
ba54d4b [Josh Rosen] Make UnsafeExternalSorter's page size configurable
0045aa2 [Josh Rosen] Make UnsafeShuffle's page size configurable
bc734f0 [Josh Rosen] Rename configuration
e614858 [Josh Rosen] Makes BytesToBytesMap page size configurable
Diffstat (limited to 'core')
-rw-r--r-- core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java 35
-rw-r--r-- core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java 5
-rw-r--r-- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java 30
-rw-r--r-- core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java 6
4 files changed, 45 insertions, 31 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
index 1d460432be..1aa6ba4201 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleExternalSorter.java
@@ -59,14 +59,14 @@ final class UnsafeShuffleExternalSorter {
private final Logger logger = LoggerFactory.getLogger(UnsafeShuffleExternalSorter.class);
- private static final int PAGE_SIZE = PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
@VisibleForTesting
static final int DISK_WRITE_BUFFER_SIZE = 1024 * 1024;
- @VisibleForTesting
- static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
private final int initialSize;
private final int numPartitions;
+ private final int pageSizeBytes;
+ @VisibleForTesting
+ final int maxRecordSizeBytes;
private final TaskMemoryManager memoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
@@ -109,7 +109,10 @@ final class UnsafeShuffleExternalSorter {
this.numPartitions = numPartitions;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility if no units are provided
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
-
+ this.pageSizeBytes = (int) Math.min(
+ PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES,
+ conf.getSizeAsBytes("spark.buffer.pageSize", "64m"));
+ this.maxRecordSizeBytes = pageSizeBytes - 4;
this.writeMetrics = writeMetrics;
initializeForWriting();
}
@@ -272,7 +275,11 @@ final class UnsafeShuffleExternalSorter {
}
private long getMemoryUsage() {
- return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+ long totalPageSize = 0;
+ for (MemoryBlock page : allocatedPages) {
+ totalPageSize += page.size();
+ }
+ return sorter.getMemoryUsage() + totalPageSize;
}
private long freeMemory() {
@@ -346,23 +353,23 @@ final class UnsafeShuffleExternalSorter {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
- if (requiredSpace > PAGE_SIZE) {
+ if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- PAGE_SIZE + ")");
+ pageSizeBytes + ")");
} else {
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquired < PAGE_SIZE) {
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
- final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
- throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+ throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
- currentPage = memoryManager.allocatePage(PAGE_SIZE);
+ currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = PAGE_SIZE;
+ freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
index 764578b181..d47d6fc9c2 100644
--- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
+++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java
@@ -129,6 +129,11 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> {
open();
}
+ @VisibleForTesting
+ public int maxRecordSizeBytes() {
+ return sorter.maxRecordSizeBytes;
+ }
+
/**
* This convenience method should only be called in test code.
*/
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 80b03d7e99..c21990f4e4 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -41,10 +41,7 @@ public final class UnsafeExternalSorter {
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
- private static final int PAGE_SIZE = 1 << 27; // 128 megabytes
- @VisibleForTesting
- static final int MAX_RECORD_SIZE = PAGE_SIZE - 4;
-
+ private final long pageSizeBytes;
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
@@ -91,6 +88,7 @@ public final class UnsafeExternalSorter {
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
initializeForWriting();
}
@@ -147,7 +145,11 @@ public final class UnsafeExternalSorter {
}
private long getMemoryUsage() {
- return sorter.getMemoryUsage() + (allocatedPages.size() * (long) PAGE_SIZE);
+ long totalPageSize = 0;
+ for (MemoryBlock page : allocatedPages) {
+ totalPageSize += page.size();
+ }
+ return sorter.getMemoryUsage() + totalPageSize;
}
@VisibleForTesting
@@ -214,23 +216,23 @@ public final class UnsafeExternalSorter {
// TODO: we should track metrics on the amount of space wasted when we roll over to a new page
// without using the free space at the end of the current page. We should also do this for
// BytesToBytesMap.
- if (requiredSpace > PAGE_SIZE) {
+ if (requiredSpace > pageSizeBytes) {
throw new IOException("Required space " + requiredSpace + " is greater than page size (" +
- PAGE_SIZE + ")");
+ pageSizeBytes + ")");
} else {
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquired < PAGE_SIZE) {
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquired < pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquired);
spill();
- final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(PAGE_SIZE);
- if (memoryAcquiredAfterSpilling != PAGE_SIZE) {
+ final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
+ if (memoryAcquiredAfterSpilling != pageSizeBytes) {
shuffleMemoryManager.release(memoryAcquiredAfterSpilling);
- throw new IOException("Unable to acquire " + PAGE_SIZE + " bytes of memory");
+ throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
- currentPage = memoryManager.allocatePage(PAGE_SIZE);
+ currentPage = memoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
- freeSpaceInCurrentPage = PAGE_SIZE;
+ freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 10c3eedbf4..04fc09b323 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -111,7 +111,7 @@ public class UnsafeShuffleWriterSuite {
mergedOutputFile = File.createTempFile("mergedoutput", "", tempDir);
partitionSizesInMergedFile = null;
spillFilesCreated.clear();
- conf = new SparkConf();
+ conf = new SparkConf().set("spark.buffer.pageSize", "128m");
taskMetrics = new TaskMetrics();
when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
@@ -512,12 +512,12 @@ public class UnsafeShuffleWriterSuite {
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[1], new byte[1]));
writer.forceSorterToSpill();
// We should be able to write a record that's right _at_ the max record size
- final byte[] atMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE];
+ final byte[] atMaxRecordSize = new byte[writer.maxRecordSizeBytes()];
new Random(42).nextBytes(atMaxRecordSize);
writer.insertRecordIntoSorter(new Tuple2<Object, Object>(new byte[0], atMaxRecordSize));
writer.forceSorterToSpill();
// Inserting a record that's larger than the max record size should fail:
- final byte[] exceedsMaxRecordSize = new byte[UnsafeShuffleExternalSorter.MAX_RECORD_SIZE + 1];
+ final byte[] exceedsMaxRecordSize = new byte[writer.maxRecordSizeBytes() + 1];
new Random(42).nextBytes(exceedsMaxRecordSize);
Product2<Object, Object> hugeRecord =
new Tuple2<Object, Object>(new byte[0], exceedsMaxRecordSize);