diff options
author | Reynold Xin <rxin@databricks.com> | 2015-07-30 17:17:27 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-07-30 17:17:27 -0700 |
commit | e7a0976e991f75a7bda99509e2b040daab965ae6 (patch) | |
tree | 8a8197424593977086fca74b073a96bd52f5a89d /sql/catalyst/src/main/java | |
parent | df32669514afc0223ecdeca30fbfbe0b40baef3a (diff) | |
download | spark-e7a0976e991f75a7bda99509e2b040daab965ae6.tar.gz spark-e7a0976e991f75a7bda99509e2b040daab965ae6.tar.bz2 spark-e7a0976e991f75a7bda99509e2b040daab965ae6.zip |
[SPARK-9458][SPARK-9469][SQL] Code generate prefix computation in sorting & moves unsafe conversion out of TungstenSort.
Author: Reynold Xin <rxin@databricks.com>
Closes #7803 from rxin/SPARK-9458 and squashes the following commits:
5b032dc [Reynold Xin] Fix string.
b670dbb [Reynold Xin] [SPARK-9458][SPARK-9469][SQL] Code generate prefix computation in sorting & moves unsafe conversion out of TungstenSort.
Diffstat (limited to 'sql/catalyst/src/main/java')
-rw-r--r-- | sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java | 27 |
1 file changed, 12 insertions, 15 deletions
```diff
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
index 4c3f2c6557..68c49feae9 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/execution/UnsafeExternalRowSorter.java
@@ -48,7 +48,6 @@ final class UnsafeExternalRowSorter {
   private long numRowsInserted = 0;
 
   private final StructType schema;
-  private final UnsafeProjection unsafeProjection;
   private final PrefixComputer prefixComputer;
   private final UnsafeExternalSorter sorter;
 
@@ -62,7 +61,6 @@ final class UnsafeExternalRowSorter {
       PrefixComparator prefixComparator,
       PrefixComputer prefixComputer) throws IOException {
     this.schema = schema;
-    this.unsafeProjection = UnsafeProjection.create(schema);
     this.prefixComputer = prefixComputer;
     final SparkEnv sparkEnv = SparkEnv.get();
     final TaskContext taskContext = TaskContext.get();
@@ -88,13 +86,12 @@ final class UnsafeExternalRowSorter {
   }
 
   @VisibleForTesting
-  void insertRow(InternalRow row) throws IOException {
-    UnsafeRow unsafeRow = unsafeProjection.apply(row);
+  void insertRow(UnsafeRow row) throws IOException {
     final long prefix = prefixComputer.computePrefix(row);
     sorter.insertRecord(
-      unsafeRow.getBaseObject(),
-      unsafeRow.getBaseOffset(),
-      unsafeRow.getSizeInBytes(),
+      row.getBaseObject(),
+      row.getBaseOffset(),
+      row.getSizeInBytes(),
       prefix
     );
     numRowsInserted++;
@@ -113,7 +110,7 @@ final class UnsafeExternalRowSorter {
   }
 
   @VisibleForTesting
-  Iterator<InternalRow> sort() throws IOException {
+  Iterator<UnsafeRow> sort() throws IOException {
     try {
       final UnsafeSorterIterator sortedIterator = sorter.getSortedIterator();
       if (!sortedIterator.hasNext()) {
@@ -121,7 +118,7 @@ final class UnsafeExternalRowSorter {
         // here in order to prevent memory leaks.
         cleanupResources();
       }
-      return new AbstractScalaRowIterator() {
+      return new AbstractScalaRowIterator<UnsafeRow>() {
 
         private final int numFields = schema.length();
         private UnsafeRow row = new UnsafeRow();
@@ -132,7 +129,7 @@ final class UnsafeExternalRowSorter {
         }
 
         @Override
-        public InternalRow next() {
+        public UnsafeRow next() {
           try {
             sortedIterator.loadNext();
             row.pointTo(
@@ -164,11 +161,11 @@ final class UnsafeExternalRowSorter {
   }
 
 
-  public Iterator<InternalRow> sort(Iterator<InternalRow> inputIterator) throws IOException {
-      while (inputIterator.hasNext()) {
-        insertRow(inputIterator.next());
-      }
-      return sort();
+  public Iterator<UnsafeRow> sort(Iterator<UnsafeRow> inputIterator) throws IOException {
+    while (inputIterator.hasNext()) {
+      insertRow(inputIterator.next());
+    }
+    return sort();
   }
 
   /**
```