diff options
Diffstat (limited to 'core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java')
-rw-r--r-- | core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java | 97 |
1 files changed, 69 insertions, 28 deletions
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java index 94d50b94fd..cfead0e592 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java @@ -36,28 +36,29 @@ import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.Assert.*; -import static org.mockito.AdditionalAnswers.returnsSecondArg; -import static org.mockito.Answers.RETURNS_SMART_NULLS; -import static org.mockito.Mockito.*; import org.apache.spark.SparkConf; import org.apache.spark.TaskContext; import org.apache.spark.executor.ShuffleWriteMetrics; import org.apache.spark.executor.TaskMetrics; -import org.apache.spark.memory.GrantEverythingMemoryManager; +import org.apache.spark.memory.TestMemoryManager; +import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.serializer.SerializerInstance; import org.apache.spark.storage.*; import org.apache.spark.unsafe.Platform; -import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.util.Utils; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.Assert.*; +import static org.mockito.AdditionalAnswers.returnsSecondArg; +import static org.mockito.Answers.RETURNS_SMART_NULLS; +import static org.mockito.Mockito.*; + public class UnsafeExternalSorterSuite { final LinkedList<File> spillFilesCreated = new LinkedList<File>(); - final GrantEverythingMemoryManager memoryManager = - new GrantEverythingMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")); + final TestMemoryManager memoryManager = + new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")); final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0); // Use integer comparison for comparing prefixes (which are partition ids, in this case) final PrefixComparator prefixComparator = new PrefixComparator() { @@ -86,7 +87,7 @@ public class UnsafeExternalSorterSuite { @Mock(answer = RETURNS_SMART_NULLS) TaskContext taskContext; - private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "64m"); + private final long pageSizeBytes = new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "4m"); private static final class CompressStream extends AbstractFunction1<OutputStream, OutputStream> { @Override @@ -233,7 +234,7 @@ public class UnsafeExternalSorterSuite { insertNumber(sorter, numRecords - i); } assertEquals(1, sorter.getNumberOfAllocatedPages()); - memoryManager.markExecutionAsOutOfMemory(); + memoryManager.markExecutionAsOutOfMemoryOnce(); // The insertion of this record should trigger a spill: insertNumber(sorter, 0); // Ensure that spill files were created @@ -312,6 +313,62 @@ public class UnsafeExternalSorterSuite { } @Test + public void forcedSpillingWithReadIterator() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + long[] record = new long[100]; + int recordSize = record.length * 8; + int n = (int) pageSizeBytes / recordSize * 3; + for (int i = 0; i < n; i++) { + record[0] = (long) i; + sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0); + } + assert(sorter.getNumberOfAllocatedPages() >= 2); + UnsafeExternalSorter.SpillableIterator iter = + (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator(); + int lastv = 0; + for (int i = 0; i < n / 3; i++) { + iter.hasNext(); + iter.loadNext(); + assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i); + lastv = i; + } + assert(iter.spill() > 0); + assert(iter.spill() == 0); + assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == lastv); + for (int i = n / 3; i < n; i++) { + iter.hasNext(); + iter.loadNext(); + assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i); + } + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + + @Test + public void forcedSpillingWithNotReadIterator() throws Exception { + final UnsafeExternalSorter sorter = newSorter(); + long[] record = new long[100]; + int recordSize = record.length * 8; + int n = (int) pageSizeBytes / recordSize * 3; + for (int i = 0; i < n; i++) { + record[0] = (long) i; + sorter.insertRecord(record, Platform.LONG_ARRAY_OFFSET, recordSize, 0); + } + assert(sorter.getNumberOfAllocatedPages() >= 2); + UnsafeExternalSorter.SpillableIterator iter = + (UnsafeExternalSorter.SpillableIterator) sorter.getSortedIterator(); + assert(iter.spill() > 0); + assert(iter.spill() == 0); + for (int i = 0; i < n; i++) { + iter.hasNext(); + iter.loadNext(); + assert(Platform.getLong(iter.getBaseObject(), iter.getBaseOffset()) == i); + } + sorter.cleanupResources(); + assertSpillFilesWereCleanedUp(); + } + + @Test public void testPeakMemoryUsed() throws Exception { final long recordLengthBytes = 8; final long pageSizeBytes = 256; @@ -334,7 +391,7 @@ public class UnsafeExternalSorterSuite { insertNumber(sorter, i); newPeakMemory = sorter.getPeakMemoryUsedBytes(); // The first page is pre-allocated on instantiation - if (i % numRecordsPerPage == 0 && i > 0) { + if (i % numRecordsPerPage == 0) { // We allocated a new page for this record, so peak memory should change assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory); } else { @@ -358,21 +415,5 @@ public class UnsafeExternalSorterSuite { } } - @Test - public void testReservePageOnInstantiation() throws Exception { - final UnsafeExternalSorter sorter = newSorter(); - try { - assertEquals(1, sorter.getNumberOfAllocatedPages()); - // Inserting a new record doesn't allocate more memory since we already have a page - long peakMemory = sorter.getPeakMemoryUsedBytes(); - insertNumber(sorter, 100); - assertEquals(peakMemory, sorter.getPeakMemoryUsedBytes()); - assertEquals(1, sorter.getNumberOfAllocatedPages()); - } finally { - sorter.cleanupResources(); - assertSpillFilesWereCleanedUp(); - } - } - } |