diff options
author | Eric Liang <ekl@databricks.com> | 2016-06-11 15:42:58 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2016-06-11 15:42:58 -0700 |
commit | c06c58bbbb2de0c22cfc70c486d23a94c3079ba4 (patch) | |
tree | 2d7b99a05f88c5e90ad5b18898447defb53fbb20 /core/src/test/java | |
parent | 75705e8dbb51ac91ffc7012fa67f072494c13832 (diff) | |
download | spark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.tar.gz spark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.tar.bz2 spark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.zip |
[SPARK-14851][CORE] Support radix sort with nullable longs
## What changes were proposed in this pull request?
This adds support for radix sort of nullable long fields. When a sort field is null and radix sort is enabled, we keep nulls in a separate region of the sort buffer so that radix sort does not need to deal with them. This also has performance benefits when sorting smaller integer types, since the current representation of nulls in two's complement (Long.MIN_VALUE) otherwise forces a full-width radix sort.
This strategy for nulls does mean the sort is no longer stable. cc davies
## How was this patch tested?
Existing randomized sort tests for correctness. I also tested some TPCDS queries and there does not seem to be any significant regression for non-null sorts.
Some test queries (best of 5 runs each).
Before change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190437233227987
res3: Double = 4716.471091
After change:
scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6
start: Long = 3190367870952791
res4: Double = 2981.143045
Author: Eric Liang <ekl@databricks.com>
Closes #13161 from ericl/sc-2998.
Diffstat (limited to 'core/src/test/java')
2 files changed, 14 insertions, 14 deletions
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 2cae4beb4c..bce958c3dc 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -156,14 +156,14 @@ public class UnsafeExternalSorterSuite { private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception { final int[] arr = new int[]{ value }; - sorter.insertRecord(arr, Platform.INT_ARRAY_OFFSET, 4, value); + sorter.insertRecord(arr, Platform.INT_ARRAY_OFFSET, 4, value, false); } private static void insertRecord( UnsafeExternalSorter sorter, int[] record, long prefix) throws IOException { - sorter.insertRecord(record, Platform.INT_ARRAY_OFFSET, record.length * 4, prefix); + sorter.insertRecord(record, Platform.INT_ARRAY_OFFSET, record.length * 4, prefix, false); } private UnsafeExternalSorter newSorter() throws IOException { @@ -206,13 +206,13 @@ public class UnsafeExternalSorterSuite { @Test public void testSortingEmptyArrays() throws Exception { final UnsafeExternalSorter sorter = newSorter(); - sorter.insertRecord(null, 0, 0, 0); - sorter.insertRecord(null, 0, 0, 0); + sorter.insertRecord(null, 0, 0, 0, false); + sorter.insertRecord(null, 0, 0, 0, false); sorter.spill(); - sorter.insertRecord(null, 0, 0, 0); + sorter.insertRecord(null, 0, 0, 0, false); sorter.spill(); - sorter.insertRecord(null, 0, 0, 0); - sorter.insertRecord(null, 0, 0, 0); + sorter.insertRecord(null, 0, 0, 0, false); + sorter.insertRecord(null, 0, 0, 0, false); UnsafeSorterIterator iter = sorter.getSortedIterator(); @@ -232,7 +232,7 @@ public class UnsafeExternalSorterSuite { long prevSortTime = sorter.getSortTimeNanos(); assertEquals(prevSortTime, 0); - sorter.insertRecord(null, 0, 0, 0); + sorter.insertRecord(null, 0, 0, 0, false); sorter.spill(); assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); prevSortTime = sorter.getSortTimeNanos(); @@ -240,7 +240,7 @@ public class UnsafeExternalSorterSuite { sorter.spill(); // no sort needed assertEquals(sorter.getSortTimeNanos(), prevSortTime); - sorter.insertRecord(null, 0, 0, 0); + sorter.insertRecord(null, 0, 0, 0, false); UnsafeSorterIterator iter = sorter.getSortedIterator(); assertThat(sorter.getSortTimeNanos(), greaterThan(prevSortTime)); } @@ -280,7 +280,7 @@ public class UnsafeExternalSorterSuite { final UnsafeExternalSorter sorter = newSorter(); byte[] record = new byte[16]; while (sorter.getNumberOfAllocatedPages() < 2) { - sorter.insertRecord(record, Platform.BYTE_ARRAY_OFFSET, record.length, 0); + sorter.insertRecord(record, Platform.BYTE_ARRAY_OFFSET, record.length, 0, false); } sorter.cleanupResources(); assertSpillFilesWereCleanedUp(); @@ -340,7 +340,7 @@ public class UnsafeExternalSorterSuite { int n = (int) pageSizeBytes / recordSize * 3; for (int i = 0; i < n; i++) { record[0] = (long) i; - sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0); + sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false); } assertTrue(sorter.getNumberOfAllocatedPages() >= 2); UnsafeExternalSorter.SpillableIterator iter = @@ -372,7 +372,7 @@ public class UnsafeExternalSorterSuite { int n = (int) pageSizeBytes / recordSize * 3; for (int i = 0; i < n; i++) { record[0] = (long) i; - sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0); + sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false); } assertTrue(sorter.getNumberOfAllocatedPages() >= 2); UnsafeExternalSorter.SpillableIterator iter = @@ -406,7 +406,7 @@ public class UnsafeExternalSorterSuite { int batch = n / 4; for (int i = 0; i < n; i++) { record[0] = (long) i; - sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0); + sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0, false); if (i % batch == batch - 1) { sorter.spill(); } diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 383c5b3b08..bd89085aa9 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -120,7 +120,7 @@ public class UnsafeInMemorySorterSuite { final long address = memoryManager.encodePageNumberAndOffset(dataPage, position); final String str = getStringFromDataPage(baseObject, position + 4, recordLength); final int partitionId = hashPartitioner.getPartition(str); - sorter.insertRecord(address, partitionId); + sorter.insertRecord(address, partitionId, false); position += 4 + recordLength; } final UnsafeSorterIterator iter = sorter.getSortedIterator(); |