aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-08-04 14:42:11 -0700
committerReynold Xin <rxin@databricks.com>2015-08-04 14:42:11 -0700
commitab8ee1a3b93286a62949569615086ef5030e9fae (patch)
tree88aa364451320f2a303bbeeb4857bcba57896c84 /core
parentf4b1ac08a1327e6d0ddc317cdf3997a0f68dec72 (diff)
downloadspark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.gz
spark-ab8ee1a3b93286a62949569615086ef5030e9fae.tar.bz2
spark-ab8ee1a3b93286a62949569615086ef5030e9fae.zip
[SPARK-9452] [SQL] Support records larger than page size in UnsafeExternalSorter
This patch extends UnsafeExternalSorter to support records larger than the page size. The basic strategy is the same as in #7762: store large records in their own overflow pages. Author: Josh Rosen <joshrosen@databricks.com> Closes #7891 from JoshRosen/large-records-in-sql-sorter and squashes the following commits: 967580b [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter 948c344 [Josh Rosen] Add large records tests for KV sorter. 3c17288 [Josh Rosen] Combine memory and disk cleanup into general cleanupResources() method 380f217 [Josh Rosen] Merge remote-tracking branch 'origin/master' into large-records-in-sql-sorter 27eafa0 [Josh Rosen] Fix page size in PackedRecordPointerSuite a49baef [Josh Rosen] Address initial round of review comments 3edb931 [Josh Rosen] Remove accidentally-committed debug statements. 2b164e2 [Josh Rosen] Support large records in UnsafeExternalSorter.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java173
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java8
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java129
3 files changed, 223 insertions, 87 deletions
diff --git a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index dec7fcfa0d..e6ddd08e5f 100644
--- a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -34,6 +34,7 @@ import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.BlockManager;
+import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.PlatformDependent;
import org.apache.spark.unsafe.memory.MemoryBlock;
import org.apache.spark.unsafe.memory.TaskMemoryManager;
@@ -143,8 +144,7 @@ public final class UnsafeExternalSorter {
taskContext.addOnCompleteCallback(new AbstractFunction0<BoxedUnit>() {
@Override
public BoxedUnit apply() {
- deleteSpillFiles();
- freeMemory();
+ cleanupResources();
return null;
}
});
@@ -249,7 +249,7 @@ public final class UnsafeExternalSorter {
*
* @return the number of bytes freed.
*/
- public long freeMemory() {
+ private long freeMemory() {
updatePeakMemoryUsed();
long memoryFreed = 0;
for (MemoryBlock block : allocatedPages) {
@@ -275,44 +275,32 @@ public final class UnsafeExternalSorter {
/**
* Deletes any spill files created by this sorter.
*/
- public void deleteSpillFiles() {
+ private void deleteSpillFiles() {
for (UnsafeSorterSpillWriter spill : spillWriters) {
File file = spill.getFile();
if (file != null && file.exists()) {
if (!file.delete()) {
logger.error("Was unable to delete spill file {}", file.getAbsolutePath());
- };
+ }
}
}
}
/**
- * Checks whether there is enough space to insert a new record into the sorter.
- *
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
- * the record size.
-
- * @return true if the record can be inserted without requiring more allocations, false otherwise.
+ * Frees this sorter's in-memory data structures and cleans up its spill files.
*/
- private boolean haveSpaceForRecord(int requiredSpace) {
- assert(requiredSpace > 0);
- assert(inMemSorter != null);
- return (inMemSorter.hasSpaceForAnotherRecord() && (requiredSpace <= freeSpaceInCurrentPage));
+ public void cleanupResources() {
+ deleteSpillFiles();
+ freeMemory();
}
/**
- * Allocates more memory in order to insert an additional record. This will request additional
- * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
- * obtained.
- *
- * @param requiredSpace the required space in the data page, in bytes, including space for storing
- * the record size.
+ * Checks whether there is enough space to insert an additional record in to the sort pointer
+ * array and grows the array if additional space is required. If the required space cannot be
+ * obtained, then the in-memory data will be spilled to disk.
*/
- private void allocateSpaceForRecord(int requiredSpace) throws IOException {
+ private void growPointerArrayIfNecessary() throws IOException {
assert(inMemSorter != null);
- // TODO: merge these steps to first calculate total memory requirements for this insert,
- // then try to acquire; no point in acquiring sort buffer only to spill due to no space in the
- // data page.
if (!inMemSorter.hasSpaceForAnotherRecord()) {
logger.debug("Attempting to expand sort pointer array");
final long oldPointerArrayMemoryUsage = inMemSorter.getMemoryUsage();
@@ -326,7 +314,20 @@ public final class UnsafeExternalSorter {
shuffleMemoryManager.release(oldPointerArrayMemoryUsage);
}
}
+ }
+ /**
+ * Allocates more memory in order to insert an additional record. This will request additional
+ * memory from the {@link ShuffleMemoryManager} and spill if the requested memory can not be
+ * obtained.
+ *
+ * @param requiredSpace the required space in the data page, in bytes, including space for storing
+ * the record size. This must be less than or equal to the page size (records
+ * that exceed the page size are handled via a different code path which uses
+ * special overflow pages).
+ */
+ private void acquireNewPageIfNecessary(int requiredSpace) throws IOException {
+ assert (requiredSpace <= pageSizeBytes);
if (requiredSpace > freeSpaceInCurrentPage) {
logger.trace("Required space {} is less than free space in current page ({})", requiredSpace,
freeSpaceInCurrentPage);
@@ -339,9 +340,7 @@ public final class UnsafeExternalSorter {
} else {
final long memoryAcquired = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquired < pageSizeBytes) {
- if (memoryAcquired > 0) {
- shuffleMemoryManager.release(memoryAcquired);
- }
+ shuffleMemoryManager.release(memoryAcquired);
spill();
final long memoryAcquiredAfterSpilling = shuffleMemoryManager.tryToAcquire(pageSizeBytes);
if (memoryAcquiredAfterSpilling != pageSizeBytes) {
@@ -365,26 +364,59 @@ public final class UnsafeExternalSorter {
long recordBaseOffset,
int lengthInBytes,
long prefix) throws IOException {
+
+ growPointerArrayIfNecessary();
// Need 4 bytes to store the record length.
final int totalSpaceRequired = lengthInBytes + 4;
- if (!haveSpaceForRecord(totalSpaceRequired)) {
- allocateSpaceForRecord(totalSpaceRequired);
+
+ // --- Figure out where to insert the new record ----------------------------------------------
+
+ final MemoryBlock dataPage;
+ long dataPagePosition;
+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+ if (useOverflowPage) {
+ long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+ // The record is larger than the page size, so allocate a special overflow page just to hold
+ // that record.
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGranted != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGranted);
+ spill();
+ final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGrantedAfterSpill != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGrantedAfterSpill);
+ throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+ }
+ }
+ MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+ allocatedPages.add(overflowPage);
+ dataPage = overflowPage;
+ dataPagePosition = overflowPage.getBaseOffset();
+ } else {
+ // The record is small enough to fit in a regular data page, but the current page might not
+ // have enough space to hold it (or no pages have been allocated yet).
+ acquireNewPageIfNecessary(totalSpaceRequired);
+ dataPage = currentPage;
+ dataPagePosition = currentPagePosition;
+ // Update bookkeeping information
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ currentPagePosition += totalSpaceRequired;
}
- assert(inMemSorter != null);
+ final Object dataPageBaseObject = dataPage.getBaseObject();
+
+ // --- Insert the record ----------------------------------------------------------------------
final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
- final Object dataPageBaseObject = currentPage.getBaseObject();
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, lengthInBytes);
- currentPagePosition += 4;
+ taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, lengthInBytes);
+ dataPagePosition += 4;
PlatformDependent.copyMemory(
recordBaseObject,
recordBaseOffset,
dataPageBaseObject,
- currentPagePosition,
+ dataPagePosition,
lengthInBytes);
- currentPagePosition += lengthInBytes;
- freeSpaceInCurrentPage -= totalSpaceRequired;
+ assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
@@ -399,33 +431,70 @@ public final class UnsafeExternalSorter {
public void insertKVRecord(
Object keyBaseObj, long keyOffset, int keyLen,
Object valueBaseObj, long valueOffset, int valueLen, long prefix) throws IOException {
+
+ growPointerArrayIfNecessary();
final int totalSpaceRequired = keyLen + valueLen + 4 + 4;
- if (!haveSpaceForRecord(totalSpaceRequired)) {
- allocateSpaceForRecord(totalSpaceRequired);
+
+ // --- Figure out where to insert the new record ----------------------------------------------
+
+ final MemoryBlock dataPage;
+ long dataPagePosition;
+ boolean useOverflowPage = totalSpaceRequired > pageSizeBytes;
+ if (useOverflowPage) {
+ long overflowPageSize = ByteArrayMethods.roundNumberOfBytesToNearestWord(totalSpaceRequired);
+ // The record is larger than the page size, so allocate a special overflow page just to hold
+ // that record.
+ final long memoryGranted = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGranted != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGranted);
+ spill();
+ final long memoryGrantedAfterSpill = shuffleMemoryManager.tryToAcquire(overflowPageSize);
+ if (memoryGrantedAfterSpill != overflowPageSize) {
+ shuffleMemoryManager.release(memoryGrantedAfterSpill);
+ throw new IOException("Unable to acquire " + overflowPageSize + " bytes of memory");
+ }
+ }
+ MemoryBlock overflowPage = taskMemoryManager.allocatePage(overflowPageSize);
+ allocatedPages.add(overflowPage);
+ dataPage = overflowPage;
+ dataPagePosition = overflowPage.getBaseOffset();
+ } else {
+ // The record is small enough to fit in a regular data page, but the current page might not
+ // have enough space to hold it (or no pages have been allocated yet).
+ acquireNewPageIfNecessary(totalSpaceRequired);
+ dataPage = currentPage;
+ dataPagePosition = currentPagePosition;
+ // Update bookkeeping information
+ freeSpaceInCurrentPage -= totalSpaceRequired;
+ currentPagePosition += totalSpaceRequired;
}
- assert(inMemSorter != null);
+ final Object dataPageBaseObject = dataPage.getBaseObject();
+
+ // --- Insert the record ----------------------------------------------------------------------
final long recordAddress =
- taskMemoryManager.encodePageNumberAndOffset(currentPage, currentPagePosition);
- final Object dataPageBaseObject = currentPage.getBaseObject();
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen + valueLen + 4);
- currentPagePosition += 4;
+ taskMemoryManager.encodePageNumberAndOffset(dataPage, dataPagePosition);
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen + valueLen + 4);
+ dataPagePosition += 4;
- PlatformDependent.UNSAFE.putInt(dataPageBaseObject, currentPagePosition, keyLen);
- currentPagePosition += 4;
+ PlatformDependent.UNSAFE.putInt(dataPageBaseObject, dataPagePosition, keyLen);
+ dataPagePosition += 4;
PlatformDependent.copyMemory(
- keyBaseObj, keyOffset, dataPageBaseObject, currentPagePosition, keyLen);
- currentPagePosition += keyLen;
+ keyBaseObj, keyOffset, dataPageBaseObject, dataPagePosition, keyLen);
+ dataPagePosition += keyLen;
PlatformDependent.copyMemory(
- valueBaseObj, valueOffset, dataPageBaseObject, currentPagePosition, valueLen);
- currentPagePosition += valueLen;
+ valueBaseObj, valueOffset, dataPageBaseObject, dataPagePosition, valueLen);
- freeSpaceInCurrentPage -= totalSpaceRequired;
+ assert(inMemSorter != null);
inMemSorter.insertRecord(recordAddress, prefix);
}
+ /**
+ * Returns a sorted iterator. It is the caller's responsibility to call `cleanupResources()`
+ * after consuming this iterator.
+ */
public UnsafeSorterIterator getSortedIterator() throws IOException {
assert(inMemSorter != null);
final UnsafeInMemorySorter.SortedIterator inMemoryIterator = inMemSorter.getSortedIterator();
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
index db9e827590..934b7e0305 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/PackedRecordPointerSuite.java
@@ -32,8 +32,8 @@ public class PackedRecordPointerSuite {
public void heap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
- final MemoryBlock page0 = memoryManager.allocatePage(100);
- final MemoryBlock page1 = memoryManager.allocatePage(100);
+ final MemoryBlock page0 = memoryManager.allocatePage(128);
+ final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
@@ -50,8 +50,8 @@ public class PackedRecordPointerSuite {
public void offHeap() {
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.UNSAFE));
- final MemoryBlock page0 = memoryManager.allocatePage(100);
- final MemoryBlock page1 = memoryManager.allocatePage(100);
+ final MemoryBlock page0 = memoryManager.allocatePage(128);
+ final MemoryBlock page1 = memoryManager.allocatePage(128);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index c11949d57a..968185bde7 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -18,8 +18,10 @@
package org.apache.spark.util.collection.unsafe.sort;
import java.io.File;
+import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
import java.util.LinkedList;
import java.util.UUID;
@@ -34,6 +36,7 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.Assert.*;
import static org.mockito.AdditionalAnswers.returnsSecondArg;
import static org.mockito.Answers.RETURNS_SMART_NULLS;
@@ -77,12 +80,13 @@ public class UnsafeExternalSorterSuite {
}
};
+ SparkConf sparkConf;
+ File tempDir;
ShuffleMemoryManager shuffleMemoryManager;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
- File tempDir;
private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m");
@@ -96,6 +100,7 @@ public class UnsafeExternalSorterSuite {
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ sparkConf = new SparkConf();
tempDir = new File(Utils.createTempDir$default$1());
shuffleMemoryManager = new ShuffleMemoryManager(Long.MAX_VALUE);
spillFilesCreated.clear();
@@ -155,14 +160,19 @@ public class UnsafeExternalSorterSuite {
}
private static void insertNumber(UnsafeExternalSorter sorter, int value) throws Exception {
- final int[] arr = new int[] { value };
+ final int[] arr = new int[]{ value };
sorter.insertRecord(arr, PlatformDependent.INT_ARRAY_OFFSET, 4, value);
}
- @Test
- public void testSortingOnlyByPrefix() throws Exception {
+ private static void insertRecord(
+ UnsafeExternalSorter sorter,
+ int[] record,
+ long prefix) throws IOException {
+ sorter.insertRecord(record, PlatformDependent.INT_ARRAY_OFFSET, record.length * 4, prefix);
+ }
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
+ private UnsafeExternalSorter newSorter() throws IOException {
+ return UnsafeExternalSorter.create(
taskMemoryManager,
shuffleMemoryManager,
blockManager,
@@ -171,7 +181,11 @@ public class UnsafeExternalSorterSuite {
prefixComparator,
/* initialSize */ 1024,
pageSizeBytes);
+ }
+ @Test
+ public void testSortingOnlyByPrefix() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
insertNumber(sorter, 5);
insertNumber(sorter, 1);
insertNumber(sorter, 3);
@@ -186,26 +200,16 @@ public class UnsafeExternalSorterSuite {
iter.loadNext();
assertEquals(i, iter.getKeyPrefix());
assertEquals(4, iter.getRecordLength());
- // TODO: read rest of value.
+ assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
}
- sorter.freeMemory();
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@Test
public void testSortingEmptyArrays() throws Exception {
-
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
- taskMemoryManager,
- shuffleMemoryManager,
- blockManager,
- taskContext,
- recordComparator,
- prefixComparator,
- /* initialSize */ 1024,
- pageSizeBytes);
-
+ final UnsafeExternalSorter sorter = newSorter();
sorter.insertRecord(null, 0, 0, 0);
sorter.insertRecord(null, 0, 0, 0);
sorter.spill();
@@ -222,28 +226,89 @@ public class UnsafeExternalSorterSuite {
assertEquals(0, iter.getRecordLength());
}
- sorter.freeMemory();
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@Test
- public void testFillingPage() throws Exception {
+ public void spillingOccursInResponseToMemoryPressure() throws Exception {
+ shuffleMemoryManager = new ShuffleMemoryManager(pageSizeBytes * 2);
+ final UnsafeExternalSorter sorter = newSorter();
+ final int numRecords = 100000;
+ for (int i = 0; i <= numRecords; i++) {
+ insertNumber(sorter, numRecords - i);
+ }
+ // Ensure that spill files were created
+ assertThat(tempDir.listFiles().length, greaterThanOrEqualTo(1));
+ // Read back the sorted data:
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
- final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
- taskMemoryManager,
- shuffleMemoryManager,
- blockManager,
- taskContext,
- recordComparator,
- prefixComparator,
- /* initialSize */ 1024,
- pageSizeBytes);
+ int i = 0;
+ while (iter.hasNext()) {
+ iter.loadNext();
+ assertEquals(i, iter.getKeyPrefix());
+ assertEquals(4, iter.getRecordLength());
+ assertEquals(i, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ i++;
+ }
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+ @Test
+ public void testFillingPage() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
byte[] record = new byte[16];
while (sorter.getNumberOfAllocatedPages() < 2) {
sorter.insertRecord(record, PlatformDependent.BYTE_ARRAY_OFFSET, record.length, 0);
}
- sorter.freeMemory();
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
+ public void sortingRecordsThatExceedPageSize() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ final int[] largeRecord = new int[(int) pageSizeBytes + 16];
+ Arrays.fill(largeRecord, 456);
+ final int[] smallRecord = new int[100];
+ Arrays.fill(smallRecord, 123);
+
+ insertRecord(sorter, largeRecord, 456);
+ sorter.spill();
+ insertRecord(sorter, smallRecord, 123);
+ sorter.spill();
+ insertRecord(sorter, smallRecord, 123);
+ insertRecord(sorter, largeRecord, 456);
+
+ UnsafeSorterIterator iter = sorter.getSortedIterator();
+ // Small record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(123, iter.getKeyPrefix());
+ assertEquals(smallRecord.length * 4, iter.getRecordLength());
+ assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Small record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(123, iter.getKeyPrefix());
+ assertEquals(smallRecord.length * 4, iter.getRecordLength());
+ assertEquals(123, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Large record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(456, iter.getKeyPrefix());
+ assertEquals(largeRecord.length * 4, iter.getRecordLength());
+ assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+ // Large record
+ assertTrue(iter.hasNext());
+ iter.loadNext();
+ assertEquals(456, iter.getKeyPrefix());
+ assertEquals(largeRecord.length * 4, iter.getRecordLength());
+ assertEquals(456, PlatformDependent.UNSAFE.getInt(iter.getBaseObject(), iter.getBaseOffset()));
+
+ assertFalse(iter.hasNext());
+ sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@@ -289,8 +354,10 @@ public class UnsafeExternalSorterSuite {
newPeakMemory = sorter.getPeakMemoryUsedBytes();
assertEquals(previousPeakMemory, newPeakMemory);
} finally {
- sorter.freeMemory();
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
}
}
}
+