From 6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 6 Jan 2016 23:21:52 -0800 Subject: [SPARK-12295] [SQL] external spilling for window functions This PR manage the memory used by window functions (buffered rows), also enable external spilling. After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G. Author: Davies Liu Closes #10605 from davies/unsafe_window. --- .../unsafe/sort/UnsafeExternalSorter.java | 21 ++++++++++++++++++++- .../unsafe/sort/UnsafeInMemorySorter.java | 18 ++++++++++++------ .../unsafe/sort/UnsafeSorterIterator.java | 2 ++ .../unsafe/sort/UnsafeSorterSpillMerger.java | 7 +++++++ .../unsafe/sort/UnsafeSorterSpillReader.java | 8 +++++++- 5 files changed, 48 insertions(+), 8 deletions(-) (limited to 'core/src') diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java index 77d0b70bb8..68dc0c6d41 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java @@ -45,7 +45,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class); + @Nullable private final PrefixComparator prefixComparator; + @Nullable private final RecordComparator recordComparator; private final TaskMemoryManager taskMemoryManager; private final BlockManager blockManager; @@ -431,7 +433,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer { public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) { this.upstream = inMemIterator; - this.numRecords = inMemIterator.numRecordsLeft(); + this.numRecords = inMemIterator.getNumRecords(); + } + + public int getNumRecords() { + return numRecords; } public long spill() throws IOException { @@ -558,13 +564,23 @@ public final class UnsafeExternalSorter extends MemoryConsumer { private final Queue iterators; private UnsafeSorterIterator current; + private int numRecords; public ChainedIterator(Queue iterators) { assert iterators.size() > 0; + this.numRecords = 0; + for (UnsafeSorterIterator iter: iterators) { + this.numRecords += iter.getNumRecords(); + } this.iterators = iterators; this.current = iterators.remove(); } + @Override + public int getNumRecords() { + return numRecords; + } + @Override public boolean hasNext() { while (!current.hasNext() && !iterators.isEmpty()) { @@ -575,6 +591,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer { @Override public void loadNext() throws IOException { + while (!current.hasNext() && !iterators.isEmpty()) { + current = iterators.remove(); + } current.loadNext(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java index b7ab45675e..f71b8d154c 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java @@ -19,6 +19,8 @@ package org.apache.spark.util.collection.unsafe.sort; import java.util.Comparator; +import org.apache.avro.reflect.Nullable; + import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.unsafe.Platform; @@ -66,7 +68,9 @@ public final class UnsafeInMemorySorter { private final MemoryConsumer consumer; private final TaskMemoryManager memoryManager; + @Nullable private final Sorter sorter; + @Nullable private final Comparator sortComparator; /** @@ -98,10 +102,11 @@ public final class UnsafeInMemorySorter { LongArray array) { this.consumer = consumer; this.memoryManager = memoryManager; - this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); if (recordComparator != null) { + this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE); this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager); } else { + this.sorter = null; this.sortComparator = null; } this.array = array; @@ -190,12 +195,13 @@ public final class UnsafeInMemorySorter { } @Override - public boolean hasNext() { - return position / 2 < numRecords; + public int getNumRecords() { + return numRecords; } - public int numRecordsLeft() { - return numRecords - position / 2; + @Override + public boolean hasNext() { + return position / 2 < numRecords; } @Override @@ -227,7 +233,7 @@ public final class UnsafeInMemorySorter { * {@code next()} will return the same mutable object. */ public SortedIterator getSortedIterator() { - if (sortComparator != null) { + if (sorter != null) { sorter.sort(array, 0, pos / 2, sortComparator); } return new SortedIterator(pos / 2); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java index 16ac2e8d82..1b3167fcc2 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java @@ -32,4 +32,6 @@ public abstract class UnsafeSorterIterator { public abstract int getRecordLength(); public abstract long getKeyPrefix(); + + public abstract int getNumRecords(); } diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java index 3874a9f9cb..ceb59352af 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java @@ -23,6 +23,7 @@ import java.util.PriorityQueue; final class UnsafeSorterSpillMerger { + private int numRecords = 0; private final PriorityQueue priorityQueue; public UnsafeSorterSpillMerger( @@ -59,6 +60,7 @@ final class UnsafeSorterSpillMerger { // priorityQueue, we will have n extra empty records in the result of the UnsafeSorterIterator. spillReader.loadNext(); priorityQueue.add(spillReader); + numRecords += spillReader.getNumRecords(); } } @@ -67,6 +69,11 @@ final class UnsafeSorterSpillMerger { private UnsafeSorterIterator spillReader; + @Override + public int getNumRecords() { + return numRecords; + } + @Override public boolean hasNext() { return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext()); diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java index dcb13e6581..20ee1c8eb0 100644 --- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java +++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java @@ -38,6 +38,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen // Variables that change with every record read: private int recordLength; private long keyPrefix; + private int numRecords; private int numRecordsRemaining; private byte[] arr = new byte[1024 * 1024]; @@ -53,13 +54,18 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen try { this.in = blockManager.wrapForCompression(blockId, bs); this.din = new DataInputStream(this.in); - numRecordsRemaining = din.readInt(); + numRecords = numRecordsRemaining = din.readInt(); } catch (IOException e) { Closeables.close(bs, /* swallowIOException = */ true); throw e; } } + @Override + public int getNumRecords() { + return numRecords; + } + @Override public boolean hasNext() { return (numRecordsRemaining > 0); -- cgit v1.2.3