Diffstat (limited to 'core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java')
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java | 54
1 file changed, 54 insertions(+), 0 deletions(-)
diff --git a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
index 04fc09b323..98c32bbc29 100644
--- a/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriterSuite.java
@@ -190,6 +190,7 @@ public class UnsafeShuffleWriterSuite {
     });

     when(taskContext.taskMetrics()).thenReturn(taskMetrics);
+    when(taskContext.internalMetricsToAccumulators()).thenReturn(null);

     when(shuffleDep.serializer()).thenReturn(Option.<Serializer>apply(serializer));
     when(shuffleDep.partitioner()).thenReturn(hashPartitioner);
@@ -542,4 +543,57 @@ public class UnsafeShuffleWriterSuite {
     writer.stop(false);
     assertSpillFilesWereCleanedUp();
   }
+
+  @Test
+  public void testPeakMemoryUsed() throws Exception {
+    final long recordLengthBytes = 8;
+    final long pageSizeBytes = 256;
+    final long numRecordsPerPage = pageSizeBytes / recordLengthBytes;
+    final SparkConf conf = new SparkConf().set("spark.buffer.pageSize", pageSizeBytes + "b");
+    final UnsafeShuffleWriter<Object, Object> writer =
+      new UnsafeShuffleWriter<Object, Object>(
+        blockManager,
+        shuffleBlockResolver,
+        taskMemoryManager,
+        shuffleMemoryManager,
+        new UnsafeShuffleHandle<Object, Object>(0, 1, shuffleDep),
+        0, // map id
+        taskContext,
+        conf);
+
+    // Peak memory should be monotonically non-decreasing. More specifically, every time
+    // we allocate a new page it should increase by exactly the size of the page.
+    long previousPeakMemory = writer.getPeakMemoryUsedBytes();
+    long newPeakMemory;
+    try {
+      for (int i = 0; i < numRecordsPerPage * 10; i++) {
+        writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
+        newPeakMemory = writer.getPeakMemoryUsedBytes();
+        if (i % numRecordsPerPage == 0) {
+          // We allocated a new page for this record, so peak memory should change
+          assertEquals(previousPeakMemory + pageSizeBytes, newPeakMemory);
+        } else {
+          assertEquals(previousPeakMemory, newPeakMemory);
+        }
+        previousPeakMemory = newPeakMemory;
+      }
+
+      // Spilling should not change peak memory
+      writer.forceSorterToSpill();
+      newPeakMemory = writer.getPeakMemoryUsedBytes();
+      assertEquals(previousPeakMemory, newPeakMemory);
+      for (int i = 0; i < numRecordsPerPage; i++) {
+        writer.insertRecordIntoSorter(new Tuple2<Object, Object>(1, 1));
+      }
+      newPeakMemory = writer.getPeakMemoryUsedBytes();
+      assertEquals(previousPeakMemory, newPeakMemory);
+
+      // Closing the writer should not change peak memory
+      writer.closeAndWriteOutput();
+      newPeakMemory = writer.getPeakMemoryUsedBytes();
+      assertEquals(previousPeakMemory, newPeakMemory);
+    } finally {
+      writer.stop(false);
+    }
+  }
 }
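
Note on the invariant this test encodes: peak memory grows only when a new page is allocated (by exactly the page size, so with 8-byte records and a 256-byte page, on every 32nd record), and neither spilling nor closing the writer may lower it. The standalone sketch below illustrates that running-maximum bookkeeping pattern; it is a hypothetical illustration (PeakMemoryTrackerSketch is not a Spark class), not Spark's actual implementation behind getPeakMemoryUsedBytes().

// Minimal sketch of running-maximum peak tracking, assuming usage grows in
// page-sized steps and spills release current (but never peak) memory.
public class PeakMemoryTrackerSketch {
  private long currentBytes = 0;
  private long peakBytes = 0;

  // Record a page allocation and fold it into the running peak.
  void allocatePage(long pageSizeBytes) {
    currentBytes += pageSizeBytes;
    peakBytes = Math.max(peakBytes, currentBytes);
  }

  // Spilling frees current memory but leaves the peak untouched.
  void spill() {
    currentBytes = 0;
  }

  long getPeakMemoryUsedBytes() {
    return peakBytes;
  }

  public static void main(String[] args) {
    final long pageSizeBytes = 256;   // mirrors the test's spark.buffer.pageSize
    final long recordLengthBytes = 8;
    final long numRecordsPerPage = pageSizeBytes / recordLengthBytes; // 32

    PeakMemoryTrackerSketch tracker = new PeakMemoryTrackerSketch();
    for (int i = 0; i < numRecordsPerPage * 10; i++) {
      if (i % numRecordsPerPage == 0) {
        tracker.allocatePage(pageSizeBytes); // a new page on every 32nd record
      }
    }
    long peakBeforeSpill = tracker.getPeakMemoryUsedBytes(); // 10 pages = 2560
    tracker.spill();
    if (tracker.getPeakMemoryUsedBytes() != peakBeforeSpill) {
      throw new AssertionError("spill must not change peak memory");
    }
    System.out.println("peak = " + tracker.getPeakMemoryUsedBytes() + " bytes");
  }
}

This mirrors the test's arithmetic: ten page allocations of 256 bytes give a 2560-byte peak, which the spill leaves unchanged.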