aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java16
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java16
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java17
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java23
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java18
7 files changed, 111 insertions, 25 deletions
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java
new file mode 100644
index 0000000000..6927d0a815
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemoryRadixSorterSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.shuffle.sort;
+
+public class ShuffleInMemoryRadixSorterSuite extends ShuffleInMemorySorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 4cd3600df1..43e32f073a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.shuffle.sort;
+import java.lang.Long;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Random;
@@ -34,6 +35,8 @@ import org.apache.spark.unsafe.memory.MemoryBlock;
public class ShuffleInMemorySorterSuite {
+ protected boolean shouldUseRadixSort() { return false; }
+
final TestMemoryManager memoryManager =
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
@@ -47,7 +50,8 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 100);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
+ consumer, 100, shouldUseRadixSort());
final ShuffleInMemorySorter.ShuffleSorterIterator iter = sorter.getSortedIterator();
Assert.assertFalse(iter.hasNext());
}
@@ -70,14 +74,16 @@ public class ShuffleInMemorySorterSuite {
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
- final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
+ final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
+ consumer, 4, shouldUseRadixSort());
final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Write the records into the data page and store pointers into the sorter
long position = dataPage.getBaseOffset();
for (String str : dataToSort) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ sorter.expandPointerArray(
+ consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2));
}
final long recordAddress = memoryManager.encodePageNumberAndOffset(dataPage, position);
final byte[] strBytes = str.getBytes(StandardCharsets.UTF_8);
@@ -114,12 +120,12 @@ public class ShuffleInMemorySorterSuite {
@Test
public void testSortingManyNumbers() throws Exception {
- ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4);
+ ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(consumer, 4, shouldUseRadixSort());
int[] numbersToSort = new int[128000];
Random random = new Random(16);
for (int i = 0; i < numbersToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2));
+ sorter.expandPointerArray(consumer.allocateArray(sorter.getMemoryUsage() / 8 * 2));
}
numbersToSort[i] = random.nextInt(PackedRecordPointer.MAXIMUM_PARTITION_ID + 1);
sorter.insertRecord(0, numbersToSort[i]);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index fbaaa1cf49..f9dc20d8b7 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -392,7 +392,20 @@ public class UnsafeShuffleWriterSuite {
}
@Test
- public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
+ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOff() throws Exception {
+ conf.set("spark.shuffle.sort.useRadixSort", "false");
+ writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
+ assertEquals(2, spillFilesCreated.size());
+ }
+
+ @Test
+ public void writeEnoughRecordsToTriggerSortBufferExpansionAndSpillRadixOn() throws Exception {
+ conf.set("spark.shuffle.sort.useRadixSort", "true");
+ writeEnoughRecordsToTriggerSortBufferExpansionAndSpill();
+ assertEquals(3, spillFilesCreated.size());
+ }
+
+ private void writeEnoughRecordsToTriggerSortBufferExpansionAndSpill() throws Exception {
memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
@@ -400,7 +413,6 @@ public class UnsafeShuffleWriterSuite {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
writer.write(dataToWrite.iterator());
- assertEquals(2, spillFilesCreated.size());
writer.stop(true);
readRecordsFromFile();
assertSpillFilesWereCleanedUp();
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java
new file mode 100644
index 0000000000..bb38305a07
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterRadixSortSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection.unsafe.sort;
+
+public class UnsafeExternalSorterRadixSortSuite extends UnsafeExternalSorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a2253d8559..60a40cc172 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -64,12 +64,7 @@ public class UnsafeExternalSorterSuite {
new JavaSerializer(new SparkConf()),
new SparkConf().set("spark.shuffle.spill.compress", "false"));
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
- final PrefixComparator prefixComparator = new PrefixComparator() {
- @Override
- public int compare(long prefix1, long prefix2) {
- return (int) prefix1 - (int) prefix2;
- }
- };
+ final PrefixComparator prefixComparator = PrefixComparators.LONG;
// Since the key fits within the 8-byte prefix, we don't need to do any record comparison, so
// use a dummy comparator
final RecordComparator recordComparator = new RecordComparator() {
@@ -88,6 +83,7 @@ public class UnsafeExternalSorterSuite {
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
+ protected boolean shouldUseRadixSort() { return false; }
private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m");
@@ -178,7 +174,8 @@ public class UnsafeExternalSorterSuite {
recordComparator,
prefixComparator,
/* initialSize */ 1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
}
@Test
@@ -381,7 +378,8 @@ public class UnsafeExternalSorterSuite {
null,
null,
/* initialSize */ 1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
long[] record = new long[100];
int recordSize = record.length * 8;
int n = (int) pageSizeBytes / recordSize * 3;
@@ -416,7 +414,8 @@ public class UnsafeExternalSorterSuite {
recordComparator,
prefixComparator,
1024,
- pageSizeBytes);
+ pageSizeBytes,
+ shouldUseRadixSort());
// Peak memory should be monotonically increasing. More specifically, every time
// we allocate a new page it should increase by exactly the size of the page.
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java
new file mode 100644
index 0000000000..ae69ededf7
--- /dev/null
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterRadixSortSuite.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util.collection.unsafe.sort;
+
+public class UnsafeInMemorySorterRadixSortSuite extends UnsafeInMemorySorterSuite {
+ @Override
+ protected boolean shouldUseRadixSort() { return true; }
+}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index f90214fffd..23f4abfed2 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -17,6 +17,7 @@
package org.apache.spark.util.collection.unsafe.sort;
+import java.lang.Long;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
@@ -39,6 +40,8 @@ import static org.mockito.Mockito.mock;
public class UnsafeInMemorySorterSuite {
+ protected boolean shouldUseRadixSort() { return false; }
+
private static String getStringFromDataPage(Object baseObject, long baseOffset, int length) {
final byte[] strBytes = new byte[length];
Platform.copyMemory(baseObject, baseOffset, strBytes, Platform.BYTE_ARRAY_OFFSET, length);
@@ -54,7 +57,8 @@ public class UnsafeInMemorySorterSuite {
memoryManager,
mock(RecordComparator.class),
mock(PrefixComparator.class),
- 100);
+ 100,
+ shouldUseRadixSort());
final UnsafeSorterIterator iter = sorter.getSortedIterator();
Assert.assertFalse(iter.hasNext());
}
@@ -102,19 +106,15 @@ public class UnsafeInMemorySorterSuite {
// Compute key prefixes based on the records' partition ids
final HashPartitioner hashPartitioner = new HashPartitioner(4);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
- final PrefixComparator prefixComparator = new PrefixComparator() {
- @Override
- public int compare(long prefix1, long prefix2) {
- return (int) prefix1 - (int) prefix2;
- }
- };
+ final PrefixComparator prefixComparator = PrefixComparators.LONG;
UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer, memoryManager,
- recordComparator, prefixComparator, dataToSort.length);
+ recordComparator, prefixComparator, dataToSort.length, shouldUseRadixSort());
// Given a page of records, insert those records into the sorter one-by-one:
position = dataPage.getBaseOffset();
for (int i = 0; i < dataToSort.length; i++) {
if (!sorter.hasSpaceForAnotherRecord()) {
- sorter.expandPointerArray(consumer.allocateArray(sorter.numRecords() * 2 * 2));
+ sorter.expandPointerArray(
+ consumer.allocateArray(sorter.getMemoryUsage() / Long.BYTES * 2));
}
// position now points to the start of a record (which holds its length).
final int recordLength = Platform.getInt(baseObject, position);