aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/java
diff options
context:
space:
mode:
authorDavies Liu <davies@databricks.com>2016-05-18 09:44:21 -0700
committerDavies Liu <davies.liu@gmail.com>2016-05-18 09:44:21 -0700
commit8fb1d1c7f3ed1b62625052a532b7388ebec71bbf (patch)
tree76c973c4bc76359f29ba1b22aede20808eaf469d /core/src/test/java
parentc1fd9cacba8e602cc55a893f0beb05ea48c3a1f6 (diff)
downloadspark-8fb1d1c7f3ed1b62625052a532b7388ebec71bbf.tar.gz
spark-8fb1d1c7f3ed1b62625052a532b7388ebec71bbf.tar.bz2
spark-8fb1d1c7f3ed1b62625052a532b7388ebec71bbf.zip
[SPARK-15357] Cooperative spilling should check consumer memory mode
## What changes were proposed in this pull request? Since we support forced spilling for Spillable, which only works in OnHeap mode, different from other SQL operators (could be OnHeap or OffHeap), we should considering the mode of consumer before calling trigger forced spilling. ## How was this patch tested? Add new test. Author: Davies Liu <davies@databricks.com> Closes #13151 from davies/fix_mode.
Diffstat (limited to 'core/src/test/java')
-rw-r--r--core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java28
-rw-r--r--core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java15
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java13
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java4
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java2
5 files changed, 42 insertions, 20 deletions
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 127789b632..ad755529de 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -34,7 +34,8 @@ public class TaskMemoryManagerSuite {
Long.MAX_VALUE,
1),
0);
- manager.allocatePage(4096, null); // leak memory
+ final MemoryConsumer c = new TestMemoryConsumer(manager);
+ manager.allocatePage(4096, c); // leak memory
Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
}
@@ -45,7 +46,8 @@ public class TaskMemoryManagerSuite {
.set("spark.memory.offHeap.enabled", "true")
.set("spark.memory.offHeap.size", "1000");
final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock dataPage = manager.allocatePage(256, null);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
// encode. This test exercises that corner-case:
final long offset = ((1L << TaskMemoryManager.OFFSET_BITS) + 10);
@@ -58,7 +60,8 @@ public class TaskMemoryManagerSuite {
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
- final MemoryBlock dataPage = manager.allocatePage(256, null);
+ final MemoryConsumer c = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ final MemoryBlock dataPage = manager.allocatePage(256, c);
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
Assert.assertEquals(64, manager.getOffsetInPage(encodedAddress));
@@ -107,6 +110,25 @@ public class TaskMemoryManagerSuite {
}
@Test
+ public void shouldNotForceSpillingInDifferentModes() {
+ final TestMemoryManager memoryManager = new TestMemoryManager(new SparkConf());
+ memoryManager.limit(100);
+ final TaskMemoryManager manager = new TaskMemoryManager(memoryManager, 0);
+
+ TestMemoryConsumer c1 = new TestMemoryConsumer(manager, MemoryMode.ON_HEAP);
+ TestMemoryConsumer c2 = new TestMemoryConsumer(manager, MemoryMode.OFF_HEAP);
+ c1.use(80);
+ Assert.assertEquals(80, c1.getUsed());
+ c2.use(80);
+ Assert.assertEquals(20, c2.getUsed()); // not enough memory
+ Assert.assertEquals(80, c1.getUsed()); // not spilled
+
+ c2.use(10);
+ Assert.assertEquals(10, c2.getUsed()); // spilled
+ Assert.assertEquals(80, c1.getUsed()); // not spilled
+ }
+
+ @Test
public void offHeapConfigurationBackwardsCompatibility() {
// Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
// was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index e6e16fff80..db91329c94 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -20,8 +20,11 @@ package org.apache.spark.memory;
import java.io.IOException;
public class TestMemoryConsumer extends MemoryConsumer {
+ public TestMemoryConsumer(TaskMemoryManager memoryManager, MemoryMode mode) {
+ super(memoryManager, 1024L, mode);
+ }
public TestMemoryConsumer(TaskMemoryManager memoryManager) {
- super(memoryManager);
+ this(memoryManager, MemoryMode.ON_HEAP);
}
@Override
@@ -32,19 +35,13 @@ public class TestMemoryConsumer extends MemoryConsumer {
}
void use(long size) {
- long got = taskMemoryManager.acquireExecutionMemory(
- size,
- taskMemoryManager.tungstenMemoryMode,
- this);
+ long got = taskMemoryManager.acquireExecutionMemory(size, this);
used += got;
}
void free(long size) {
used -= size;
- taskMemoryManager.releaseExecutionMemory(
- size,
- taskMemoryManager.tungstenMemoryMode,
- this);
+ taskMemoryManager.releaseExecutionMemory(size, this);
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
index fe5abc5c23..354efe18db 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
@@ -22,8 +22,7 @@ import java.io.IOException;
import org.junit.Test;
import org.apache.spark.SparkConf;
-import org.apache.spark.memory.TestMemoryManager;
-import org.apache.spark.memory.TaskMemoryManager;
+import org.apache.spark.memory.*;
import org.apache.spark.unsafe.memory.MemoryBlock;
import static org.apache.spark.shuffle.sort.PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES;
@@ -38,8 +37,9 @@ public class PackedRecordPointerSuite {
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock page0 = memoryManager.allocatePage(128, null);
- final MemoryBlock page1 = memoryManager.allocatePage(128, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.ON_HEAP);
+ final MemoryBlock page0 = memoryManager.allocatePage(128, c);
+ final MemoryBlock page1 = memoryManager.allocatePage(128, c);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
@@ -59,8 +59,9 @@ public class PackedRecordPointerSuite {
.set("spark.memory.offHeap.size", "10000");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock page0 = memoryManager.allocatePage(128, null);
- final MemoryBlock page1 = memoryManager.allocatePage(128, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager, MemoryMode.OFF_HEAP);
+ final MemoryBlock page0 = memoryManager.allocatePage(128, c);
+ final MemoryBlock page1 = memoryManager.allocatePage(128, c);
final long addressInPage1 = memoryManager.encodePageNumberAndOffset(page1,
page1.getBaseOffset() + 42);
PackedRecordPointer packedPointer = new PackedRecordPointer();
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index 278a827644..694352ee2a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -26,6 +26,7 @@ import org.junit.Test;
import org.apache.spark.HashPartitioner;
import org.apache.spark.SparkConf;
+import org.apache.spark.memory.MemoryConsumer;
import org.apache.spark.memory.TaskMemoryManager;
import org.apache.spark.memory.TestMemoryConsumer;
import org.apache.spark.memory.TestMemoryManager;
@@ -71,7 +72,8 @@ public class ShuffleInMemorySorterSuite {
final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
+ final MemoryConsumer c = new TestMemoryConsumer(memoryManager);
+ final MemoryBlock dataPage = memoryManager.allocatePage(2048, c);
final Object baseObject = dataPage.getBaseObject();
final ShuffleInMemorySorter sorter = new ShuffleInMemorySorter(
consumer, 4, shouldUseRadixSort());
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index 4a2f65a0ed..383c5b3b08 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -78,7 +78,7 @@ public class UnsafeInMemorySorterSuite {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
- final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
+ final MemoryBlock dataPage = memoryManager.allocatePage(2048, consumer);
final Object baseObject = dataPage.getBaseObject();
// Write the records into the data page:
long position = dataPage.getBaseOffset();