Diffstat (limited to 'core/src/test/java/org/apache')
-rw-r--r--  core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java                         28
-rw-r--r--  core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java                             15
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java                 13
-rw-r--r--  core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java                4
-rw-r--r--  core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java  2
5 files changed, 42 insertions(+), 20 deletions(-)
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 127789b632..ad755529de 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -34,7 +34,8 @@ public class TaskMemoryManagerSuite {
Long.MAX_VALUE,
1),
0);
- manager.allocatePage(4096, null); // leak memory
+ final MemoryConsumer c = new TestMemoryConsumer(manager);
+ manager.allocatePage(4096, c); // leak memory
Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
}
@@ -45,7 +46,8 @@ public class TaskMemoryManagerSuite {
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock dataPage = manager.allocatePage(256, null);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
// encode. This test exercises that corner-case:
final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
@@ -58,7 +60,8 @@ public class TaskMemoryManagerSuite {
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
- final MemoryBlock dataPage = manager.allocatePage(256, null);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
@@ -107,6 +110,25 @@ public class TaskMemoryManagerSuite {
}
@Test
+ public void shouldNotForceSpillingInDifferentModes() {
+ final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
+ memoryManager.limit(100);
+ final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+
+ TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
+ c1.use(80);
+ Assert.assertEquals(80, c1.getUsed());
+ c2.use(80);
+ Assert.assertEquals(20, c2.getUsed()); // not enough memory
+ Assert.assertEquals(80, c1.getUsed()); // not spilled
+
+ c2.use(10);
+ Assert.assertEquals(10, c2.getUsed()); // spilled
+ Assert.assertEquals(80, c1.getUsed()); // not spilled
+ }
+
+ @Test
public void offHeapConfigurationBackwardsCompatibility() {
// Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
// was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
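
As context for the encodePageNumberAndOffset hunks above: the test comment notes that an off-heap
offset is an absolute address that may need more than 51 bits. A minimal sketch of that packed-address
layout, assuming TaskMemoryManager.OFFSET_BITS is 51 with the page number stored in the remaining
upper bits (an illustration only, not part of this patch):

  // Sketch only: upper bits carry the page number, lower OFFSET_BITS carry the in-page offset.
  // In off-heap mode the "offset" starts out as an absolute address, so it can exceed 51 bits
  // and has to be rebased against the page's base offset before it fits.
  class PackedAddressSketch {
    static final int OFFSET_BITS = 51;                   // assumed value of TaskMemoryManager.OFFSET_BITS
    static final long OFFSET_MASK = (1L << OFFSET_BITS) - 1;

    static long encode(int pageNumber, long offsetInPage) {
      return (((long) pageNumber) << OFFSET_BITS) | (offsetInPage & OFFSET_MASK);
    }

    static int pageNumber(long encoded) { return (int) (encoded >>> OFFSET_BITS); }
    static long offsetInPage(long encoded) { return encoded & OFFSET_MASK; }

    public static void main(String[] args) {
      long encoded = encode(3, 64);
      System.out.println(pageNumber(encoded) + " / " + offsetInPage(encoded));   // prints 3 / 64
    }
  }
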
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index e6e16fff80..db91329c94 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -20,8 +20,11 @@ package org.apache.spark.memory;
import java.io.IOException;
public class TestMemoryConsumer extends MemoryConsumer {
+ public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
+ super(memoryManager, 1024L, mode);
+ }
public TestMemoryConsumer(TaskMemoryManager memoryManager) {
- super(memoryManager);
+ this(memoryManager, MemoryMode.ON_HEAP);
}
@Override
@@ -32,19 +35,13 @@ public class TestMemoryConsumer extends MemoryConsumer {
}
void use(long size) {
- long got = taskMemoryManager.acquireExecutionMemory(
- size,
- taskMemoryManager.tungstenMemoryMode,
- this);
+ long got = taskMemoryManager.acquireExecutionMemory(size, this);
used += got;
}
void free(long size) {
used -= size;
- taskMemoryManager.releaseExecutionMemory(
- size,
- taskMemoryManager.tungstenMemoryMode,
- this);
+ taskMemoryManager.releaseExecutionMemory(size, this);
}
}
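
With the memory mode now carried by the consumer itself, use() and free() no longer pass
taskMemoryManager.tungstenMemoryMode explicitly. A minimal usage sketch of the updated helper
(illustrative; assumes a TaskMemoryManager named manager as set up in the suites above):

  TestMemoryConsumer onHeap  = new TestMemoryConsumer(manager);                        // defaults to ON_HEAP
  TestMemoryConsumer offHeap = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);

  onHeap.use(64);    // acquireExecutionMemory(64, onHeap): the mode is taken from the consumer
  offHeap.use(64);   // draws from the off-heap pool instead
  onHeap.free(64);
  offHeap.free(64);
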
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
index fe5abc5c23..354efe18db 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
@@ -22,8 +22,7 @@ import java.io.IOException;
import org.junit.Test;
import org.apache.spark.SparkConf;
-import org.apache.spark.memory.TestMemoryManager;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.*;
import org.apache.spark.unsafe.memory.MemoryBlock;
import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
@@ -38,8 +37,9 @@ public class PackedRecordPointerSuite {
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock page0 = memoryManager.allocatePage(128, null);
- final MemoryBlock page1 = memoryManager.allocatePage(128, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP);
+ final MemoryBlock page0 = memoryManager.allocatePage(128, c);
+ final MemoryBlock page1 = memoryManager.allocatePage(128, c);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
@@ -59,8 +59,9 @@ public class PackedRecordPointerSuite {
.set("spark.memory.offHeap.size", "10000");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock page0 = memoryManager.allocatePage(128, null);
- final MemoryBlock page1 = memoryManager.allocatePage(128, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP);
+ final MemoryBlock page0 = memoryManager.allocatePage(128, c);
+ final MemoryBlock page1 = memoryManager.allocatePage(128, c);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
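
For background on what PackedRecordPointerSuite covers (stated as an assumption about the layout,
not something introduced by this patch): the packed 64-bit value is commonly described as 24 bits
of partition id, 13 bits of page number, and 27 bits of in-page offset, which is why
MAXIMUM_PAGE_SIZE_BYTES caps a page at 1 << 27 bytes. A quick arithmetic check of that budget:

  // Assumed bit budget: 24 (partition) + 13 (page) + 27 (offset) = 64 bits.
  long assumedMaxPageSize    = 1L << 27;        // 134217728 bytes (128 MB)
  long assumedMaxPartitionId = (1L << 24) - 1;  // 16777215
  System.out.println(assumedMaxPageSize + ", " + assumedMaxPartitionId);
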
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 278a827644..694352ee2a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
+import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
@@ -71,7 +72,8 @@ public class ShuffleInMemorySorterSuite {
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager);
+ final MemoryBlock dataPage = memoryManager.allocatePage(2048, c);
final Object baseObject = dataPage.getBaseObject();
final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
consumer, 4, shouldUseRadixSort());
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 4a2f65a0ed..383c5b3b08 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -78,7 +78,7 @@ public class UnsafeInMemorySorterSuite {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
+ final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
final Object baseObject = dataPage.getBaseObject();
// Write the records into the data page:
long position = dataPage.getBaseOffset();
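
Taken together, the sorter suites now allocate their data pages through a real consumer and then
write records at raw offsets into the returned page. A minimal sketch of that pattern under the
new API (illustrative; reuses the memoryManager, consumer, and dataPage names from the hunk above,
and assumes org.apache.spark.unsafe.Platform.putInt for the raw write):

  final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);  // a non-null consumer is now required
  final Object baseObject = dataPage.getBaseObject();
  long position = dataPage.getBaseOffset();

  Platform.putInt(baseObject, position, 42);   // write one int record at the current position
  position += 4;
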