diff options
Diffstat (limited to 'core/src')
3 files changed, 81 insertions, 15 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java index c44630fbbc..116c84943e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java @@ -29,12 +29,23 @@ public class PrefixComparators { public static final PrefixComparator STRING = new UnsignedPrefixComparator(); public static final PrefixComparator STRING_DESC = new UnsignedPrefixComparatorDesc(); + public static final PrefixComparator STRING_NULLS_LAST = new UnsignedPrefixComparatorNullsLast(); + public static final PrefixComparator STRING_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst(); + public static final PrefixComparator BINARY = new UnsignedPrefixComparator(); public static final PrefixComparator BINARY_DESC = new UnsignedPrefixComparatorDesc(); + public static final PrefixComparator BINARY_NULLS_LAST = new UnsignedPrefixComparatorNullsLast(); + public static final PrefixComparator BINARY_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst(); + public static final PrefixComparator LONG = new SignedPrefixComparator(); public static final PrefixComparator LONG_DESC = new SignedPrefixComparatorDesc(); + public static final PrefixComparator LONG_NULLS_LAST = new SignedPrefixComparatorNullsLast(); + public static final PrefixComparator LONG_DESC_NULLS_FIRST = new SignedPrefixComparatorDescNullsFirst(); + public static final PrefixComparator DOUBLE = new UnsignedPrefixComparator(); public static final PrefixComparator DOUBLE_DESC = new UnsignedPrefixComparatorDesc(); + public static final PrefixComparator DOUBLE_NULLS_LAST = new UnsignedPrefixComparatorNullsLast(); + public static final PrefixComparator DOUBLE_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst(); public static final class StringPrefixComparator { public static long computePrefix(UTF8String value) { @@ -74,6 +85,9 @@ public class PrefixComparators { /** @return Whether the sort should take into account the sign bit. */ public abstract boolean sortSigned(); + + /** @return Whether the sort should put nulls first or last. */ + public abstract boolean nullsFirst(); } // @@ -83,16 +97,34 @@ public class PrefixComparators { public static final class UnsignedPrefixComparator extends RadixSortSupport { @Override public boolean sortDescending() { return false; } @Override public boolean sortSigned() { return false; } - @Override + @Override public boolean nullsFirst() { return true; } + public int compare(long aPrefix, long bPrefix) { + return UnsignedLongs.compare(aPrefix, bPrefix); + } + } + + public static final class UnsignedPrefixComparatorNullsLast extends RadixSortSupport { + @Override public boolean sortDescending() { return false; } + @Override public boolean sortSigned() { return false; } + @Override public boolean nullsFirst() { return false; } public int compare(long aPrefix, long bPrefix) { return UnsignedLongs.compare(aPrefix, bPrefix); } } + public static final class UnsignedPrefixComparatorDescNullsFirst extends RadixSortSupport { + @Override public boolean sortDescending() { return true; } + @Override public boolean sortSigned() { return false; } + @Override public boolean nullsFirst() { return true; } + public int compare(long bPrefix, long aPrefix) { + return UnsignedLongs.compare(aPrefix, bPrefix); + } + } + public static final class UnsignedPrefixComparatorDesc extends RadixSortSupport { @Override public boolean sortDescending() { return true; } @Override public boolean sortSigned() { return false; } - @Override + @Override public boolean nullsFirst() { return false; } public int compare(long bPrefix, long aPrefix) { return UnsignedLongs.compare(aPrefix, bPrefix); } @@ -101,16 +133,34 @@ public class PrefixComparators { public static final class SignedPrefixComparator extends RadixSortSupport { @Override public boolean sortDescending() { return false; } @Override public boolean sortSigned() { return true; } - @Override + @Override public boolean nullsFirst() { return true; } + public int compare(long a, long b) { + return (a < b) ? -1 : (a > b) ? 1 : 0; + } + } + + public static final class SignedPrefixComparatorNullsLast extends RadixSortSupport { + @Override public boolean sortDescending() { return false; } + @Override public boolean sortSigned() { return true; } + @Override public boolean nullsFirst() { return false; } public int compare(long a, long b) { return (a < b) ? -1 : (a > b) ? 1 : 0; } } + public static final class SignedPrefixComparatorDescNullsFirst extends RadixSortSupport { + @Override public boolean sortDescending() { return true; } + @Override public boolean sortSigned() { return true; } + @Override public boolean nullsFirst() { return true; } + public int compare(long b, long a) { + return (a < b) ? -1 : (a > b) ? 1 : 0; + } + } + public static final class SignedPrefixComparatorDesc extends RadixSortSupport { @Override public boolean sortDescending() { return true; } @Override public boolean sortSigned() { return true; } - @Override + @Override public boolean nullsFirst() { return false; } public int compare(long b, long a) { return (a < b) ? -1 : (a > b) ? 1 : 0; } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index 30d0f3006a..be382955c0 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -333,17 +333,18 @@ public final class UnsafeInMemorySorter { if (nullBoundaryPos > 0) { assert radixSortSupport != null : "Nulls are only stored separately with radix sort"; LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); - if (radixSortSupport.sortDescending()) { - // Nulls are smaller than non-nulls - queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset)); + + // The null order is either LAST or FIRST, regardless of sorting direction (ASC|DESC) + if (radixSortSupport.nullsFirst()) { queue.add(new SortedIterator(nullBoundaryPos / 2, 0)); + queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset)); } else { - queue.add(new SortedIterator(nullBoundaryPos / 2, 0)); queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset)); + queue.add(new SortedIterator(nullBoundaryPos / 2, 0)); } return new UnsafeExternalSorter.ChainedIterator(queue); } else { return new SortedIterator(pos / 2, offset); } } -} +}
\ No newline at end of file diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala index 2c13806410..366ffda778 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala @@ -40,23 +40,38 @@ class RadixSortSuite extends SparkFunSuite with Logging { case class RadixSortType( name: String, referenceComparator: PrefixComparator, - startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean) + startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean, nullsFirst: Boolean) val SORT_TYPES_TO_TEST = Seq( - RadixSortType("unsigned binary data asc", PrefixComparators.BINARY, 0, 7, false, false), - RadixSortType("unsigned binary data desc", PrefixComparators.BINARY_DESC, 0, 7, true, false), - RadixSortType("twos complement asc", PrefixComparators.LONG, 0, 7, false, true), - RadixSortType("twos complement desc", PrefixComparators.LONG_DESC, 0, 7, true, true), + RadixSortType("unsigned binary data asc nulls first", + PrefixComparators.BINARY, 0, 7, false, false, true), + RadixSortType("unsigned binary data asc nulls last", + PrefixComparators.BINARY_NULLS_LAST, 0, 7, false, false, false), + RadixSortType("unsigned binary data desc nulls last", + PrefixComparators.BINARY_DESC_NULLS_FIRST, 0, 7, true, false, false), + RadixSortType("unsigned binary data desc nulls first", + PrefixComparators.BINARY_DESC, 0, 7, true, false, true), + + RadixSortType("twos complement asc nulls first", + PrefixComparators.LONG, 0, 7, false, true, true), + RadixSortType("twos complement asc nulls last", + PrefixComparators.LONG_NULLS_LAST, 0, 7, false, true, false), + RadixSortType("twos complement desc nulls last", + PrefixComparators.LONG_DESC, 0, 7, true, true, false), + RadixSortType("twos complement desc nulls first", + PrefixComparators.LONG_DESC_NULLS_FIRST, 0, 7, true, true, true), + RadixSortType( "binary data partial", new PrefixComparators.RadixSortSupport { override def sortDescending = false override def sortSigned = false + override def nullsFirst = true override def compare(a: Long, b: Long): Int = { return PrefixComparators.BINARY.compare(a & 0xffffff0000L, b & 0xffffff0000L) } }, - 2, 4, false, false)) + 2, 4, false, false, true)) private def generateTestData(size: Int, rand: => Long): (Array[JLong], LongArray) = { val ref = Array.tabulate[Long](size) { i => rand } |