aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-04-21 16:48:51 -0700
committerDavies Liu <davies.liu@gmail.com>2016-04-21 16:48:51 -0700
commite2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5 (patch)
tree4ba34de912caf16f572a42c9cc033aeb2bb9bb4a /core/src/test/java
parent6d1e4c4a65541cbf78284005de1776dc49efa9f4 (diff)
downloadspark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.tar.gz
spark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.tar.bz2
spark-e2b5647ab92eb478b3f7b36a0ce6faf83e24c0e5.zip
[SPARK-14724] Use radix sort for shuffles and sort operator when possible
## What changes were proposed in this pull request? Spark currently uses TimSort for all in-memory sorts, including sorts done for shuffle. One low-hanging fruit is to use radix sort when possible (e.g. sorting by integer keys). This PR adds a radix sort implementation to the unsafe sort package and switches shuffles and sorts to use it when possible. The current implementation does not have special support for null values, so we cannot radix-sort `LongType`. I will address this in a follow-up PR. ## How was this patch tested? Unit tests, enabling radix sort on existing tests. Microbenchmark results: ``` Running benchmark: radix sort 25000000 Java HotSpot(TM) 64-Bit Server VM 1.8.0_66-b17 on Linux 3.13.0-44-generic Intel(R) Core(TM) i7-4600U CPU 2.10GHz radix sort 25000000: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative ------------------------------------------------------------------------------------------- reference TimSort key prefix array 15546 / 15859 1.6 621.9 1.0X reference Arrays.sort 2416 / 2446 10.3 96.6 6.4X radix sort one byte 133 / 137 188.4 5.3 117.2X radix sort two bytes 255 / 258 98.2 10.2 61.1X radix sort eight bytes 991 / 997 25.2 39.6 15.7X radix sort key prefix array 1540 / 1563 16.2 61.6 10.1X ``` I also ran a mix of the supported TPCDS queries and compared TimSort vs RadixSort metrics. The overall benchmark ran ~10% faster with radix sort on. In the breakdown below, the radix-enabled sort phases averaged about 20x faster than TimSort, however sorting is only a small fraction of the overall runtime. About half of the TPCDS queries were able to take advantage of radix sort. ``` TPCDS on master: 2499s real time, 8185s executor - 1171s in TimSort, avg 267 MB/s (note the /s accounting is weird here since dataSize counts the record sizes too) TPCDS with radix enabled: 2294s real time, 7391s executor - 596s in TimSort, avg 254 MB/s - 26s in radix sort, avg 4.2 GB/s ``` cc davies rxin Author: Eric Liang <ekl@databricks.com> Closes #12490 from ericl/sort-benchmark.
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java16
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java16
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java17
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java18
7 files changed, 111 insertions, 25 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java
new file mode 100644
index 0000000000..6927d0a815
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+public class ShuffleInMemoryRadixSorterSuite extends ShuffleInMemorySorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 4cd3600df1..43e32f073a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.sort;
+import java.lang.Long;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
@@ -34,6 +35,8 @@ import org.apache.spark.unsafe.memory.MemoryBlock;
public class ShuffleInMemorySorterSuite {
+ protected boolean shouldUseRadixSort() { return false; }
+
final TestMemoryManager memoryManager =
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
@@ -47,7 +50,8 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 100);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
+ consumer, 100, shouldUseRadixSort());
final ShuffleInMemorySorter.ShuffleSorterIterator iter = sorter.getSortedIterator();
Assert.assertFalse(iter.hasNext());
}
@@ -70,14 +74,16 @@ public class ShuffleInMemorySorterSuite {
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
+ consumer, 4, shouldUseRadixSort());
final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ sorter.expandPointerArray(
+ consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
@@ -114,12 +120,12 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingManyNumbers() throws Exception {
- ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
+ ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4, shouldUseRadixSort());
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ sorter.expandPointerArray(consumer.allocateArray(sorter.getMemoryUsage() / 8 * 2));
}
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index fbaaa1cf49..f9dc20d8b7 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -392,7 +392,20 @@ public class UnsafeShuffleWriterSuite {
}
@Test
- public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
+ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception {
+ conf.set("spark.shuffle.sort.useRadixSort", "false");
+ writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
+ assertEquals(2, spillFilesCreated.size());
+ }
+
+ @Test
+ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception {
+ conf.set("spark.shuffle.sort.useRadixSort", "true");
+ writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
+ assertEquals(3, spillFilesCreated.size());
+ }
+
+ private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
@@ -400,7 +413,6 @@ public class UnsafeShuffleWriterSuite {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
writer.write(dataToWrite.iterator());
- assertEquals(2, spillFilesCreated.size());
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java
new file mode 100644
index 0000000000..bb38305a07
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection.unsafe.sort;
+
+public class UnsafeExternalSorterRadixSortSuite extends UnsafeExternalSorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a2253d8559..60a40cc172 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -64,12 +64,7 @@ public class UnsafeExternalSorterSuite {
new JavaSerializer(new SparkConf()),
new SparkConf().set("spark.shuffle.spill.compress", "false"));
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
- final PrefixComparator prefixComparator = new PrefixComparator() {
- @Override
- public int compare(long prefix1, long prefix2) {
- return (int) prefix1 - (int) prefix2;
- }
- };
+ final PrefixComparator prefixComparator = PrefixComparators.LONG;
// Since the key fits within the 8-byte prefix, we don't need to do any record comparison, so
// use a dummy comparator
final RecordComparator recordComparator = new RecordComparator() {
@@ -88,6 +83,7 @@ public class UnsafeExternalSorterSuite {
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
+ protected boolean shouldUseRadixSort() { return false; }
private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m");
@@ -178,7 +174,8 @@ public class UnsafeExternalSorterSuite {
recordComparator,
prefixComparator,
/* initialSize */ 1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
}
@Test
@@ -381,7 +378,8 @@ public class UnsafeExternalSorterSuite {
null,
null,
/* initialSize */ 1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
int n = (int) pageSizeBytes / recordSize * 3;
@@ -416,7 +414,8 @@ public class UnsafeExternalSorterSuite {
recordComparator,
prefixComparator,
1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
// Peak memory should be monotonically increasing. More specifically, every time
// we allocate a new page it should increase by exactly the size of the page.
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java
new file mode 100644
index 0000000000..ae69ededf7
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection.unsafe.sort;
+
+public class UnsafeInMemorySorterRadixSortSuite extends UnsafeInMemorySorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index f90214fffd..23f4abfed2 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection.unsafe.sort;
+import java.lang.Long;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -39,6 +40,8 @@ import static org.mockito.Mockito.mock;
public class UnsafeInMemorySorterSuite {
+ protected boolean shouldUseRadixSort() { return false; }
+
private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) {
final byte[] strBytes = new byte[length];
Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length);
@@ -54,7 +57,8 @@ public class UnsafeInMemorySorterSuite {
memoryManager,
mock(RecordComparator.class),
mock(PrefixComparator.class),
- 100);
+ 100,
+ shouldUseRadixSort());
final UnsafeSorterIterator iter = sorter.getSortedIterator();
Assert.assertFalse(iter.hasNext());
}
@@ -102,19 +106,15 @@ public class UnsafeInMemorySorterSuite {
// Compute key prefixes based on the records' partition ids
final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
- final PrefixComparator prefixComparator = new PrefixComparator() {
- @Override
- public int compare(long prefix1, long prefix2) {
- return (int) prefix1 - (int) prefix2;
- }
- };
+ final PrefixComparator prefixComparator = PrefixComparators.LONG;
UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
- recordComparator, prefixComparator, dataToSort.length);
+ recordComparator, prefixComparator, dataToSort.length, shouldUseRadixSort());
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
+ sorter.expandPointerArray(
+ consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2));
}
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);