aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2015-11-04 21:30:21 -0800
committerDavies Liu <davies.liu@gmail.com>2015-11-04 21:30:21 -0800
commit81498dd5c86ca51d2fb351c8ef52cbb28e6844f4 (patch)
tree609fd1a3df2aeb64592dee8930ccbcf6efc0ec2e /core
parentd0b56339625727744e2c30fc2167bc6a457d37f7 (diff)
downloadspark-81498dd5c86ca51d2fb351c8ef52cbb28e6844f4.tar.gz
spark-81498dd5c86ca51d2fb351c8ef52cbb28e6844f4.tar.bz2
spark-81498dd5c86ca51d2fb351c8ef52cbb28e6844f4.zip
[SPARK-11425] [SPARK-11486] Improve hybrid aggregation
After aggregation, the dataset could be smaller than the inputs, so it's better to do hash-based aggregation for all inputs, then use sort-based aggregation to merge them. Author: Davies Liu <davies@databricks.com> Closes #9383 from davies/fix_switch.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java46
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java39
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java15
3 files changed, 70 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 07241c827c..6656fd1d0b 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -20,6 +20,7 @@ package org.apache.spark.unsafe.map;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
@@ -638,7 +639,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (valueLength % 8 == 0);
assert(longArray != null);
- if (numElements == MAX_CAPACITY || !canGrowArray) {
+
+ if (numElements == MAX_CAPACITY
+ // The map could be reused from last spill (because of no enough memory to grow),
+ // then we don't try to grow again if hit the `growthThreshold`.
+ || !canGrowArray && numElements > growthThreshold) {
return false;
}
@@ -730,25 +735,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
- * Free the memory used by longArray.
+ * Free all allocated memory associated with this map, including the storage for keys and values
+ * as well as the hash map array itself.
+ *
+ * This method is idempotent and can be called multiple times.
*/
- public void freeArray() {
+ public void free() {
updatePeakMemoryUsed();
if (longArray != null) {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
}
- }
-
- /**
- * Free all allocated memory associated with this map, including the storage for keys and values
- * as well as the hash map array itself.
- *
- * This method is idempotent and can be called multiple times.
- */
- public void free() {
- freeArray();
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
while (dataPagesIterator.hasNext()) {
MemoryBlock dataPage = dataPagesIterator.next();
@@ -834,6 +832,28 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
+ * Returns the underline long[] of longArray.
+ */
+ public long[] getArray() {
+ assert(longArray != null);
+ return (long[]) longArray.memoryBlock().getBaseObject();
+ }
+
+ /**
+ * Reset this map to initialized state.
+ */
+ public void reset() {
+ numElements = 0;
+ Arrays.fill(getArray(), 0);
+ while (dataPages.size() > 0) {
+ MemoryBlock dataPage = dataPages.removeLast();
+ freePage(dataPage);
+ }
+ currentPage = null;
+ pageCursor = 0;
+ }
+
+ /**
* Grows the size of the hash table and re-hash everything.
*/
@VisibleForTesting
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 509fb0a044..cba043bc48 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -79,9 +79,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
- UnsafeInMemorySorter inMemorySorter) {
- return new UnsafeExternalSorter(taskMemoryManager, blockManager,
+ UnsafeInMemorySorter inMemorySorter) throws IOException {
+ UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
+ sorter.spill(Long.MAX_VALUE, sorter);
+ // The external sorter will be used to insert records, in-memory sorter is not needed.
+ sorter.inMemSorter = null;
+ return sorter;
}
public static UnsafeExternalSorter create(
@@ -124,7 +128,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
acquireMemory(inMemSorter.getMemoryUsage());
} else {
this.inMemSorter = existingInMemorySorter;
- // will acquire after free the map
}
this.peakMemoryUsedBytes = getMemoryUsage();
@@ -157,12 +160,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
*/
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
- assert(inMemSorter != null);
if (trigger != this) {
if (readingIterator != null) {
return readingIterator.spill();
- } else {
-
}
return 0L; // this should throw exception
}
@@ -389,24 +389,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * Merges another UnsafeExternalSorters into this one, the other one will be emptied.
+ *
+ * @throws IOException
+ */
+ public void merge(UnsafeExternalSorter other) throws IOException {
+ other.spill();
+ spillWriters.addAll(other.spillWriters);
+ // remove them from `spillWriters`, or the files will be deleted in `cleanupResources`.
+ other.spillWriters.clear();
+ other.cleanupResources();
+ }
+
+ /**
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
- assert(inMemSorter != null);
- readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
- int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
+ assert(inMemSorter != null);
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
- new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
+ new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
- spillWriters.clear();
- spillMerger.addSpillIfNotEmpty(readingIterator);
-
+ if (inMemSorter != null) {
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
+ spillMerger.addSpillIfNotEmpty(readingIterator);
+ }
return spillMerger.getSortedIterator();
}
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 1480f0681e..d57213b9b8 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -19,9 +19,9 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.util.Comparator;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.Sorter;
-import org.apache.spark.memory.TaskMemoryManager;
/**
* Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
@@ -78,12 +78,19 @@ public final class UnsafeInMemorySorter {
private int pos = 0;
public UnsafeInMemorySorter(
+ final TaskMemoryManager memoryManager,
+ final RecordComparator recordComparator,
+ final PrefixComparator prefixComparator,
+ int initialSize) {
+ this(memoryManager, recordComparator, prefixComparator, new long[initialSize * 2]);
+ }
+
+ public UnsafeInMemorySorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
final PrefixComparator prefixComparator,
- int initialSize) {
- assert (initialSize > 0);
- this.array = new long[initialSize * 2];
+ long[] array) {
+ this.array = array;
this.memoryManager = memoryManager;
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);