diff options
Diffstat (limited to 'core/src/main/java/org/apache')
2 files changed, 70 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 2e40312674..5a97f4f113 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -21,6 +21,7 @@ import javax.annotation.Nullable; import java.io.File; import java.io.IOException; import java.util.LinkedList; +import java.util.Queue; import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; @@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends MemoryConsumer { return upstream.getKeyPrefix(); } } + + /** + * Returns a iterator, which will return the rows in the order as inserted. + * + * It is the caller's responsibility to call `cleanupResources()` + * after consuming this iterator. + */ + public UnsafeSorterIterator getIterator() throws IOException { + if (spillWriters.isEmpty()) { + assert(inMemSorter != null); + return inMemSorter.getIterator(); + } else { + LinkedList<UnsafeSorterIterator> queue = new LinkedList<>(); + for (UnsafeSorterSpillWriter spillWriter : spillWriters) { + queue.add(spillWriter.getReader(blockManager)); + } + if (inMemSorter != null) { + queue.add(inMemSorter.getIterator()); + } + return new ChainedIterator(queue); + } + } + + /** + * Chain multiple UnsafeSorterIterator together as single one. + */ + class ChainedIterator extends UnsafeSorterIterator { + + private final Queue<UnsafeSorterIterator> iterators; + private UnsafeSorterIterator current; + + public ChainedIterator(Queue<UnsafeSorterIterator> iterators) { + assert iterators.size() > 0; + this.iterators = iterators; + this.current = iterators.remove(); + } + + @Override + public boolean hasNext() { + while (!current.hasNext() && !iterators.isEmpty()) { + current = iterators.remove(); + } + return current.hasNext(); + } + + @Override + public void loadNext() throws IOException { + current.loadNext(); + } + + @Override + public Object getBaseObject() { return current.getBaseObject(); } + + @Override + public long getBaseOffset() { return current.getBaseOffset(); } + + @Override + public int getRecordLength() { return current.getRecordLength(); } + + @Override + public long getKeyPrefix() { return current.getKeyPrefix(); } + } } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index dce1f15a29..c91e88f31b 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -226,4 +226,11 @@ public final class UnsafeInMemorySorter { sorter.sort(array, 0, pos / 2, sortComparator); return new SortedIterator(pos / 2); } + + /** + * Returns an iterator over record pointers in original order (inserted). + */ + public SortedIterator getIterator() { + return new SortedIterator(pos / 2); + } } |