aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java')
-rw-r--r-- core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java 63
1 files changed, 63 insertions, 0 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 2e40312674..5a97f4f113 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -21,6 +21,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
+import java.util.Queue;
import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
@@ -521,4 +522,66 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
return upstream.getKeyPrefix();
}
}
+
+  /**
+   * Returns an iterator which yields the rows in the order in which they were inserted.
+   *
+   * When spill writers exist, a reader for each of them is queued first (in the iteration
+   * order of `spillWriters`), followed by the in-memory sorter's iterator if there is one;
+   * the queued iterators are then exposed as a single chained iterator.
+   *
+   * It is the caller's responsibility to call `cleanupResources()`
+   * after consuming this iterator.
+   *
+   * @return an iterator over the spilled and/or in-memory records
+   * @throws IOException declared because obtaining a spill reader may fail
+   */
+  public UnsafeSorterIterator getIterator() throws IOException {
+    // No spill writers registered: the in-memory sorter is the only source.
+    if (spillWriters.isEmpty()) {
+      assert(inMemSorter != null);
+      return inMemSorter.getIterator();
+    }
+
+    final Queue<UnsafeSorterIterator> sources = new LinkedList<>();
+    for (UnsafeSorterSpillWriter writer : spillWriters) {
+      sources.add(writer.getReader(blockManager));
+    }
+    // The in-memory iterator, when present, goes last in the chain.
+    if (inMemSorter != null) {
+      sources.add(inMemSorter.getIterator());
+    }
+    return new ChainedIterator(sources);
+  }
+
+  /**
+   * Chains multiple UnsafeSorterIterators together so they can be consumed as a single one.
+   *
+   * The underlying iterators are drained one after another, in the order in which the queue
+   * yields them. The queue handed to the constructor is consumed: elements are removed from
+   * it as iteration advances.
+   *
+   * The record accessors (`getBaseObject()`, `getBaseOffset()`, `getRecordLength()`,
+   * `getKeyPrefix()`) delegate to the iterator that is current at the time of the call.
+   * Because `hasNext()` may switch to the next underlying iterator, read the loaded record
+   * before calling `hasNext()` again.
+   */
+  class ChainedIterator extends UnsafeSorterIterator {
+
+    private final Queue<UnsafeSorterIterator> iterators;
+    private UnsafeSorterIterator current;
+
+    /**
+     * @param iterators the iterators to chain; must contain at least one element
+     */
+    public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
+      assert iterators.size() > 0;
+      this.iterators = iterators;
+      this.current = iterators.remove();
+    }
+
+    /**
+     * Returns true if the current iterator, or any iterator still waiting in the queue,
+     * has another record.
+     */
+    @Override
+    public boolean hasNext() {
+      advanceToNonEmpty();
+      return current.hasNext();
+    }
+
+    /**
+     * Loads the next record from the first underlying iterator that still has one.
+     *
+     * @throws IOException if the underlying iterator fails to load the record
+     */
+    @Override
+    public void loadNext() throws IOException {
+      // Skip exhausted iterators here too, so that correctness does not depend on the
+      // caller invoking hasNext() immediately before every loadNext().
+      advanceToNonEmpty();
+      current.loadNext();
+    }
+
+    @Override
+    public Object getBaseObject() { return current.getBaseObject(); }
+
+    @Override
+    public long getBaseOffset() { return current.getBaseOffset(); }
+
+    @Override
+    public int getRecordLength() { return current.getRecordLength(); }
+
+    @Override
+    public long getKeyPrefix() { return current.getKeyPrefix(); }
+
+    /**
+     * Moves `current` forward until it has a record or no queued iterators remain.
+     */
+    private void advanceToNonEmpty() {
+      while (!current.hasNext() && !iterators.isEmpty()) {
+        current = iterators.remove();
+      }
+    }
+  }
}