aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java')
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java34
1 files changed, 14 insertions, 20 deletions
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a5bbaa95fa..94d50b94fd 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -46,20 +46,19 @@ import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
+import org.apache.spark.memory.GrantEverythingMemoryManager;
import org.apache.spark.serializer.SerializerInstance;
-import org.apache.spark.shuffle.ShuffleMemoryManager;
import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
-import org.apache.spark.unsafe.memory.ExecutorMemoryManager;
-import org.apache.spark.unsafe.memory.MemoryAllocator;
-import org.apache.spark.unsafe.memory.TaskMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
public class UnsafeExternalSorterSuite {
final LinkedList<File> spillFilesCreated = new LinkedList<File>();
- final TaskMemoryManager taskMemoryManager =
- new TaskMemoryManager(new ExecutorMemoryManager(MemoryAllocator.HEAP));
+ final GrantEverythingMemoryManager memoryManager =
+ new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
@Override
@@ -82,7 +81,6 @@ public class UnsafeExternalSorterSuite {
SparkConf sparkConf;
File tempDir;
- ShuffleMemoryManager shuffleMemoryManager;
@Mock(answer = RETURNS_SMART_NULLS) BlockManager blockManager;
@Mock(answer = RETURNS_SMART_NULLS) DiskBlockManager diskBlockManager;
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
@@ -102,7 +100,6 @@ public class UnsafeExternalSorterSuite {
MockitoAnnotations.initMocks(this);
sparkConf = new SparkConf();
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
- shuffleMemoryManager = ShuffleMemoryManager.create(Long.MAX_VALUE, pageSizeBytes);
spillFilesCreated.clear();
taskContext = mock(TaskContext.class);
when(taskContext.taskMetrics()).thenReturn(new TaskMetrics());
@@ -143,13 +140,7 @@ public class UnsafeExternalSorterSuite {
@After
public void tearDown() {
try {
- long leakedUnsafeMemory = taskMemoryManager.cleanUpAllAllocatedMemory();
- if (shuffleMemoryManager != null) {
- long leakedShuffleMemory = shuffleMemoryManager.getMemoryConsumptionForThisTask();
- shuffleMemoryManager = null;
- assertEquals(0L, leakedShuffleMemory);
- }
- assertEquals(0, leakedUnsafeMemory);
+ assertEquals(0L, taskMemoryManager.cleanUpAllAllocatedMemory());
} finally {
Utils.deleteRecursively(tempDir);
tempDir = null;
@@ -178,7 +169,6 @@ public class UnsafeExternalSorterSuite {
private UnsafeExternalSorter newSorter() throws IOException {
return UnsafeExternalSorter.create(
taskMemoryManager,
- shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,
@@ -236,12 +226,16 @@ public class UnsafeExternalSorterSuite {
@Test
public void spillingOccursInResponseToMemoryPressure() throws Exception {
- shuffleMemoryManager = ShuffleMemoryManager.create(pageSizeBytes * 2, pageSizeBytes);
final UnsafeExternalSorter sorter = newSorter();
- final int numRecords = (int) pageSizeBytes / 4;
- for (int i = 0; i <= numRecords; i++) {
+ // This should be enough records to completely fill up a data page:
+ final int numRecords = (int) (pageSizeBytes / (4 + 4));
+ for (int i = 0; i < numRecords; i++) {
insertNumber(sorter, numRecords - i);
}
+ assertEquals(1, sorter.getNumberOfAllocatedPages());
+ memoryManager.markExecutionAsOutOfMemory();
+ // The insertion of this record should trigger a spill:
+ insertNumber(sorter, 0);
// Ensure that spill files were created
assertThat(tempDir.listFiles().length, greaterThanOrEqualTo(1));
// Read back the sorted data:
@@ -255,6 +249,7 @@ public class UnsafeExternalSorterSuite {
assertEquals(i, Platform.getInt(iter.getBaseObject(), iter.getBaseOffset()));
i++;
}
+ assertEquals(numRecords + 1, i);
sorter.cleanupResources();
assertSpillFilesWereCleanedUp();
}
@@ -323,7 +318,6 @@ public class UnsafeExternalSorterSuite {
final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
final UnsafeExternalSorter sorter = UnsafeExternalSorter.create(
taskMemoryManager,
- shuffleMemoryManager,
blockManager,
taskContext,
recordComparator,