aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java')
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java97
1 files changed, 69 insertions, 28 deletions
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index 94d50b94fd..cfead0e592 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -36,28 +36,29 @@ import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
-import static org.hamcrest.Matchers.greaterThanOrEqualTo;
-import static org.junit.Assert.*;
-import static org.mockito.AdditionalAnswers.returnsSecondArg;
-import static org.mockito.Answers.RETURNS_SMART_NULLS;
-import static org.mockito.Mockito.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
-import org.apache.spark.memory.GrantEverythingMemoryManager;
+import org.apache.spark.memory.TestMemoryManager;
+import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.serializer.SerializerInstance;
import org.apache.spark.storage.*;
import org.apache.spark.unsafe.Platform;
-import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.util.Utils;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.junit.Assert.*;
+import static org.mockito.AdditionalAnswers.returnsSecondArg;
+import static org.mockito.Answers.RETURNS_SMART_NULLS;
+import static org.mockito.Mockito.*;
+
public class UnsafeExternalSorterSuite {
final LinkedList<File> spillFilesCreated = new LinkedList<File>();
- final GrantEverythingMemoryManager memoryManager =
- new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ final TestMemoryManager memoryManager =
+ new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
@@ -86,7 +87,7 @@ public class UnsafeExternalSorterSuite {
@Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext;
- private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m");
+ private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m");
private static final class CompressStream extends AbstractFunction1<OutputStream, OutputStream> {
@Override
@@ -233,7 +234,7 @@ public class UnsafeExternalSorterSuite {
insertNumber(sorter, numRecords - i);
}
assertEquals(1, sorter.getNumberOfAllocatedPages());
- memoryManager.markExecutionAsOutOfMemory();
+ memoryManager.markExecutionAsOutOfMemoryOnce();
// The insertion of this record should trigger a spill:
insertNumber(sorter, 0);
// Ensure that spill files were created
@@ -312,6 +313,62 @@ public class UnsafeExternalSorterSuite {
}
@Test
+ public void forcedSpillingWithReadIterator() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long[] record = new long[100];
+ int recordSize = record.length * 8;
+ int n = (int) pageSizeBytes / recordSize * 3;
+ for (int i = 0; i < n; i++) {
+ record[0] = (long) i;
+ sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0);
+ }
+ assert(sorter.getNumberOfAllocatedPages() >= 2);
+ UnsafeExternalSorter.SpillableIterator iter =
+ (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+ int lastv = 0;
+ for (int i = 0; i < n / 3; i++) {
+ iter.hasNext();
+ iter.loadNext();
+ assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ lastv = i;
+ }
+ assert(iter.spill() > 0);
+ assert(iter.spill() == 0);
+ assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == lastv);
+ for (int i = n / 3; i < n; i++) {
+ iter.hasNext();
+ iter.loadNext();
+ assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ }
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
+ public void forcedSpillingWithNotReadIterator() throws Exception {
+ final UnsafeExternalSorter sorter = newSorter();
+ long[] record = new long[100];
+ int recordSize = record.length * 8;
+ int n = (int) pageSizeBytes / recordSize * 3;
+ for (int i = 0; i < n; i++) {
+ record[0] = (long) i;
+ sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0);
+ }
+ assert(sorter.getNumberOfAllocatedPages() >= 2);
+ UnsafeExternalSorter.SpillableIterator iter =
+ (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator();
+ assert(iter.spill() > 0);
+ assert(iter.spill() == 0);
+ for (int i = 0; i < n; i++) {
+ iter.hasNext();
+ iter.loadNext();
+ assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i);
+ }
+ sorter.cleanupResources();
+ assertSpillFilesWereCleanedUp();
+ }
+
+ @Test
public void testPeakMemoryUsed() throws Exception {
final long recordLengthBytes = 8;
final long pageSizeBytes = 256;
@@ -334,7 +391,7 @@ public class UnsafeExternalSorterSuite {
insertNumber(sorter, i);
newPeakMemory = sorter.getPeakMemoryUsedBytes();
// The first page is pre-allocated on instantiation
- if (i % numRecordsPerPage == 0 && i > 0) {
+ if (i % numRecordsPerPage == 0) {
// We allocated a new page for this record, so peak memory should change
assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
} else {
@@ -358,21 +415,5 @@ public class UnsafeExternalSorterSuite {
}
}
- @Test
- public void testReservePageOnInstantiation() throws Exception {
- final UnsafeExternalSorter sorter = newSorter();
- try {
- assertEquals(1, sorter.getNumberOfAllocatedPages());
- // Inserting a new record doesn't allocate more memory since we already have a page
- long peakMemory = sorter.getPeakMemoryUsedBytes();
- insertNumber(sorter, 100);
- assertEquals(peakMemory, sorter.getPeakMemoryUsedBytes());
- assertEquals(1, sorter.getNumberOfAllocatedPages());
- } finally {
- sorter.cleanupResources();
- assertSpillFilesWereCleanedUp();
- }
- }
-
}