diff options
Diffstat (limited to 'core/src/test/java')
7 files changed, 111 insertions, 25 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java new file mode 100644 index 0000000000..6927d0a815 --- /dev/null +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.shuffle.sort; + +public class ShuffleInMemoryRadixSorterSuite extends ShuffleInMemorySorterSuite { + @Override + protected boolean shouldUseRadixSort() { return true; } +} diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index 4cd3600df1..43e32f073a 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.shuffle.sort; +import java.lang.Long; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Random; @@ -34,6 +35,8 @@ import org.apache.spark.unsafe.memory.MemoryBlock; public class ShuffleInMemorySorterSuite { + protected boolean shouldUseRadixSort() { return false; } + final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); @@ -47,7 +50,8 @@ public class ShuffleInMemorySorterSuite { @Test public void testSortingEmptyInput() { - final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 100); + final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter( + consumer, 100, shouldUseRadixSort()); final ShuffleInMemorySorter.ShuffleSorterIterator iter = sorter.getSortedIterator(); Assert.assertFalse(iter.hasNext()); } @@ -70,14 +74,16 @@ public class ShuffleInMemorySorterSuite { new TaskMemoryManager(new TestMemoryManager(conf), 0); final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); final Object baseObject = dataPage.getBaseObject(); - final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4); + final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter( + consumer, 4, shouldUseRadixSort()); final HashPartitioner hashPartitioner = new HashPartitioner(4); // Write the records into the data page and store pointers into the sorter long position = dataPage.getBaseOffset(); for (String str : dataToSort) { if (!sorter.hasSpaceForAnotherRecord()) { - sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2)); + sorter.expandPointerArray( + consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2)); } final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position); final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8); @@ -114,12 +120,12 @@ public class ShuffleInMemorySorterSuite { @Test public void testSortingManyNumbers() throws Exception { - ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4); + ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4, shouldUseRadixSort()); int[] numbersToSort = new int[128000]; Random random = new Random(16); for (int i = 0; i < numbersToSort.length; i++) { if (!sorter.hasSpaceForAnotherRecord()) { - sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2)); + sorter.expandPointerArray(consumer.allocateArray(sorter.getMemoryUsage() / 8 * 2)); } numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1); sorter.insertRecord(0, numbersToSort[i]); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index fbaaa1cf49..f9dc20d8b7 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -392,7 +392,20 @@ public class UnsafeShuffleWriterSuite { } @Test - public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { + public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception { + conf.set("spark.shuffle.sort.useRadixSort", "false"); + writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); + assertEquals(2, spillFilesCreated.size()); + } + + @Test + public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception { + conf.set("spark.shuffle.sort.useRadixSort", "true"); + writeEnoughRecordsToTriggerSortBufferExpansionAndSpill(); + assertEquals(3, spillFilesCreated.size()); + } + + private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception { memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); @@ -400,7 +413,6 @@ public class UnsafeShuffleWriterSuite { dataToWrite.add(new Tuple2<Object, Object>(i, i)); } writer.write(dataToWrite.iterator()); - assertEquals(2, spillFilesCreated.size()); writer.stop(true); readRecordsFromFile(); assertSpillFilesWereCleanedUp(); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java new file mode 100644 index 0000000000..bb38305a07 --- /dev/null +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection.unsafe.sort; + +public class UnsafeExternalSorterRadixSortSuite extends UnsafeExternalSorterSuite { + @Override + protected boolean shouldUseRadixSort() { return true; } +} diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index a2253d8559..60a40cc172 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -64,12 +64,7 @@ public class UnsafeExternalSorterSuite { new JavaSerializer(new SparkConf()), new SparkConf().set("spark.shuffle.spill.compress", "false")); // Use integer comparison for comparing prefixes (which are partition ids, in this case) - final PrefixComparator prefixComparator = new PrefixComparator() { - @Override - public int compare(long prefix1, long prefix2) { - return (int) prefix1 - (int) prefix2; - } - }; + final PrefixComparator prefixComparator = PrefixComparators.LONG; // Since the key fits within the 8-byte prefix, we don't need to do any record comparison, so // use a dummy comparator final RecordComparator recordComparator = new RecordComparator() { @@ -88,6 +83,7 @@ public class UnsafeExternalSorterSuite { @Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager; @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; + protected boolean shouldUseRadixSort() { return false; } private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m"); @@ -178,7 +174,8 @@ public class UnsafeExternalSorterSuite { recordComparator, prefixComparator, /* initialSize */ 1024, - pageSizeBytes); + pageSizeBytes, + shouldUseRadixSort()); } @Test @@ -381,7 +378,8 @@ public class UnsafeExternalSorterSuite { null, null, /* initialSize */ 1024, - pageSizeBytes); + pageSizeBytes, + shouldUseRadixSort()); long[] record = new long[100]; int recordSize = record.length * 8; int n = (int) pageSizeBytes / recordSize * 3; @@ -416,7 +414,8 @@ public class UnsafeExternalSorterSuite { recordComparator, prefixComparator, 1024, - pageSizeBytes); + pageSizeBytes, + shouldUseRadixSort()); // Peak memory should be monotonically increasing. More specifically, every time // we allocate a new page it should increase by exactly the size of the page. diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java new file mode 100644 index 0000000000..ae69ededf7 --- /dev/null +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.collection.unsafe.sort; + +public class UnsafeInMemorySorterRadixSortSuite extends UnsafeInMemorySorterSuite { + @Override + protected boolean shouldUseRadixSort() { return true; } +} diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index f90214fffd..23f4abfed2 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -17,6 +17,7 @@ package org.apache.spark.util.collection.unsafe.sort; +import java.lang.Long; import java.nio.charset.StandardCharsets; import java.util.Arrays; @@ -39,6 +40,8 @@ import static org.mockito.Mockito.mock; public class UnsafeInMemorySorterSuite { + protected boolean shouldUseRadixSort() { return false; } + private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) { final byte[] strBytes = new byte[length]; Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length); @@ -54,7 +57,8 @@ public class UnsafeInMemorySorterSuite { memoryManager, mock(RecordComparator.class), mock(PrefixComparator.class), - 100); + 100, + shouldUseRadixSort()); final UnsafeSorterIterator iter = sorter.getSortedIterator(); Assert.assertFalse(iter.hasNext()); } @@ -102,19 +106,15 @@ public class UnsafeInMemorySorterSuite { // Compute key prefixes based on the records' partition ids final HashPartitioner hashPartitioner = new HashPartitioner(4); // Use integer comparison for comparing prefixes (which are partition ids, in this case) - final PrefixComparator prefixComparator = new PrefixComparator() { - @Override - public int compare(long prefix1, long prefix2) { - return (int) prefix1 - (int) prefix2; - } - }; + final PrefixComparator prefixComparator = PrefixComparators.LONG; UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager, - recordComparator, prefixComparator, dataToSort.length); + recordComparator, prefixComparator, dataToSort.length, shouldUseRadixSort()); // Given a page of records, insert those records into the sorter one-by-one: position = dataPage.getBaseOffset(); for (int i = 0; i < dataToSort.length; i++) { if (!sorter.hasSpaceForAnotherRecord()) { - sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2)); + sorter.expandPointerArray( + consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2)); } // position now points to the start of a record (which holds its length). final int recordLength = Platform.getInt(baseObject, position); |