aboutsummaryrefslogtreecommitdiff
path: root/sql/catalyst/src/main/java
diff options
context:
space:
mode:
authorEric Liang <ekl@databricks.com>2016-06-11 15:42:58 -0700
committerReynold Xin <rxin@databricks.com>2016-06-11 15:42:58 -0700
commitc06c58bbbb2de0c22cfc70c486d23a94c3079ba4 (patch)
tree2d7b99a05f88c5e90ad5b18898447defb53fbb20 /sql/catalyst/src/main/java
parent75705e8dbb51ac91ffc7012fa67f072494c13832 (diff)
downloadspark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.tar.gz
spark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.tar.bz2
spark-c06c58bbbb2de0c22cfc70c486d23a94c3079ba4.zip
[SPARK-14851][CORE] Support radix sort with nullable longs
## What changes were proposed in this pull request? This adds support for radix sort of nullable long fields. When a sort field is null and radix sort is enabled, we keep nulls in a separate region of the sort buffer so that radix sort does not need to deal with them. This also has performance benefits when sorting smaller integer types, since the current representation of nulls in two's complement (Long.MIN_VALUE) otherwise forces a full-width radix sort. This strategy for nulls does mean the sort is no longer stable. cc davies ## How was this patch tested? Existing randomized sort tests for correctness. I also tested some TPCDS queries and there does not seem to be any significant regression for non-null sorts. Some test queries (best of 5 runs each). Before change: scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6 start: Long = 3190437233227987 res3: Double = 4716.471091 After change: scala> val start = System.nanoTime; spark.range(5000000).selectExpr("if(id > 5, cast(hash(id) as long), NULL) as h").coalesce(1).orderBy("h").collect(); (System.nanoTime - start) / 1e6 start: Long = 3190367870952791 res4: Double = 2981.143045 Author: Eric Liang <ekl@databricks.com> Closes #13161 from ericl/sc-2998.
Diffstat (limited to 'sql/catalyst/src/main/java')
-rw-r--r--sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java20
1 files changed, 17 insertions, 3 deletions
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 37fbad47c1..ad76bf5a0a 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -51,7 +51,20 @@ public final class UnsafeExternalRowSorter {
private final UnsafeExternalSorter sorter;
public abstract static class PrefixComputer {
- abstract long computePrefix(InternalRow row);
+
+ public static class Prefix {
+ /** Key prefix value, or the null prefix value if isNull = true. **/
+ long value;
+
+ /** Whether the key is null. */
+ boolean isNull;
+ }
+
+ /**
+ * Computes prefix for the given row. For efficiency, the returned object may be reused in
+ * further calls to a given PrefixComputer.
+ */
+ abstract Prefix computePrefix(InternalRow row);
}
public UnsafeExternalRowSorter(
@@ -88,12 +101,13 @@ public final class UnsafeExternalRowSorter {
}
public void insertRow(UnsafeRow row) throws IOException {
- final long prefix = prefixComputer.computePrefix(row);
+ final PrefixComputer.Prefix prefix = prefixComputer.computePrefix(row);
sorter.insertRecord(
row.getBaseObject(),
row.getBaseOffset(),
row.getSizeInBytes(),
- prefix
+ prefix.value,
+ prefix.isNull
);
numRowsInserted++;
if (testSpillFrequency > 0 && (numRowsInserted % testSpillFrequency) == 0) {