diff options
author | Sital Kedia <skedia@fb.com> | 2016-06-25 09:13:39 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2016-06-25 09:13:39 +0100 |
commit | bf665a958631125a1670504ef5966ef1a0e14798 (patch) | |
tree | 4bdefeed5732c74e577c272bb9d2651cc990dcce /core | |
parent | a3c7b4187bad00dad87df7e3b5929a44d29568ed (diff) | |
download | spark-bf665a958631125a1670504ef5966ef1a0e14798.tar.gz spark-bf665a958631125a1670504ef5966ef1a0e14798.tar.bz2 spark-bf665a958631125a1670504ef5966ef1a0e14798.zip |
[SPARK-15958] Make initial buffer size for the Sorter configurable
## What changes were proposed in this pull request?
Currently the initial buffer size in the sorter is hard-coded (4096) and is too small for large workloads. As a result, the sorter spends significant time expanding the buffer and copying the data. It would be useful to have it configurable, so this change reads the size from `spark.shuffle.sort.initialBufferSize`, keeping the previous value as the default.
## How was this patch tested?
Tested by running a job on the cluster.
Author: Sital Kedia <skedia@fb.com>
Closes #13699 from sitalkedia/config_sort_buffer_upstream.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java | 7 | ||||
-rw-r--r-- | core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java | 4 |
2 files changed, 7 insertions, 4 deletions
diff --git a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java index daa63d47e6..05fa04c44d 100644 --- a/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriter.java @@ -61,7 +61,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { private static final ClassTag<Object> OBJECT_CLASS_TAG = ClassTag$.MODULE$.Object(); @VisibleForTesting - static final int INITIAL_SORT_BUFFER_SIZE = 4096; + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; private final BlockManager blockManager; private final IndexShuffleBlockResolver shuffleBlockResolver; @@ -74,6 +74,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { private final TaskContext taskContext; private final SparkConf sparkConf; private final boolean transferToEnabled; + private final int initialSortBufferSize; @Nullable private MapStatus mapStatus; @Nullable private ShuffleExternalSorter sorter; @@ -122,6 +123,8 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { this.taskContext = taskContext; this.sparkConf = sparkConf; this.transferToEnabled = sparkConf.getBoolean("spark.file.transferTo", true); + this.initialSortBufferSize = sparkConf.getInt("spark.shuffle.sort.initialBufferSize", + DEFAULT_INITIAL_SORT_BUFFER_SIZE); open(); } @@ -187,7 +190,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { memoryManager, blockManager, taskContext, - INITIAL_SORT_BUFFER_SIZE, + initialSortBufferSize, partitioner.numPartitions(), sparkConf, writeMetrics); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 7dd61f85ab..daeb4675ea 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ 
b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -413,10 +413,10 @@ public class UnsafeShuffleWriterSuite { } private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { - memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16); + memoryManager.limit(UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); - for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) { + for (int i = 0; i < UnsafeShuffleWriter.DEFAULT_INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2<Object, Object>(i, i)); } writer.write(dataToWrite.iterator()); |