diff options
Diffstat (limited to 'core/src/main/java/org/apache')
2 files changed, 13 insertions, 11 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 79d74b23ce..77d0b70bb8 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -400,6 +400,7 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * after consuming this iterator. */ public UnsafeSorterIterator getSortedIterator() throws IOException { + assert(recordComparator != null); if (spillWriters.isEmpty()) { assert(inMemSorter != null); readingIterator = new SpillableIterator(inMemSorter.getSortedIterator()); @@ -531,18 +532,20 @@ public final class UnsafeExternalSorter extends MemoryConsumer { * * It is the caller's responsibility to call `cleanupResources()` * after consuming this iterator. + * + * TODO: support forced spilling */ public UnsafeSorterIterator getIterator() throws IOException { if (spillWriters.isEmpty()) { assert(inMemSorter != null); - return inMemSorter.getIterator(); + return inMemSorter.getSortedIterator(); } else { LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); for (UnsafeSorterSpillWriter spillWriter : spillWriters) { queue.add(spillWriter.getReader(blockManager)); } if (inMemSorter != null) { - queue.add(inMemSorter.getIterator()); + queue.add(inMemSorter.getSortedIterator()); } return new ChainedIterator(queue); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index c16cbce9a0..b7ab45675e 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -99,7 +99,11 @@ public final class UnsafeInMemorySorter { this.consumer = consumer; this.memoryManager = memoryManager; this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); - this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); + if (recordComparator != null) { + this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); + } else { + this.sortComparator = null; + } this.array = array; } @@ -223,14 +227,9 @@ public final class UnsafeInMemorySorter { * {@code next()} will return the same mutable object. */ public SortedIterator getSortedIterator() { - sorter.sort(array, 0, pos / 2, sortComparator); - return new SortedIterator(pos / 2); - } - - /** - * Returns an iterator over record pointers in original order (inserted). - */ - public SortedIterator getIterator() { + if (sortComparator != null) { + sorter.sort(array, 0, pos / 2, sortComparator); + } return new SortedIterator(pos / 2); } } |