aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-01-06 23:21:52 -0800
committerDavies Liu <davies.liu@gmail.com>2016-01-06 23:21:52 -0800
commit6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d (patch)
treef5096b12f8539866286f6ff6a97281bf6f10a86d /core
parent84e77a15df18ba3f1cc871a3c52c783b46e52369 (diff)
downloadspark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.gz
spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.tar.bz2
spark-6a1c864ab6ee3e869a16ffdbaf6fead21c7aac6d.zip
[SPARK-12295] [SQL] external spilling for window functions
This PR manage the memory used by window functions (buffered rows), also enable external spilling. After this PR, we can run window functions on a partition with hundreds of millions of rows with only 1G. Author: Davies Liu <davies@databricks.com> Closes #10605 from davies/unsafe_window.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java21
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java18
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java2
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java7
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java8
5 files changed, 48 insertions, 8 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 77d0b70bb8..68dc0c6d41 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -45,7 +45,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final Logger logger = LoggerFactory.getLogger(UnsafeExternalSorter.class);
+ @Nullable
private final PrefixComparator prefixComparator;
+ @Nullable
private final RecordComparator recordComparator;
private final TaskMemoryManager taskMemoryManager;
private final BlockManager blockManager;
@@ -431,7 +433,11 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
public SpillableIterator(UnsafeInMemorySorter.SortedIterator inMemIterator) {
this.upstream = inMemIterator;
- this.numRecords = inMemIterator.numRecordsLeft();
+ this.numRecords = inMemIterator.getNumRecords();
+ }
+
+ public int getNumRecords() {
+ return numRecords;
}
public long spill() throws IOException {
@@ -558,14 +564,24 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
private final Queue<UnsafeSorterIterator> iterators;
private UnsafeSorterIterator current;
+ private int numRecords;
public ChainedIterator(Queue<UnsafeSorterIterator> iterators) {
assert iterators.size() > 0;
+ this.numRecords = 0;
+ for (UnsafeSorterIterator iter: iterators) {
+ this.numRecords += iter.getNumRecords();
+ }
this.iterators = iterators;
this.current = iterators.remove();
}
@Override
+ public int getNumRecords() {
+ return numRecords;
+ }
+
+ @Override
public boolean hasNext() {
while (!current.hasNext() && !iterators.isEmpty()) {
current = iterators.remove();
@@ -575,6 +591,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
@Override
public void loadNext() throws IOException {
+ while (!current.hasNext() && !iterators.isEmpty()) {
+ current = iterators.remove();
+ }
current.loadNext();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index b7ab45675e..f71b8d154c 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -19,6 +19,8 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.util.Comparator;
+import org.apache.avro.reflect.Nullable;
+
import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
@@ -66,7 +68,9 @@ public final class UnsafeInMemorySorter {
private final MemoryConsumer consumer;
private final TaskMemoryManager memoryManager;
+ @Nullable
private final Sorter<RecordPointerAndKeyPrefix, LongArray> sorter;
+ @Nullable
private final Comparator<RecordPointerAndKeyPrefix> sortComparator;
/**
@@ -98,10 +102,11 @@ public final class UnsafeInMemorySorter {
LongArray array) {
this.consumer = consumer;
this.memoryManager = memoryManager;
- this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
if (recordComparator != null) {
+ this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);
} else {
+ this.sorter = null;
this.sortComparator = null;
}
this.array = array;
@@ -190,12 +195,13 @@ public final class UnsafeInMemorySorter {
}
@Override
- public boolean hasNext() {
- return position / 2 < numRecords;
+ public int getNumRecords() {
+ return numRecords;
}
- public int numRecordsLeft() {
- return numRecords - position / 2;
+ @Override
+ public boolean hasNext() {
+ return position / 2 < numRecords;
}
@Override
@@ -227,7 +233,7 @@ public final class UnsafeInMemorySorter {
* {@code next()} will return the same mutable object.
*/
public SortedIterator getSortedIterator() {
- if (sortComparator != null) {
+ if (sorter != null) {
sorter.sort(array, 0, pos / 2, sortComparator);
}
return new SortedIterator(pos / 2);
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
index 16ac2e8d82..1b3167fcc2 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterIterator.java
@@ -32,4 +32,6 @@ public abstract class UnsafeSorterIterator {
public abstract int getRecordLength();
public abstract long getKeyPrefix();
+
+ public abstract int getNumRecords();
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
index 3874a9f9cb..ceb59352af 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillMerger.java
@@ -23,6 +23,7 @@ import java.util.PriorityQueue;
final class UnsafeSorterSpillMerger {
+ private int numRecords = 0;
private final PriorityQueue<UnsafeSorterIterator> priorityQueue;
public UnsafeSorterSpillMerger(
@@ -59,6 +60,7 @@ final class UnsafeSorterSpillMerger {
// priorityQueue, we will have n extra empty records in the result of the UnsafeSorterIterator.
spillReader.loadNext();
priorityQueue.add(spillReader);
+ numRecords += spillReader.getNumRecords();
}
}
@@ -68,6 +70,11 @@ final class UnsafeSorterSpillMerger {
private UnsafeSorterIterator spillReader;
@Override
+ public int getNumRecords() {
+ return numRecords;
+ }
+
+ @Override
public boolean hasNext() {
return !priorityQueue.isEmpty() || (spillReader != null && spillReader.hasNext());
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index dcb13e6581..20ee1c8eb0 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -38,6 +38,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
// Variables that change with every record read:
private int recordLength;
private long keyPrefix;
+ private int numRecords;
private int numRecordsRemaining;
private byte[] arr = new byte[1024 * 1024];
@@ -53,7 +54,7 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
try {
this.in = blockManager.wrapForCompression(blockId, bs);
this.din = new DataInputStream(this.in);
- numRecordsRemaining = din.readInt();
+ numRecords = numRecordsRemaining = din.readInt();
} catch (IOException e) {
Closeables.close(bs, /* swallowIOException = */ true);
throw e;
@@ -61,6 +62,11 @@ public final class UnsafeSorterSpillReader extends UnsafeSorterIterator implemen
}
@Override
+ public int getNumRecords() {
+ return numRecords;
+ }
+
+ @Override
public boolean hasNext() {
return (numRecordsRemaining > 0);
}