diff options
author | Davies Liu <davies@databricks.com> | 2016-01-06 23:21:52 -0800 |
---|---|---|
committer | Davies Liu <davies.liu@gmail.com> | 2016-01-06 23:21:52 -0800 |
commit | 6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d (patch) | |
tree | f5096b12f8539866286f6ff6a97281bf6f10a86d /core/src/main/java | |
parent | 84e77a15df18ba3f1cc871a3c52c783b46e52369 (diff) | |
download | spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.gz spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.bz2 spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.zip |
[SPARK-12295] [SQL] external spilling for window functions
This PR manages the memory used by window functions (buffered rows) and also enables external spilling.
After this PR, we can run window functions on a partition with hundreds of millions of rows using only 1 GB of memory.
Author: Davies Liu <davies@databricks.com>
Closes #10605 from davies/unsafe_window.
Diffstat (limited to 'core/src/main/java')
5 files changed, 48 insertions, 8 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 77d0b70bb8..68dc0c6d41 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -45,7 +45,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class); + @Nullable private final PrefixComparator prefixComparator; + @Nullable private final RecordComparator recordComparator; private final TaskMemoryManager taskMemoryManager; private final BlockManager blockManager; @@ -431,7 +433,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer { public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) { this.upstream = inMemIterator; - this.numRecords = inMemIterator.numRecordsLeft(); + this.numRecords = inMemIterator.getNumRecords(); + } + + public int getNumRecords() { + return numRecords; } public long spill() throws IOException { @@ -558,14 +564,24 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final Queue<UnsafeSorterIterator> iterators; private UnsafeSorterIterator current; + private int numRecords; public ChainedIterator(Queue<UnsafeSorterIterator> iterators) { assert iterators.size() > 0; + this.numRecords = 0; + for (UnsafeSorterIterator iter: iterators) { + this.numRecords += iter.getNumRecords(); + } this.iterators = iterators; this.current = iterators.remove(); } @Override + public int getNumRecords() { + return numRecords; + } + + @Override public boolean hasNext() { while (!current.hasNext() && !iterators.isEmpty()) { current = iterators.remove(); @@ -575,6 +591,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { @Override public void loadNext() throws 
IOException { + while (!current.hasNext() && !iterators.isEmpty()) { + current = iterators.remove(); + } current.loadNext(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index b7ab45675e..f71b8d154c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -19,6 +19,8 @@ package org.apache.spark.util.collection.unsafe.sort; import java.util.Comparator; +import org.apache.avro.reflect.Nullable; + import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -66,7 +68,9 @@ public final class UnsafeInMemorySorter { private final MemoryConsumer consumer; private final TaskMemoryManager memoryManager; + @Nullable private final Sorter<RecordPointerAndKeyPrefix, LongArray> sorter; + @Nullable private final Comparator<RecordPointerAndKeyPrefix> sortComparator; /** @@ -98,10 +102,11 @@ public final class UnsafeInMemorySorter { LongArray array) { this.consumer = consumer; this.memoryManager = memoryManager; - this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); if (recordComparator != null) { + this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); } else { + this.sorter = null; this.sortComparator = null; } this.array = array; @@ -190,12 +195,13 @@ public final class UnsafeInMemorySorter { } @Override - public boolean hasNext() { - return position / 2 < numRecords; + public int getNumRecords() { + return numRecords; } - public int numRecordsLeft() { - return numRecords - position / 2; + @Override + public boolean hasNext() { + return position / 2 < numRecords; } @Override @@ -227,7 +233,7 @@ public final 
class UnsafeInMemorySorter { * {@code next()} will return the same mutable object. */ public SortedIterator getSortedIterator() { - if (sortComparator != null) { + if (sorter != null) { sorter.sort(array, 0, pos / 2, sortComparator); } return new SortedIterator(pos / 2); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java index 16ac2e8d82..1b3167fcc2 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java @@ -32,4 +32,6 @@ public abstract class UnsafeSorterIterator { public abstract int getRecordLength(); public abstract long getKeyPrefix(); + + public abstract int getNumRecords(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java index 3874a9f9cb..ceb59352af 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java @@ -23,6 +23,7 @@ import java.util.PriorityQueue; final class UnsafeSorterSpillMerger { + private int numRecords = 0; private final PriorityQueue<UnsafeSorterIterator> priorityQueue; public UnsafeSorterSpillMerger( @@ -59,6 +60,7 @@ final class UnsafeSorterSpillMerger { // priorityQueue, we will have n extra empty records in the result of the UnsafeSorterIterator. 
spillReader.loadNext(); priorityQueue.add(spillReader); + numRecords += spillReader.getNumRecords(); } } @@ -68,6 +70,11 @@ final class UnsafeSorterSpillMerger { private UnsafeSorterIterator spillReader; @Override + public int getNumRecords() { + return numRecords; + } + + @Override public boolean hasNext() { return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext()); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index dcb13e6581..20ee1c8eb0 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -38,6 +38,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen // Variables that change with every record read: private int recordLength; private long keyPrefix; + private int numRecords; private int numRecordsRemaining; private byte[] arr = new byte[1024 * 1024]; @@ -53,7 +54,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen try { this.in = blockManager.wrapForCompression(blockId, bs); this.din = new DataInputStream(this.in); - numRecordsRemaining = din.readInt(); + numRecords = numRecordsRemaining = din.readInt(); } catch (IOException e) { Closeables.close(bs, /* swallowIOException = */ true); throw e; @@ -61,6 +62,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen } @Override + public int getNumRecords() { + return numRecords; + } + + @Override public boolean hasNext() { return (numRecordsRemaining > 0); } |