From bf665a958631125a1670504ef5966ef1a0e14798 Mon Sep 17 00:00:00 2001 From: Sital Kedia Date: Sat, 25 Jun 2016 09:13:39 +0100 Subject: [SPARK-15958] Make initial buffer size for the Sorter configurable ## What changes were proposed in this pull request? Currently the initial buffer size in the sorter is hard coded inside the code and is too small for large workload. As a result, the sorter spends significant time expanding the buffer size and copying the data. It would be useful to have it configurable. ## How was this patch tested? Tested by running a job on the cluster. Author: Sital Kedia Closes #13699 from sitalkedia/config_sort_buffer_upstream. --- .../org/apache/spark/sql/execution/UnsafeExternalRowSorter.java | 4 +++- .../org/apache/spark/sql/execution/UnsafeKVExternalSorter.java | 7 +++++-- 2 files changed, 8 insertions(+), 3 deletions(-) (limited to 'sql') diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java index ad76bf5a0a..0b177ad411 100644 --- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java +++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java @@ -38,6 +38,7 @@ import org.apache.spark.util.collection.unsafe.sort.UnsafeSorterIterator; public final class UnsafeExternalRowSorter { + static final int DEFAULT_INITIAL_SORT_BUFFER_SIZE = 4096; /** * If positive, forces records to be spilled to disk at the given frequency (measured in numbers * of records). This is only intended to be used in tests. @@ -85,7 +86,8 @@ public final class UnsafeExternalRowSorter { taskContext, new RowComparator(ordering, schema.length()), prefixComparator, - /* initialSize */ 4096, + sparkEnv.conf().getInt("spark.shuffle.sort.initialBufferSize", + DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, canUseRadixSort ); diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java index 99fe51db68..b1cc523363 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/UnsafeKVExternalSorter.java @@ -22,6 +22,7 @@ import java.io.IOException; import com.google.common.annotations.VisibleForTesting; +import org.apache.spark.SparkEnv; import org.apache.spark.TaskContext; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.SerializerManager; @@ -86,7 +87,8 @@ public final class UnsafeKVExternalSorter { taskContext, recordComparator, prefixComparator, - /* initialSize */ 4096, + SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", + UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, canUseRadixSort); } else { @@ -131,7 +133,8 @@ public final class UnsafeKVExternalSorter { taskContext, new KVComparator(ordering, keySchema.length()), prefixComparator, - /* initialSize */ 4096, + SparkEnv.get().conf().getInt("spark.shuffle.sort.initialBufferSize", + UnsafeExternalRowSorter.DEFAULT_INITIAL_SORT_BUFFER_SIZE), pageSizeBytes, inMemSorter); -- cgit v1.2.3