aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java46
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java39
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java15
3 files changed, 70 insertions, 30 deletions
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index 07241c827c..6656fd1d0b 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -20,6 +20,7 @@ package org.apache.spark.unsafe.map;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Iterator;
import java.util.LinkedList;
@@ -638,7 +639,11 @@ public final class BytesToBytesMap extends MemoryConsumer {
assert (valueLength % 8 == 0);
assert(longArray != null);
- if (numElements == MAX_CAPACITY || !canGrowArray) {
+
+ if (numElements == MAX_CAPACITY
+ // The map could be reused from a previous spill (because there was not enough memory to grow),
+ // in which case we don't try to grow again if we hit the `growthThreshold`.
+ || !canGrowArray && numElements > growthThreshold) {
return false;
}
@@ -730,25 +735,18 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
- * Free the memory used by longArray.
+ * Free all allocated memory associated with this map, including the storage for keys and values
+ * as well as the hash map array itself.
+ *
+ * This method is idempotent and can be called multiple times.
*/
- public void freeArray() {
+ public void free() {
updatePeakMemoryUsed();
if (longArray != null) {
long used = longArray.memoryBlock().size();
longArray = null;
releaseMemory(used);
}
- }
-
- /**
- * Free all allocated memory associated with this map, including the storage for keys and values
- * as well as the hash map array itself.
- *
- * This method is idempotent and can be called multiple times.
- */
- public void free() {
- freeArray();
Iterator<MemoryBlock> dataPagesIterator = dataPages.iterator();
while (dataPagesIterator.hasNext()) {
MemoryBlock dataPage = dataPagesIterator.next();
@@ -834,6 +832,28 @@ public final class BytesToBytesMap extends MemoryConsumer {
}
/**
+ * Returns the underlying long[] of longArray.
+ */
+ public long[] getArray() {
+ assert(longArray != null);
+ return (long[]) longArray.memoryBlock().getBaseObject();
+ }
+
+ /**
+ * Reset this map to initialized state.
+ */
+ public void reset() {
+ numElements = 0;
+ Arrays.fill(getArray(), 0);
+ while (dataPages.size() > 0) {
+ MemoryBlock dataPage = dataPages.removeLast();
+ freePage(dataPage);
+ }
+ currentPage = null;
+ pageCursor = 0;
+ }
+
+ /**
* Grows the size of the hash table and re-hash everything.
*/
@VisibleForTesting
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index 509fb0a044..cba043bc48 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -79,9 +79,13 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
PrefixComparator prefixComparator,
int initialSize,
long pageSizeBytes,
- UnsafeInMemorySorter inMemorySorter) {
- return new UnsafeExternalSorter(taskMemoryManager, blockManager,
+ UnsafeInMemorySorter inMemorySorter) throws IOException {
+ UnsafeExternalSorter sorter = new UnsafeExternalSorter(taskMemoryManager, blockManager,
taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
+ sorter.spill(Long.MAX_VALUE, sorter);
+ // The external sorter will be used to insert records; the in-memory sorter is not needed.
+ sorter.inMemSorter = null;
+ return sorter;
}
public static UnsafeExternalSorter create(
@@ -124,7 +128,6 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
acquireMemory(inMemSorter.getMemoryUsage());
} else {
this.inMemSorter = existingInMemorySorter;
- // will acquire after free the map
}
this.peakMemoryUsedBytes = getMemoryUsage();
@@ -157,12 +160,9 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
*/
@Override
public long spill(long size, MemoryConsumer trigger) throws IOException {
- assert(inMemSorter != null);
if (trigger != this) {
if (readingIterator != null) {
return readingIterator.spill();
- } else {
-
}
return 0L; // this should throw exception
}
@@ -389,24 +389,37 @@ public final class UnsafeExternalSorter extends MemoryConsumer {
}
/**
+ * Merges another UnsafeExternalSorter into this one; the other one will be emptied.
+ *
+ * @throws IOException
+ */
+ public void merge(UnsafeExternalSorter other) throws IOException {
+ other.spill();
+ spillWriters.addAll(other.spillWriters);
+ // remove them from `other.spillWriters`, or the files will be deleted by `other.cleanupResources()`.
+ other.spillWriters.clear();
+ other.cleanupResources();
+ }
+
+ /**
* Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
* after consuming this iterator.
*/
public UnsafeSorterIterator getSortedIterator() throws IOException {
- assert(inMemSorter != null);
- readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
- int numIteratorsToMerge = spillWriters.size() + (readingIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
+ assert(inMemSorter != null);
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
return readingIterator;
} else {
final UnsafeSorterSpillMerger spillMerger =
- new UnsafeSorterSpillMerger(recordComparator, prefixComparator, numIteratorsToMerge);
+ new UnsafeSorterSpillMerger(recordComparator, prefixComparator, spillWriters.size());
for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
spillMerger.addSpillIfNotEmpty(spillWriter.getReader(blockManager));
}
- spillWriters.clear();
- spillMerger.addSpillIfNotEmpty(readingIterator);
-
+ if (inMemSorter != null) {
+ readingIterator = new SpillableIterator(inMemSorter.getSortedIterator());
+ spillMerger.addSpillIfNotEmpty(readingIterator);
+ }
return spillMerger.getSortedIterator();
}
}
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index 1480f0681e..d57213b9b8 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -19,9 +19,9 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.util.Comparator;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.unsafe.Platform;
import org.apache.spark.util.collection.Sorter;
-import org.apache.spark.memory.TaskMemoryManager;
/**
* Sorts records using an AlphaSort-style key-prefix sort. This sort stores pointers to records
@@ -78,12 +78,19 @@ public final class UnsafeInMemorySorter {
private int pos = 0;
public UnsafeInMemorySorter(
+ final TaskMemoryManager memoryManager,
+ final RecordComparator recordComparator,
+ final PrefixComparator prefixComparator,
+ int initialSize) {
+ this(memoryManager, recordComparator, prefixComparator, new long[initialSize * 2]);
+ }
+
+ public UnsafeInMemorySorter(
final TaskMemoryManager memoryManager,
final RecordComparator recordComparator,
final PrefixComparator prefixComparator,
- int initialSize) {
- assert (initialSize > 0);
- this.array = new long[initialSize * 2];
+ long[] array) {
+ this.array = array;
this.memoryManager = memoryManager;
this.sorter = new Sorter<>(UnsafeSortDataFormat.INSTANCE);
this.sortComparator = new SortComparator(recordComparator, prefixComparator, memoryManager);