aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorXin Wu <xinwu@us.ibm.com>2016-09-14 21:14:29 +0200
committerHerman van Hovell <hvanhovell@databricks.com>2016-09-14 21:14:29 +0200
commit040e46979d5f90edc7f9be3cbedd87e8986e8053 (patch)
tree65b9ac14a2ddf54a1f0e8d3c251645fb4ea23273 /core
parenta79838bdeeb12cec4d50da3948bd8a33777e53a6 (diff)
downloadspark-040e46979d5f90edc7f9be3cbedd87e8986e8053.tar.gz
spark-040e46979d5f90edc7f9be3cbedd87e8986e8053.tar.bz2
spark-040e46979d5f90edc7f9be3cbedd87e8986e8053.zip
[SPARK-10747][SQL] Support NULLS FIRST|LAST clause in ORDER BY
## What changes were proposed in this pull request? Currently, ORDER BY clause returns nulls value according to sorting order (ASC|DESC), considering null value is always smaller than non-null values. However, SQL2003 standard support NULLS FIRST or NULLS LAST to allow users to specify whether null values should be returned first or last, regardless of sorting order (ASC|DESC). This PR is to support this new feature. ## How was this patch tested? New test cases are added to test NULLS FIRST|LAST for regular select queries and windowing queries. (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Author: Xin Wu <xinwu@us.ibm.com> Closes #14842 from xwu0226/SPARK-10747.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java58
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java11
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala27
3 files changed, 81 insertions, 15 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
index c44630fbbc..116c84943e 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/PrefixComparators.java
@@ -29,12 +29,23 @@ public class PrefixComparators {
public static final PrefixComparator STRING = new UnsignedPrefixComparator();
public static final PrefixComparator STRING_DESC = new UnsignedPrefixComparatorDesc();
+ public static final PrefixComparator STRING_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+ public static final PrefixComparator STRING_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
+
public static final PrefixComparator BINARY = new UnsignedPrefixComparator();
public static final PrefixComparator BINARY_DESC = new UnsignedPrefixComparatorDesc();
+ public static final PrefixComparator BINARY_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+ public static final PrefixComparator BINARY_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
+
public static final PrefixComparator LONG = new SignedPrefixComparator();
public static final PrefixComparator LONG_DESC = new SignedPrefixComparatorDesc();
+ public static final PrefixComparator LONG_NULLS_LAST = new SignedPrefixComparatorNullsLast();
+ public static final PrefixComparator LONG_DESC_NULLS_FIRST = new SignedPrefixComparatorDescNullsFirst();
+
public static final PrefixComparator DOUBLE = new UnsignedPrefixComparator();
public static final PrefixComparator DOUBLE_DESC = new UnsignedPrefixComparatorDesc();
+ public static final PrefixComparator DOUBLE_NULLS_LAST = new UnsignedPrefixComparatorNullsLast();
+ public static final PrefixComparator DOUBLE_DESC_NULLS_FIRST = new UnsignedPrefixComparatorDescNullsFirst();
public static final class StringPrefixComparator {
public static long computePrefix(UTF8String value) {
@@ -74,6 +85,9 @@ public class PrefixComparators {
/** @return Whether the sort should take into account the sign bit. */
public abstract boolean sortSigned();
+
+ /** @return Whether the sort should put nulls first or last. */
+ public abstract boolean nullsFirst();
}
//
@@ -83,16 +97,34 @@ public class PrefixComparators {
public static final class UnsignedPrefixComparator extends RadixSortSupport {
@Override public boolean sortDescending() { return false; }
@Override public boolean sortSigned() { return false; }
- @Override
+ @Override public boolean nullsFirst() { return true; }
+ public int compare(long aPrefix, long bPrefix) {
+ return UnsignedLongs.compare(aPrefix, bPrefix);
+ }
+ }
+
+ public static final class UnsignedPrefixComparatorNullsLast extends RadixSortSupport {
+ @Override public boolean sortDescending() { return false; }
+ @Override public boolean sortSigned() { return false; }
+ @Override public boolean nullsFirst() { return false; }
public int compare(long aPrefix, long bPrefix) {
return UnsignedLongs.compare(aPrefix, bPrefix);
}
}
+ public static final class UnsignedPrefixComparatorDescNullsFirst extends RadixSortSupport {
+ @Override public boolean sortDescending() { return true; }
+ @Override public boolean sortSigned() { return false; }
+ @Override public boolean nullsFirst() { return true; }
+ public int compare(long bPrefix, long aPrefix) {
+ return UnsignedLongs.compare(aPrefix, bPrefix);
+ }
+ }
+
public static final class UnsignedPrefixComparatorDesc extends RadixSortSupport {
@Override public boolean sortDescending() { return true; }
@Override public boolean sortSigned() { return false; }
- @Override
+ @Override public boolean nullsFirst() { return false; }
public int compare(long bPrefix, long aPrefix) {
return UnsignedLongs.compare(aPrefix, bPrefix);
}
@@ -101,16 +133,34 @@ public class PrefixComparators {
public static final class SignedPrefixComparator extends RadixSortSupport {
@Override public boolean sortDescending() { return false; }
@Override public boolean sortSigned() { return true; }
- @Override
+ @Override public boolean nullsFirst() { return true; }
+ public int compare(long a, long b) {
+ return (a < b) ? -1 : (a > b) ? 1 : 0;
+ }
+ }
+
+ public static final class SignedPrefixComparatorNullsLast extends RadixSortSupport {
+ @Override public boolean sortDescending() { return false; }
+ @Override public boolean sortSigned() { return true; }
+ @Override public boolean nullsFirst() { return false; }
public int compare(long a, long b) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}
}
+ public static final class SignedPrefixComparatorDescNullsFirst extends RadixSortSupport {
+ @Override public boolean sortDescending() { return true; }
+ @Override public boolean sortSigned() { return true; }
+ @Override public boolean nullsFirst() { return true; }
+ public int compare(long b, long a) {
+ return (a < b) ? -1 : (a > b) ? 1 : 0;
+ }
+ }
+
public static final class SignedPrefixComparatorDesc extends RadixSortSupport {
@Override public boolean sortDescending() { return true; }
@Override public boolean sortSigned() { return true; }
- @Override
+ @Override public boolean nullsFirst() { return false; }
public int compare(long b, long a) {
return (a < b) ? -1 : (a > b) ? 1 : 0;
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 30d0f3006a..be382955c0 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -333,17 +333,18 @@ public final class UnsafeInMemorySorter {
if (nullBoundaryPos > 0) {
assert radixSortSupport != null : "Nulls are only stored separately with radix sort";
LinkedList<UnsafeSorterIterator> queue = new LinkedList<>();
- if (radixSortSupport.sortDescending()) {
- // Nulls are smaller than non-nulls
- queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
+
+ // The null order is either LAST or FIRST, regardless of sorting direction (ASC|DESC)
+ if (radixSortSupport.nullsFirst()) {
queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
+ queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
} else {
- queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
queue.add(new SortedIterator((pos - nullBoundaryPos) / 2, offset));
+ queue.add(new SortedIterator(nullBoundaryPos / 2, 0));
}
return new UnsafeExternalSorter.ChainedIterator(queue);
} else {
return new SortedIterator(pos / 2, offset);
}
}
-}
+} \ No newline at end of file
diff --git a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
index 2c13806410..366ffda778 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/unsafe/sort/RadixSortSuite.scala
@@ -40,23 +40,38 @@ class RadixSortSuite extends SparkFunSuite with Logging {
case class RadixSortType(
name: String,
referenceComparator: PrefixComparator,
- startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean)
+ startByteIdx: Int, endByteIdx: Int, descending: Boolean, signed: Boolean, nullsFirst: Boolean)
val SORT_TYPES_TO_TEST = Seq(
- RadixSortType("unsigned binary data asc", PrefixComparators.BINARY, 0, 7, false, false),
- RadixSortType("unsigned binary data desc", PrefixComparators.BINARY_DESC, 0, 7, true, false),
- RadixSortType("twos complement asc", PrefixComparators.LONG, 0, 7, false, true),
- RadixSortType("twos complement desc", PrefixComparators.LONG_DESC, 0, 7, true, true),
+ RadixSortType("unsigned binary data asc nulls first",
+ PrefixComparators.BINARY, 0, 7, false, false, true),
+ RadixSortType("unsigned binary data asc nulls last",
+ PrefixComparators.BINARY_NULLS_LAST, 0, 7, false, false, false),
+ RadixSortType("unsigned binary data desc nulls last",
+ PrefixComparators.BINARY_DESC_NULLS_FIRST, 0, 7, true, false, false),
+ RadixSortType("unsigned binary data desc nulls first",
+ PrefixComparators.BINARY_DESC, 0, 7, true, false, true),
+
+ RadixSortType("twos complement asc nulls first",
+ PrefixComparators.LONG, 0, 7, false, true, true),
+ RadixSortType("twos complement asc nulls last",
+ PrefixComparators.LONG_NULLS_LAST, 0, 7, false, true, false),
+ RadixSortType("twos complement desc nulls last",
+ PrefixComparators.LONG_DESC, 0, 7, true, true, false),
+ RadixSortType("twos complement desc nulls first",
+ PrefixComparators.LONG_DESC_NULLS_FIRST, 0, 7, true, true, true),
+
RadixSortType(
"binary data partial",
new PrefixComparators.RadixSortSupport {
override def sortDescending = false
override def sortSigned = false
+ override def nullsFirst = true
override def compare(a: Long, b: Long): Int = {
return PrefixComparators.BINARY.compare(a & 0xffffff0000L, b & 0xffffff0000L)
}
},
- 2, 4, false, false))
+ 2, 4, false, false, true))
private def generateTestData(size: Int, rand: => Long): (Array[JLong], LongArray) = {
val ref = Array.tabulate[Long](size) { i => rand }