aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-12-10 15:29:04 -0800
committerAndrew Or <andrew@databricks.com>2015-12-10 15:29:04 -0800
commit23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5 (patch)
tree67ab5872819bf8a25ec2de0c0d3af5dceba8d877 /core
parent6a6c1fc5c807ba4e8aba3e260537aa527ff5d46a (diff)
downloadspark-23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5.tar.gz
spark-23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5.tar.bz2
spark-23a9e62bad9669e9ff5dc4bd714f58d12f9be0b5.zip
[SPARK-12251] Document and improve off-heap memory configurations
This patch adds documentation for Spark configurations that affect off-heap memory and makes some naming and validation improvements for those configs. - Change `spark.memory.offHeapSize` to `spark.memory.offHeap.size`. This is fine because this configuration has not shipped in any Spark release yet (it's new in Spark 1.6). - Deprecated `spark.unsafe.offHeap` in favor of a new `spark.memory.offHeap.enabled` configuration. The motivation behind this change is to gather all memory-related configurations under the same prefix. - Add a check which prevents users from setting `spark.memory.offHeap.enabled=true` when `spark.memory.offHeap.size == 0`. After SPARK-11389 (#9344), which was committed in Spark 1.6, Spark enforces a hard limit on the amount of off-heap memory that it will allocate to tasks. As a result, enabling off-heap execution memory without setting `spark.memory.offHeap.size` will lead to immediate OOMs. The new configuration validation makes this scenario easier to diagnose, helping to avoid user confusion. - Document these configurations on the configuration page. Author: Josh Rosen <joshrosen@databricks.com> Closes #10237 from JoshRosen/SPARK-12251.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/memory/MemoryManager.scala10
-rw-r--r--core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java21
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java6
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java4
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java4
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java4
-rw-r--r--core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala2
11 files changed, 42 insertions, 19 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 19633a3ce6..d3384fb297 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
- AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
+ AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
+ "spark.memory.offHeap.enabled" -> Seq(
+ AlternateConfig("spark.unsafe.offHeap", "1.6"))
)
/**
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ae9e1ac0e2..e707e27d96 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager(
storageMemoryPool.incrementPoolSize(storageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
+ offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
@@ -182,7 +182,13 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
- if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
+ if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
+ require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+ "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
+ MemoryMode.OFF_HEAP
+ } else {
+ MemoryMode.ON_HEAP
+ }
}
/**
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 711eed0193..776a2997cf 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.unsafe.offHeap", "false"),
+ new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
1),
@@ -41,8 +41,10 @@ public class TaskMemoryManagerSuite {
@Test
public void encodePageNumberAndOffsetOffHeap() {
- final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "true")), 0);
+ final SparkConf conf = new SparkConf()
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1000");
+ final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
// encode. This test exercises that corner-case:
@@ -55,7 +57,7 @@ public class TaskMemoryManagerSuite {
@Test
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
@@ -104,4 +106,15 @@ public class TaskMemoryManagerSuite {
assert(manager.cleanUpAllAllocatedMemory() == 0);
}
+ @Test
+ public void offHeapConfigurationBackwardsCompatibility() {
+ // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
+ // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
+ final SparkConf conf = new SparkConf()
+ .set("spark.unsafe.offHeap", "true")
+ .set("spark.memory.offHeap.size", "1000");
+ final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
+ assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP);
+ }
+
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
index 9a43f1f3a9..fe5abc5c23 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
@@ -35,7 +35,7 @@ public class PackedRecordPointerSuite {
@Test
public void heap() throws IOException {
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+ final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
@@ -54,7 +54,9 @@ public class PackedRecordPointerSuite {
@Test
public void offHeap() throws IOException {
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true");
+ final SparkConf conf = new SparkConf()
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "10000");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index faa5a863ee..0328e63e45 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -34,7 +34,7 @@ import org.apache.spark.unsafe.memory.MemoryBlock;
public class ShuffleInMemorySorterSuite {
final TestMemoryManager memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);
@@ -64,7 +64,7 @@ public class ShuffleInMemorySorterSuite {
"Lychee",
"Mango"
};
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+ final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index bc85918c59..5fe64bde36 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -108,7 +108,7 @@ public class UnsafeShuffleWriterSuite {
spillFilesCreated.clear();
conf = new SparkConf()
.set("spark.buffer.pageSize", "1m")
- .set("spark.unsafe.offHeap", "false");
+ .set("spark.memory.offHeap.enabled", "false");
taskMetrics = new TaskMetrics();
memoryManager = new TestMemoryManager(conf);
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 8724a34988..702ba5469b 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -85,8 +85,8 @@ public abstract class AbstractBytesToBytesMapSuite {
memoryManager =
new TestMemoryManager(
new SparkConf()
- .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
- .set("spark.memory.offHeapSize", "256mb"));
+ .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator())
+ .set("spark.memory.offHeap.size", "256mb"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a1c9f6fab8..e0ee281e98 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite {
final LinkedList<File> spillFilesCreated = new LinkedList<File>();
final TestMemoryManager memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index a203a09648..93efd033eb 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -46,7 +46,7 @@ public class UnsafeInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
memoryManager,
@@ -71,7 +71,7 @@ public class UnsafeInMemorySorterSuite {
"Mango"
};
final TaskMemoryManager memoryManager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 272253bc94..68cf26fc3e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -47,7 +47,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
conf.clone
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+ .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxStorageMemory = 0,
numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 71221deeb4..e21a028b7f 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -42,7 +42,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+ .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}