author     Reynold Xin <rxin@databricks.com>       2015-08-02 12:32:14 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-08-02 12:32:14 -0700
commit     2e981b7bfa9dec93fdcf25f3e7220cd6aaba744f (patch)
tree       f7458ae297d36bba1acf21fd08169defef6c2ef8 /core
parent     66924ffa6bdb8e0df1b90b789cb7ad443377e729 (diff)
[SPARK-9531] [SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter
This pull request adds a destructAndCreateExternalSorter method to UnsafeFixedWidthAggregationMap. The new method does the following:

1. Creates a new external sorter, UnsafeKVExternalSorter
2. Adds all the data into an in-memory sorter and sorts it
3. Spills the sorted in-memory data to disk

This method can be used to fall back to sort-based aggregation when under memory pressure. The pull request also includes accounting fixes from JoshRosen.

TODOs (that can be done in follow-up PRs):
- [x] Address Josh's feedback from #7849
- [x] More documentation and test cases
- [x] Make sure we are doing memory accounting correctly, with test cases (e.g. did we release the memory in BytesToBytesMap twice?)
- [ ] Look harder at possible memory leaks and exception handling
- [ ] Randomized tester for the KV sorter as well as the aggregation map

Author: Reynold Xin <rxin@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>

Closes #7860 from rxin/kvsorter and squashes the following commits:

986a58c [Reynold Xin] Bug fix.
599317c [Reynold Xin] Style fix and slightly more compact code.
fe7bd4e [Reynold Xin] Bug fixes.
fd71bef [Reynold Xin] Merge remote-tracking branch 'josh/large-records-in-sql-sorter' into kvsorter-with-josh-fix
3efae38 [Reynold Xin] More fixes and documentation.
45f1b09 [Josh Rosen] Ensure that spill files are cleaned up
f6a9bd3 [Reynold Xin] Josh feedback.
9be8139 [Reynold Xin] Remove testSpillFrequency.
7cbe759 [Reynold Xin] [SPARK-9531][SQL] UnsafeFixedWidthAggregationMap.destructAndCreateExternalSorter.
ae4a8af [Josh Rosen] Detect leaked unsafe memory in UnsafeExternalSorterSuite.
52f9b06 [Josh Rosen] Detect ShuffleMemoryManager leaks in UnsafeExternalSorter.
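The SQL-side destructAndCreateExternalSorter itself lives outside this 'core' diff, so the following is only a hedged sketch of how the three steps above could be wired together with the core APIs added below (UnsafeExternalSorter.createWithExistingInMemorySorter, BytesToBytesMap#getShuffleMemoryManager, BytesToBytesMap#getPageSizeBytes). The wrapper class, the method name destructAndSpill, and the assumption that the caller has already populated an UnsafeInMemorySorter with pointers into the map's records are illustrative only, not the PR's actual SQL code; package names follow the paths visible in this diff.

import java.io.IOException;

import org.apache.spark.TaskContext;
import org.apache.spark.storage.BlockManager;
import org.apache.spark.unsafe.map.BytesToBytesMap;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
import org.apache.spark.util.collection.unsafe.sort.PrefixComparator;
import org.apache.spark.util.collection.unsafe.sort.RecordComparator;
import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter;
import org.apache.spark.util.collection.unsafe.sort.UnsafeInMemorySorter;

final class DestructAndSpillSketch {

  // Hands an aggregation map's records off to an external sorter and spills them, following
  // the three steps in the commit message. inMemSorter is assumed to have been populated with
  // pointers to the map's existing records by the (SQL-side) caller.
  static UnsafeExternalSorter destructAndSpill(
      BytesToBytesMap map,
      TaskMemoryManager taskMemoryManager,
      BlockManager blockManager,
      TaskContext taskContext,
      RecordComparator recordComparator,
      PrefixComparator prefixComparator,
      UnsafeInMemorySorter inMemSorter) throws IOException {
    // Step 1: create the external sorter around the already-populated in-memory sorter, so the
    // sorter will not release pointer-array memory it did not allocate (see isInMemSorterExternal).
    UnsafeExternalSorter sorter = UnsafeExternalSorter.createWithExistingInMemorySorter(
        taskMemoryManager,
        map.getShuffleMemoryManager(),
        blockManager,
        taskContext,
        recordComparator,
        prefixComparator,
        /* initialSize */ 4096,
        map.getPageSizeBytes(),
        inMemSorter);
    // Steps 2 and 3: sort the in-memory records and write them out as a spill file.
    sorter.spill();
    // "Destruct": the map's own pages are no longer needed once the spill completes.
    map.free();
    return sorter;
  }
}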
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java                            |  32
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java      | 197
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java      |   4
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java   |   3
-rw-r--r--  core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java   |   4
-rw-r--r--  core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java               |   7
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java |  65
7 files changed, 237 insertions(+), 75 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
index cf222b7272..01a66084e9 100644
--- a/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
+++ b/core/src/main/java/org/apache/spark/unsafe/map/BytesToBytesMap.java
@@ -39,14 +39,22 @@ import org.apache.spark.unsafe.memory.*;
/**
* An append-only hash map where keys and values are contiguous regions of bytes.
- * <p>
+ *
* This is backed by a power-of-2-sized hash table, using quadratic probing with triangular numbers,
* which is guaranteed to exhaust the space.
- * <p>
+ *
* The map can support up to 2^29 keys. If the key cardinality is higher than this, you should
* probably be using sorting instead of hashing for better cache locality.
- * <p>
- * This class is not thread safe.
+ *
+ * The key and values under the hood are stored together, in the following format:
+ * Bytes 0 to 4: len(k) (key length in bytes) + len(v) (value length in bytes) + 4
+ * Bytes 4 to 8: len(k)
+ * Bytes 8 to 8 + len(k): key data
+ * Bytes 8 + len(k) to 8 + len(k) + len(v): value data
+ *
+ * This means that the first four bytes store the entire record (key + value) length. This format
+ * is consistent with {@link org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter},
+ * so we can pass records from this map directly into the sorter to sort records in place.
*/
public final class BytesToBytesMap {
@@ -253,7 +261,7 @@ public final class BytesToBytesMap {
totalLength = PlatformDependent.UNSAFE.getInt(pageBaseObject, offsetInPage);
}
loc.with(currentPage, offsetInPage);
- offsetInPage += 8 + totalLength;
+ offsetInPage += 4 + totalLength;
currentRecordNumber++;
return loc;
}
@@ -366,7 +374,7 @@ public final class BytesToBytesMap {
position += 4;
keyLength = PlatformDependent.UNSAFE.getInt(page, position);
position += 4;
- valueLength = totalLength - keyLength;
+ valueLength = totalLength - keyLength - 4;
keyMemoryLocation.setObjAndOffset(page, position);
@@ -565,7 +573,7 @@ public final class BytesToBytesMap {
insertCursor += valueLengthBytes; // word used to store the value size
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, recordOffset,
- keyLengthBytes + valueLengthBytes);
+ keyLengthBytes + valueLengthBytes + 4);
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, keyLengthOffset, keyLengthBytes);
// Copy the key
PlatformDependent.copyMemory(
@@ -620,7 +628,7 @@ public final class BytesToBytesMap {
* Free all allocated memory associated with this map, including the storage for keys and values
* as well as the hash map array itself.
*
- * This method is idempotent.
+ * This method is idempotent and can be called multiple times.
*/
public void free() {
longArray = null;
@@ -639,6 +647,14 @@ public final class BytesToBytesMap {
return taskMemoryManager;
}
+ public ShuffleMemoryManager getShuffleMemoryManager() {
+ return shuffleMemoryManager;
+ }
+
+ public long getPageSizeBytes() {
+ return pageSizeBytes;
+ }
+
/** Returns the total amount of memory, in bytes, consumed by this map's managed structures. */
public long getTotalMemoryConsumption() {
long totalDataPagesSize = 0L;
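The record layout documented in the new BytesToBytesMap class comment, together with the matching arithmetic in the hunks above (each record spans 4 + totalLength bytes, and valueLength = totalLength - keyLength - 4), can be illustrated with plain java.nio, independent of Spark's PlatformDependent/off-heap machinery. A minimal, self-contained sketch; RecordLayoutDemo is a hypothetical name, not part of the PR.

import java.nio.ByteBuffer;

final class RecordLayoutDemo {

  // Bytes 0..4: len(k) + len(v) + 4; bytes 4..8: len(k); then key data, then value data.
  static ByteBuffer encode(byte[] key, byte[] value) {
    ByteBuffer buf = ByteBuffer.allocate(8 + key.length + value.length);
    buf.putInt(key.length + value.length + 4);   // everything after this first 4-byte word
    buf.putInt(key.length);
    buf.put(key);                                // bytes 8 .. 8 + len(k)
    buf.put(value);                              // bytes 8 + len(k) .. 8 + len(k) + len(v)
    return buf;
  }

  static byte[] decodeValue(ByteBuffer buf) {
    buf.rewind();
    int totalLength = buf.getInt();              // len(k) + len(v) + 4
    int keyLength = buf.getInt();
    int valueLength = totalLength - keyLength - 4;   // same arithmetic as the hunk above
    byte[] value = new byte[valueLength];
    buf.position(8 + keyLength);                 // skip the key data
    buf.get(value);
    return value;
  }
}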
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index c05f2c332e..b984301cbb 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -17,9 +17,12 @@
package org.apache.spark.util.collection.unsafe.sort;
+import java.io.File;
import java.io.IOException;
import java.util.LinkedList;
+import javax.annotation.Nullable;
+
import scala.runtime.AbstractFunction0;
import scala.runtime.BoxedUnit;
@@ -27,7 +30,6 @@ import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ShuffleMemoryManager;
@@ -48,7 +50,7 @@ public final class UnsafeExternalSorter {
private final PrefixComparator prefixComparator;
private final RecordComparator recordComparator;
private final int initialSize;
- private final TaskMemoryManager memoryManager;
+ private final TaskMemoryManager taskMemoryManager;
private final ShuffleMemoryManager shuffleMemoryManager;
private final BlockManager blockManager;
private final TaskContext taskContext;
@@ -63,26 +65,57 @@ public final class UnsafeExternalSorter {
* this might not be necessary if we maintained a pool of re-usable pages in the TaskMemoryManager
* itself).
*/
- private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<MemoryBlock>();
+ private final LinkedList<MemoryBlock> allocatedPages = new LinkedList<>();
+
+ private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
// These variables are reset after spilling:
- private UnsafeInMemorySorter sorter;
+ private UnsafeInMemorySorter inMemSorter;
+ // Whether the in-mem sorter is created internally, or passed in from outside.
+ // If it is passed in from outside, we shouldn't release the in-mem sorter's memory.
+ private boolean isInMemSorterExternal = false;
private MemoryBlock currentPage = null;
private long currentPagePosition = -1;
private long freeSpaceInCurrentPage = 0;
- private final LinkedList<UnsafeSorterSpillWriter> spillWriters = new LinkedList<>();
+ public static UnsafeExternalSorter createWithExistingInMemorySorter(
+ TaskMemoryManager taskMemoryManager,
+ ShuffleMemoryManager shuffleMemoryManager,
+ BlockManager blockManager,
+ TaskContext taskContext,
+ RecordComparator recordComparator,
+ PrefixComparator prefixComparator,
+ int initialSize,
+ long pageSizeBytes,
+ UnsafeInMemorySorter inMemorySorter) throws IOException {
+ return new UnsafeExternalSorter(taskMemoryManager, shuffleMemoryManager, blockManager,
+ taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, inMemorySorter);
+ }
- public UnsafeExternalSorter(
- TaskMemoryManager memoryManager,
+ public static UnsafeExternalSorter create(
+ TaskMemoryManager taskMemoryManager,
ShuffleMemoryManager shuffleMemoryManager,
BlockManager blockManager,
TaskContext taskContext,
RecordComparator recordComparator,
PrefixComparator prefixComparator,
int initialSize,
- SparkConf conf) throws IOException {
- this.memoryManager = memoryManager;
+ long pageSizeBytes) throws IOException {
+ return new UnsafeExternalSorter(taskMemoryManager, shuffleMemoryManager, blockManager,
+ taskContext, recordComparator, prefixComparator, initialSize, pageSizeBytes, null);
+ }
+
+ private UnsafeExternalSorter(
+ TaskMemoryManager taskMemoryManager,
+ ShuffleMemoryManager shuffleMemoryManager,
+ BlockManager blockManager,
+ TaskContext taskContext,
+ RecordComparator recordComparator,
+ PrefixComparator prefixComparator,
+ int initialSize,
+ long pageSizeBytes,
+ @Nullable UnsafeInMemorySorter existingInMemorySorter) throws IOException {
+ this.taskMemoryManager = taskMemoryManager;
this.shuffleMemoryManager = shuffleMemoryManager;
this.blockManager = blockManager;
this.taskContext = taskContext;
@@ -90,9 +123,18 @@ public final class UnsafeExternalSorter {
this.prefixComparator = prefixComparator;
this.initialSize = initialSize;
// Use getSizeAsKb (not bytes) to maintain backwards compatibility for units
- this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
- this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
- initializeForWriting();
+ // this.fileBufferSizeBytes = (int) conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
+ this.fileBufferSizeBytes = 32 * 1024;
+ // this.pageSizeBytes = conf.getSizeAsBytes("spark.buffer.pageSize", "64m");
+ this.pageSizeBytes = pageSizeBytes;
+ this.writeMetrics = new ShuffleWriteMetrics();
+
+ if (existingInMemorySorter == null) {
+ initializeForWriting();
+ } else {
+ this.isInMemSorterExternal = true;
+ this.inMemSorter = existingInMemorySorter;
+ }
// Register a cleanup task with TaskContext to ensure that memory is guaranteed to be freed at
the end of the task. This is necessary to avoid memory leaks when the downstream operator
@@ -100,6 +142,7 @@ public final class UnsafeExternalSorter {
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
+ deleteSpillFiles();
freeMemory();
return null;
}
@@ -114,22 +157,31 @@ public final class UnsafeExternalSorter {
*/
private void initializeForWriting() throws IOException {
this.writeMetrics = new ShuffleWriteMetrics();
- // TODO: move this sizing calculation logic into a static method of sorter:
- final long memoryRequested = initialSize * 8L * 2;
- final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryRequested);
- if (memoryAcquired != memoryRequested) {
+ final long pointerArrayMemory =
+ UnsafeInMemorySorter.getMemoryRequirementsForPointerArray(initialSize);
+ final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pointerArrayMemory);
+ if (memoryAcquired != pointerArrayMemory) {
shuffleMemoryManager.release(memoryAcquired);
- throw new IOException("Could not acquire " + memoryRequested + " bytes of memory");
+ throw new IOException("Could not acquire " + pointerArrayMemory + " bytes of memory");
}
- this.sorter =
- new UnsafeInMemorySorter(memoryManager, recordComparator, prefixComparator, initialSize);
+ this.inMemSorter =
+ new UnsafeInMemorySorter(taskMemoryManager, recordComparator, prefixComparator, initialSize);
+ this.isInMemSorterExternal = false;
}
/**
- * Sort and spill the current records in response to memory pressure.
+ * Marks the current page as no-more-space-available; as a result, the next record insertion
+ * will either allocate a new page or trigger a spill.
*/
@VisibleForTesting
+ public void closeCurrentPage() {
+ freeSpaceInCurrentPage = 0;
+ }
+
+ /**
+ * Sort and spill the current records in response to memory pressure.
+ */
public void spill() throws IOException {
logger.info("Thread {} spilling sort data of {} to disk ({} {} so far)",
Thread.currentThread().getId(),
@@ -139,9 +191,9 @@ public final class UnsafeExternalSorter {
final UnsafeSorterSpillWriter spillWriter =
new UnsafeSorterSpillWriter(blockManager, fileBufferSizeBytes, writeMetrics,
- sorter.numRecords());
+ inMemSorter.numRecords());
spillWriters.add(spillWriter);
- final UnsafeSorterIterator sortedRecords = sorter.getSortedIterator();
+ final UnsafeSorterIterator sortedRecords = inMemSorter.getSortedIterator();
while (sortedRecords.hasNext()) {
sortedRecords.loadNext();
final Object baseObject = sortedRecords.getBaseObject();
@@ -150,20 +202,24 @@ public final class UnsafeExternalSorter {
spillWriter.write(baseObject, baseOffset, recordLength, sortedRecords.getKeyPrefix());
}
spillWriter.close();
- final long sorterMemoryUsage = sorter.getMemoryUsage();
- sorter = null;
- shuffleMemoryManager.release(sorterMemoryUsage);
final long spillSize = freeMemory();
+ // Note that this is more-or-less going to be a multiple of the page size, so wasted space in
+ // pages will currently be counted as memory spilled even though that space isn't actually
+ // written to disk. This also counts the space needed to store the sorter's pointer array.
taskContext.taskMetrics().incMemoryBytesSpilled(spillSize);
initializeForWriting();
}
+ /**
+ * Return the total memory usage of this sorter, including the data pages and the sorter's pointer
+ * array.
+ */
private long getMemoryUsage() {
long totalPageSize = 0;
for (MemoryBlock page : allocatedPages) {
totalPageSize += page.size();
}
- return sorter.getMemoryUsage() + totalPageSize;
+ return inMemSorter.getMemoryUsage() + totalPageSize;
}
@VisibleForTesting
@@ -171,13 +227,26 @@ public final class UnsafeExternalSorter {
return allocatedPages.size();
}
+ /**
+ * Free this sorter's in-memory data structures, including its data pages and pointer array.
+ *
+ * @return the number of bytes freed.
+ */
public long freeMemory() {
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
- memoryManager.freePage(block);
+ taskMemoryManager.freePage(block);
shuffleMemoryManager.release(block.size());
memoryFreed += block.size();
}
+ if (inMemSorter != null) {
+ if (!isInMemSorterExternal) {
+ long sorterMemoryUsage = inMemSorter.getMemoryUsage();
+ memoryFreed += sorterMemoryUsage;
+ shuffleMemoryManager.release(sorterMemoryUsage);
+ }
+ inMemSorter = null;
+ }
allocatedPages.clear();
currentPage = null;
currentPagePosition = -1;
@@ -186,6 +255,20 @@ public final class UnsafeExternalSorter {
}
/**
+ * Deletes any spill files created by this sorter.
+ */
+ public void deleteSpillFiles() {
+ for (UnsafeSorterSpillWriter spill : spillWriters) {
+ File file = spill.getFile();
+ if (file != null && file.exists()) {
+ if (!file.delete()) {
+ logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
+ };
+ }
+ }
+ }
+
+ /**
* Checks whether there is enough space to insert a new record into the sorter.
*
* @param requiredSpace the required space in the data page, in bytes, including space for storing
@@ -195,7 +278,7 @@ public final class UnsafeExternalSorter {
*/
private boolean haveSpaceForRecord(int requiredSpace) {
assert (requiredSpace > 0);
- return (sorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
+ return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
}
/**
@@ -210,16 +293,16 @@ public final class UnsafeExternalSorter {
// TODO: merge these steps to first calculate total memory requirements for this insert,
// then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
// data page.
- if (!sorter.hasSpaceForAnotherRecord()) {
+ if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
- final long oldPointerArrayMemoryUsage = sorter.getMemoryUsage();
+ final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
final long memoryToGrowPointerArray = oldPointerArrayMemoryUsage * 2;
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(memoryToGrowPointerArray);
if (memoryAcquired < memoryToGrowPointerArray) {
shuffleMemoryManager.release(memoryAcquired);
spill();
} else {
- sorter.expandPointerArray();
+ inMemSorter.expandPointerArray();
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
@@ -236,7 +319,9 @@ public final class UnsafeExternalSorter {
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
- shuffleMemoryManager.release(memoryAcquired);
+ if (memoryAcquired > 0) {
+ shuffleMemoryManager.release(memoryAcquired);
+ }
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
@@ -244,7 +329,7 @@ public final class UnsafeExternalSorter {
throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
}
}
- currentPage = memoryManager.allocatePage(pageSizeBytes);
+ currentPage = taskMemoryManager.allocatePage(pageSizeBytes);
currentPagePosition = currentPage.getBaseOffset();
freeSpaceInCurrentPage = pageSizeBytes;
allocatedPages.add(currentPage);
@@ -267,7 +352,7 @@ public final class UnsafeExternalSorter {
}
final long recordAddress =
- memoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
+ taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
final Object dataPageBaseObject = currentPage.getBaseObject();
PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
currentPagePosition += 4;
@@ -279,26 +364,48 @@ public final class UnsafeExternalSorter {
lengthInBytes);
currentPagePosition += lengthInBytes;
freeSpaceInCurrentPage -= totalSpaceRequired;
- sorter.insertRecord(recordAddress, prefix);
+ inMemSorter.insertRecord(recordAddress, prefix);
}
/**
- * Write a record to the sorter. The record is broken down into two different parts, and
+ * Write a key-value record to the sorter. The key and value will be put together in-memory,
+ * using the following format:
*
+ * record length (4 bytes), key length (4 bytes), key data, value data
+ *
+ * record length = key length + value length + 4
*/
- public void insertRecord(
- Object recordBaseObject1,
- long recordBaseOffset1,
- int lengthInBytes1,
- Object recordBaseObject2,
- long recordBaseOffset2,
- int lengthInBytes2,
- long prefix) throws IOException {
+ public void insertKVRecord(
+ Object keyBaseObj, long keyOffset, int keyLen,
+ Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+ final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
+ if (!haveSpaceForRecord(totalSpaceRequired)) {
+ allocateSpaceForRecord(totalSpaceRequired);
+ }
+
+ final long recordAddress =
+ taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
+ final Object dataPageBaseObject = currentPage.getBaseObject();
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
+ currentPagePosition += 4;
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
+ currentPagePosition += 4;
+
+ PlatformDependent.copyMemory(
+ keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
+ currentPagePosition += keyLen;
+
+ PlatformDependent.copyMemory(
+ valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
+ currentPagePosition += valueLen;
+
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ inMemSorter.insertRecord(recordAddress, prefix);
}
public UnsafeSorterIterator getSortedIterator() throws IOException {
- final UnsafeSorterIterator inMemoryIterator = sorter.getSortedIterator();
+ final UnsafeSorterIterator inMemoryIterator = inMemSorter.getSortedIterator();
int numIteratorsToMerge = spillWriters.size() + (inMemoryIterator.hasNext() ? 1 : 0);
if (spillWriters.isEmpty()) {
return inMemoryIterator;
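The allocateSpaceForRecord hunk above follows a try-acquire / release-partial-grant / spill / retry pattern, with the new > 0 guard avoiding a release call when nothing was granted. Below is a condensed, hedged sketch of just that control flow; MemoryBroker and Spillable are hypothetical stand-ins for ShuffleMemoryManager and the sorter, not Spark classes, and in the real code the caller goes on to allocate the page from the TaskMemoryManager once the grant succeeds.

import java.io.IOException;

interface MemoryBroker {
  long tryToAcquire(long bytes);   // may grant less than requested
  void release(long bytes);
}

interface Spillable {
  void spill() throws IOException;
}

final class AcquireOrSpill {
  static void acquirePage(MemoryBroker broker, Spillable spillable, long pageSizeBytes)
      throws IOException {
    long acquired = broker.tryToAcquire(pageSizeBytes);
    if (acquired < pageSizeBytes) {
      // Give back any partial grant before spilling (the diff adds the > 0 guard).
      if (acquired > 0) {
        broker.release(acquired);
      }
      spillable.spill();
      long acquiredAfterSpilling = broker.tryToAcquire(pageSizeBytes);
      if (acquiredAfterSpilling != pageSizeBytes) {
        broker.release(acquiredAfterSpilling);
        throw new IOException("Unable to acquire " + pageSizeBytes + " bytes of memory");
      }
    }
    // A full page's worth of memory is now accounted for; the caller can allocate the page.
  }
}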
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index fc34ad9cff..3131465391 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -100,6 +100,10 @@ public final class UnsafeInMemorySorter {
return pointerArray.length * 8L;
}
+ static long getMemoryRequirementsForPointerArray(long numEntries) {
+ return numEntries * 2L * 8L;
+ }
+
public boolean hasSpaceForAnotherRecord() {
return pointerArrayInsertPosition + 2 < pointerArray.length;
}
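The new getMemoryRequirementsForPointerArray simply names the calculation the old caller inlined (initialSize * 8L * 2): every inserted record takes two 8-byte slots in the long[] pointer array (the encoded record address and its key prefix), which is also why hasSpaceForAnotherRecord checks pointerArrayInsertPosition + 2. A quick arithmetic check, using the 1024-entry initial size from the tests below; the class name is hypothetical.

public final class PointerArraySizing {
  public static void main(String[] args) {
    long initialSize = 1024;                           // entries (records)
    long bytesPerEntry = 2L * 8L;                      // two longs per record: address + prefix
    long upFrontRequest = initialSize * bytesPerEntry;
    System.out.println(upFrontRequest + " bytes");     // 16384 bytes acquired before any insert
  }
}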
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
index 29e9e0f30f..ca1ccedc93 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
@@ -31,6 +31,7 @@ import org.apache.spark.unsafe.PlatformDependent;
*/
final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
+ private final File file;
private InputStream in;
private DataInputStream din;
@@ -48,6 +49,7 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
File file,
BlockId blockId) throws IOException {
assert (file.length() > 0);
+ this.file = file;
final BufferedInputStream bs = new BufferedInputStream(new FileInputStream(file));
this.in = blockManager.wrapForCompression(blockId, bs);
this.din = new DataInputStream(this.in);
@@ -71,6 +73,7 @@ final class UnsafeSorterSpillReader extends UnsafeSorterIterator {
numRecordsRemaining--;
if (numRecordsRemaining == 0) {
in.close();
+ file.delete();
in = null;
din = null;
}
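The reader now keeps a handle to its spill file and deletes it as soon as the last record has been read, while partially consumed files are covered by UnsafeExternalSorter.deleteSpillFiles() from the task-completion callback. A generic sketch of that delete-on-exhaustion pattern, assuming a hypothetical RecordFileReader rather than the Spark class:

import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

final class RecordFileReader {
  private final File file;
  private DataInputStream in;
  private int recordsRemaining;

  RecordFileReader(File file, int numRecords) throws IOException {
    this.file = file;
    this.in = new DataInputStream(new FileInputStream(file));
    this.recordsRemaining = numRecords;
  }

  int nextRecord() throws IOException {
    int record = in.readInt();
    recordsRemaining--;
    if (recordsRemaining == 0) {
      in.close();
      file.delete();   // fully consumed: the spill file is no longer needed
      in = null;
    }
    return record;
  }
}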
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
index 71eed29563..44cf6c756d 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillWriter.java
@@ -140,6 +140,10 @@ final class UnsafeSorterSpillWriter {
writeBuffer = null;
}
+ public File getFile() {
+ return file;
+ }
+
public UnsafeSorterSpillReader getReader(BlockManager blockManager) throws IOException {
return new UnsafeSorterSpillReader(blockManager, file, blockId);
}
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 70f8ca4d21..dbb7c662d7 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -67,12 +67,11 @@ public abstract class AbstractBytesToBytesMapSuite {
@After
public void tearDown() {
- if (taskMemoryManager != null) {
+ Assert.assertEquals(0L, taskMemoryManager.cleanUpAllAllocatedMemory());
+ if (shuffleMemoryManager != null) {
long leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask();
- Assert.assertEquals(0, taskMemoryManager.cleanUpAllAllocatedMemory());
- Assert.assertEquals(0, leakedShuffleMemory);
shuffleMemoryManager = null;
- taskMemoryManager = null;
+ Assert.assertEquals(0L, leakedShuffleMemory);
}
}
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 0e391b7512..52fa8bcd57 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -20,12 +20,14 @@ package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.LinkedList;
import java.util.UUID;
import scala.Tuple2;
import scala.Tuple2$;
import scala.runtime.AbstractFunction1;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
@@ -33,7 +35,6 @@ import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.*;
-import static org.mockito.AdditionalAnswers.returnsFirstArg;
import static org.mockito.AdditionalAnswers.returnsSecondArg;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
import static org.mockito.Mockito.*;
@@ -53,7 +54,8 @@ import org.apache.spark.util.Utils;
public class UnsafeExternalSorterSuite {
- final TaskMemoryManager memoryManager =
+ final LinkedList<File> spillFilesCreated = new LinkedList<File>();
+ final TaskMemoryManager taskMemoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
@@ -75,13 +77,15 @@ public class UnsafeExternalSorterSuite {
}
};
- @Mock(answer = RETURNS_SMART_NULLS) ShuffleMemoryManager shuffleMemoryManager;
+ ShuffleMemoryManager shuffleMemoryManager;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
File tempDir;
+ private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m");
+
private static final class CompressStream extends AbstractFunction1<OutputStream, OutputStream> {
@Override
public OutputStream apply(OutputStream stream) {
@@ -93,15 +97,17 @@ public class UnsafeExternalSorterSuite {
public void setUp() {
MockitoAnnotations.initMocks(this);
tempDir = new File(Utils.createTempDir$default$1());
+ shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
+ spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
- when(shuffleMemoryManager.tryToAcquire(anyLong())).then(returnsFirstArg());
when(blockManager.diskBlockManager()).thenReturn(diskBlockManager);
when(diskBlockManager.createTempLocalBlock()).thenAnswer(new Answer<Tuple2<TempLocalBlockId, File>>() {
@Override
public Tuple2<TempLocalBlockId, File> answer(InvocationOnMock invocationOnMock) throws Throwable {
TempLocalBlockId blockId = new TempLocalBlockId(UUID.randomUUID());
File file = File.createTempFile("spillFile", ".spill", tempDir);
+ spillFilesCreated.add(file);
return Tuple2$.MODULE$.apply(blockId, file);
}
});
@@ -130,6 +136,24 @@ public class UnsafeExternalSorterSuite {
.then(returnsSecondArg());
}
+ @After
+ public void tearDown() {
+ long leakedUnsafeMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
+ if (shuffleMemoryManager != null) {
+ long leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask();
+ shuffleMemoryManager = null;
+ assertEquals(0L, leakedShuffleMemory);
+ }
+ assertEquals(0, leakedUnsafeMemory);
+ }
+
+ private void assertSpillFilesWereCleanedUp() {
+ for (File spillFile : spillFilesCreated) {
+ assertFalse("Spill file " + spillFile.getPath() + " was not cleaned up",
+ spillFile.exists());
+ }
+ }
+
private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception {
final int[] arr = new int[] { value };
sorter.insertRecord(arr, PlatformDependent.INT_ARRAY_OFFSET, 4, value);
@@ -138,15 +162,15 @@ public class UnsafeExternalSorterSuite {
@Test
public void testSortingOnlyByPrefix() throws Exception {
- final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
- memoryManager,
+ final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
+ taskMemoryManager,
shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
prefixComparator,
- 1024,
- new SparkConf());
+ /* initialSize */ 1024,
+ pageSizeBytes);
insertNumber(sorter, 5);
insertNumber(sorter, 1);
@@ -165,22 +189,22 @@ public class UnsafeExternalSorterSuite {
// TODO: read rest of value.
}
- // TODO: test for cleanup:
- // assert(tempDir.isEmpty)
+ sorter.freeMemory();
+ assertSpillFilesWereCleanedUp();
}
@Test
public void testSortingEmptyArrays() throws Exception {
- final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
- memoryManager,
+ final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
+ taskMemoryManager,
shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
prefixComparator,
- 1024,
- new SparkConf());
+ /* initialSize */ 1024,
+ pageSizeBytes);
sorter.insertRecord(null, 0, 0, 0);
sorter.insertRecord(null, 0, 0, 0);
@@ -197,25 +221,30 @@ public class UnsafeExternalSorterSuite {
assertEquals(0, iter.getKeyPrefix());
assertEquals(0, iter.getRecordLength());
}
+
+ sorter.freeMemory();
+ assertSpillFilesWereCleanedUp();
}
@Test
public void testFillingPage() throws Exception {
- final UnsafeExternalSorter sorter = new UnsafeExternalSorter(
- memoryManager,
+
+ final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
+ taskMemoryManager,
shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
prefixComparator,
- 1024,
- new SparkConf());
+ /* initialSize */ 1024,
+ pageSizeBytes);
byte[] record = new byte[16];
while (sorter.getNumberOfAllocatedPages() < 2) {
sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0);
}
sorter.freeMemory();
+ assertSpillFilesWereCleanedUp();
}
}
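Both test suites now end with the same leak-detection shape: snapshot the leak counters, reset the fields, then assert. A condensed sketch of that pattern follows; the @Before initialization is omitted, and the ordering rationale (resetting the field before asserting, so a failed assertion cannot leave a stale manager behind for the next test) is a reading of the change rather than something the PR states.

import org.junit.After;
import static org.junit.Assert.assertEquals;

import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.unsafe.memory.TaskMemoryManager;

public abstract class MemoryLeakDetectingSuite {
  protected TaskMemoryManager taskMemoryManager;        // initialized in @Before (not shown)
  protected ShuffleMemoryManager shuffleMemoryManager;  // initialized in @Before (not shown)

  @After
  public void tearDown() {
    final long leakedUnsafeMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
    if (shuffleMemoryManager != null) {
      final long leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask();
      shuffleMemoryManager = null;                      // reset before asserting
      assertEquals(0L, leakedShuffleMemory);
    }
    assertEquals(0L, leakedUnsafeMemory);
  }
}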