diff options
Diffstat (limited to 'core/src/test/java/org/apache')
5 files changed, 42 insertions, 20 deletions
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index 127789b632..ad755529de 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -34,7 +34,8 @@ public class TaskMemoryManagerSuite { Long.MAX_VALUE, 1), 0); - manager.allocatePage(4096, null); // leak memory + final MemoryConsumer c = new TestMemoryConsumer(manager); + manager.allocatePage(4096, c); // leak memory Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask()); Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory()); } @@ -45,7 +46,8 @@ public class TaskMemoryManagerSuite { .set("spark.memory.offHeap.enabled", "true") .set("spark.memory.offHeap.size", "1000"); final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0); - final MemoryBlock dataPage = manager.allocatePage(256, null); + final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP); + final MemoryBlock dataPage = manager.allocatePage(256, c); // In off-heap mode, an offset is an absolute address that may require more than 51 bits to // encode. This test exercises that corner-case: final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10); @@ -58,7 +60,8 @@ public class TaskMemoryManagerSuite { public void encodePageNumberAndOffsetOnHeap() { final TaskMemoryManager manager = new TaskMemoryManager( new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); - final MemoryBlock dataPage = manager.allocatePage(256, null); + final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); + final MemoryBlock dataPage = manager.allocatePage(256, c); final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64); Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress)); Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress)); @@ -107,6 +110,25 @@ public class TaskMemoryManagerSuite { } @Test + public void shouldNotForceSpillingInDifferentModes() { + final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf()); + memoryManager.limit(100); + final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0); + + TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP); + TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP); + c1.use(80); + Assert.assertEquals(80, c1.getUsed()); + c2.use(80); + Assert.assertEquals(20, c2.getUsed()); // not enough memory + Assert.assertEquals(80, c1.getUsed()); // not spilled + + c2.use(10); + Assert.assertEquals(10, c2.getUsed()); // spilled + Assert.assertEquals(80, c1.getUsed()); // not spilled + } + + @Test public void offHeapConfigurationBackwardsCompatibility() { // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251). diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java index e6e16fff80..db91329c94 100644 --- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java +++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java @@ -20,8 +20,11 @@ package org.apache.spark.memory; import java.io.IOException; public class TestMemoryConsumer extends MemoryConsumer { + public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) { + super(memoryManager, 1024L, mode); + } public TestMemoryConsumer(TaskMemoryManager memoryManager) { - super(memoryManager); + this(memoryManager, MemoryMode.ON_HEAP); } @Override @@ -32,19 +35,13 @@ public class TestMemoryConsumer extends MemoryConsumer { } void use(long size) { - long got = taskMemoryManager.acquireExecutionMemory( - size, - taskMemoryManager.tungstenMemoryMode, - this); + long got = taskMemoryManager.acquireExecutionMemory(size, this); used += got; } void free(long size) { used -= size; - taskMemoryManager.releaseExecutionMemory( - size, - taskMemoryManager.tungstenMemoryMode, - this); + taskMemoryManager.releaseExecutionMemory(size, this); } } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java index fe5abc5c23..354efe18db 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java @@ -22,8 +22,7 @@ import java.io.IOException; import org.junit.Test; import org.apache.spark.SparkConf; -import org.apache.spark.memory.TestMemoryManager; -import org.apache.spark.memory.TaskMemoryManager; +import org.apache.spark.memory.*; import org.apache.spark.unsafe.memory.MemoryBlock; import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES; @@ -38,8 +37,9 @@ public class PackedRecordPointerSuite { final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); - final MemoryBlock page0 = memoryManager.allocatePage(128, null); - final MemoryBlock page1 = memoryManager.allocatePage(128, null); + final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP); + final MemoryBlock page0 = memoryManager.allocatePage(128, c); + final MemoryBlock page1 = memoryManager.allocatePage(128, c); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); @@ -59,8 +59,9 @@ public class PackedRecordPointerSuite { .set("spark.memory.offHeap.size", "10000"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); - final MemoryBlock page0 = memoryManager.allocatePage(128, null); - final MemoryBlock page1 = memoryManager.allocatePage(128, null); + final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP); + final MemoryBlock page0 = memoryManager.allocatePage(128, c); + final MemoryBlock page1 = memoryManager.allocatePage(128, c); final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1, page1.getBaseOffset() + 42); PackedRecordPointer packedPointer = new PackedRecordPointer(); diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java index 278a827644..694352ee2a 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java @@ -26,6 +26,7 @@ import org.junit.Test; import org.apache.spark.HashPartitioner; import org.apache.spark.SparkConf; +import org.apache.spark.memory.MemoryConsumer; import org.apache.spark.memory.TaskMemoryManager; import org.apache.spark.memory.TestMemoryConsumer; import org.apache.spark.memory.TestMemoryManager; @@ -71,7 +72,8 @@ public class ShuffleInMemorySorterSuite { final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false"); final TaskMemoryManager memoryManager = new TaskMemoryManager(new TestMemoryManager(conf), 0); - final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); + final MemoryConsumer c = new TestMemoryConsumer(memoryManager); + final MemoryBlock dataPage = memoryManager.allocatePage(2048, c); final Object baseObject = dataPage.getBaseObject(); final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter( consumer, 4, shouldUseRadixSort()); diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java index 4a2f65a0ed..383c5b3b08 100644 --- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java +++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java @@ -78,7 +78,7 @@ public class UnsafeInMemorySorterSuite { final TaskMemoryManager memoryManager = new TaskMemoryManager( new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0); final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager); - final MemoryBlock dataPage = memoryManager.allocatePage(2048, null); + final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer); final Object baseObject = dataPage.getBaseObject(); // Write the records into the data page: long position = dataPage.getBaseOffset(); |