aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/SparkConf.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/memory/MemoryManager.scala10
-rw-r--r--core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java21
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java6
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java4
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java4
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java4
-rw-r--r--core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala2
-rw-r--r--docs/configuration.md16
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala6
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala2
-rw-r--r--sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala2
15 files changed, 65 insertions, 22 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index 19633a3ce6..d3384fb297 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -597,7 +597,9 @@ private[spark] object SparkConf extends Logging {
"spark.streaming.fileStream.minRememberDuration" -> Seq(
AlternateConfig("spark.streaming.minRememberDuration", "1.5")),
"spark.yarn.max.executor.failures" -> Seq(
- AlternateConfig("spark.yarn.max.worker.failures", "1.5"))
+ AlternateConfig("spark.yarn.max.worker.failures", "1.5")),
+ "spark.memory.offHeap.enabled" -> Seq(
+ AlternateConfig("spark.unsafe.offHeap", "1.6"))
)
/**
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index ae9e1ac0e2..e707e27d96 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -50,7 +50,7 @@ private[spark] abstract class MemoryManager(
storageMemoryPool.incrementPoolSize(storageMemory)
onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
- offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
+ offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeap.size", 0))
/**
* Total available memory for storage, in bytes. This amount can vary over time, depending on
@@ -182,7 +182,13 @@ private[spark] abstract class MemoryManager(
* sun.misc.Unsafe.
*/
final val tungstenMemoryMode: MemoryMode = {
- if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
+ if (conf.getBoolean("spark.memory.offHeap.enabled", false)) {
+ require(conf.getSizeAsBytes("spark.memory.offHeap.size", 0) > 0,
+ "spark.memory.offHeap.size must be > 0 when spark.memory.offHeap.enabled == true")
+ MemoryMode.OFF_HEAP
+ } else {
+ MemoryMode.ON_HEAP
+ }
}
/**
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index 711eed0193..776a2997cf 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -29,7 +29,7 @@ public class TaskMemoryManagerSuite {
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.unsafe.offHeap", "false"),
+ new SparkConf().set("spark.memory.offHeap.enabled", "false"),
Long.MAX_VALUE,
Long.MAX_VALUE,
1),
@@ -41,8 +41,10 @@ public class TaskMemoryManagerSuite {
@Test
public void encodePageNumberAndOffsetOffHeap() {
- final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "true")), 0);
+ final SparkConf conf = new SparkConf()
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "1000");
+ final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
// In off-heap mode, an offset is an absolute address that may require more than 51 bits to
// encode. This test exercises that corner-case:
@@ -55,7 +57,7 @@ public class TaskMemoryManagerSuite {
@Test
public void encodePageNumberAndOffsetOnHeap() {
final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final MemoryBlock dataPage = manager.allocatePage(256, null);
final long encodedAddress = manager.encodePageNumberAndOffset(dataPage, 64);
Assert.assertEquals(dataPage.getBaseObject(), manager.getPage(encodedAddress));
@@ -104,4 +106,15 @@ public class TaskMemoryManagerSuite {
assert(manager.cleanUpAllAllocatedMemory() == 0);
}
+ @Test
+ public void offHeapConfigurationBackwardsCompatibility() {
+ // Tests backwards-compatibility with the old `spark.unsafe.offHeap` configuration, which
+ // was deprecated in Spark 1.6 and replaced by `spark.memory.offHeap.enabled` (see SPARK-12251).
+ final SparkConf conf = new SparkConf()
+ .set("spark.unsafe.offHeap", "true")
+ .set("spark.memory.offHeap.size", "1000");
+ final TaskMemoryManager manager = new TaskMemoryManager(new TestMemoryManager(conf), 0);
+ assert(manager.tungstenMemoryMode == MemoryMode.OFF_HEAP);
+ }
+
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
index 9a43f1f3a9..fe5abc5c23 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/PackedRecordPointerSuite.java
@@ -35,7 +35,7 @@ public class PackedRecordPointerSuite {
@Test
public void heap() throws IOException {
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+ final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
@@ -54,7 +54,9 @@ public class PackedRecordPointerSuite {
@Test
public void offHeap() throws IOException {
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "true");
+ final SparkConf conf = new SparkConf()
+ .set("spark.memory.offHeap.enabled", "true")
+ .set("spark.memory.offHeap.size", "10000");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock page0 = memoryManager.allocatePage(128, null);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
index faa5a863ee..0328e63e45 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorterSuite.java
@@ -34,7 +34,7 @@ import org.apache.spark.unsafe.memory.MemoryBlock;
public class ShuffleInMemorySorterSuite {
final TestMemoryManager memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(taskMemoryManager);
@@ -64,7 +64,7 @@ public class ShuffleInMemorySorterSuite {
"Lychee",
"Mango"
};
- final SparkConf conf = new SparkConf().set("spark.unsafe.offHeap", "false");
+ final SparkConf conf = new SparkConf().set("spark.memory.offHeap.enabled", "false");
final TaskMemoryManager memoryManager =
new TaskMemoryManager(new TestMemoryManager(conf), 0);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index bc85918c59..5fe64bde36 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -108,7 +108,7 @@ public class UnsafeShuffleWriterSuite {
spillFilesCreated.clear();
conf = new SparkConf()
.set("spark.buffer.pageSize", "1m")
- .set("spark.unsafe.offHeap", "false");
+ .set("spark.memory.offHeap.enabled", "false");
taskMetrics = new TaskMetrics();
memoryManager = new TestMemoryManager(conf);
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 8724a34988..702ba5469b 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -85,8 +85,8 @@ public abstract class AbstractBytesToBytesMapSuite {
memoryManager =
new TestMemoryManager(
new SparkConf()
- .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
- .set("spark.memory.offHeapSize", "256mb"));
+ .set("spark.memory.offHeap.enabled", "" + useOffHeapMemoryAllocator())
+ .set("spark.memory.offHeap.size", "256mb"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
index a1c9f6fab8..e0ee281e98 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorterSuite.java
@@ -58,7 +58,7 @@ public class UnsafeExternalSorterSuite {
final LinkedList<File> spillFilesCreated = new LinkedList<File>();
final TestMemoryManager memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"));
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"));
final TaskMemoryManager taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
// Use integer comparison for comparing prefixes (which are partition ids, in this case)
final PrefixComparator prefixComparator = new PrefixComparator() {
diff --git a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
index a203a09648..93efd033eb 100644
--- a/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
+++ b/core/src/test/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorterSuite.java
@@ -46,7 +46,7 @@ public class UnsafeInMemorySorterSuite {
@Test
public void testSortingEmptyInput() {
final TaskMemoryManager memoryManager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final UnsafeInMemorySorter sorter = new UnsafeInMemorySorter(consumer,
memoryManager,
@@ -71,7 +71,7 @@ public class UnsafeInMemorySorterSuite {
"Mango"
};
final TaskMemoryManager memoryManager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false")), 0);
final TestMemoryConsumer consumer = new TestMemoryConsumer(memoryManager);
final MemoryBlock dataPage = memoryManager.allocatePage(2048, null);
final Object baseObject = dataPage.getBaseObject();
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 272253bc94..68cf26fc3e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -47,7 +47,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
conf.clone
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+ .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString),
maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxStorageMemory = 0,
numCores = 1)
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 71221deeb4..e21a028b7f 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -42,7 +42,7 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
val conf = new SparkConf()
.set("spark.memory.fraction", "1")
.set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
- .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+ .set("spark.memory.offHeap.size", maxOffHeapExecutionMemory.toString)
.set("spark.memory.storageFraction", storageFraction.toString)
UnifiedMemoryManager(conf, numCores = 1)
}
diff --git a/docs/configuration.md b/docs/configuration.md
index 873a2d0b30..55cf4b2dac 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -739,6 +739,22 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.memory.offHeap.enabled</code></td>
+ <td>false</td>
+ <td>
+ If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.memory.offHeap.size</code></td>
+ <td>0</td>
+ <td>
+ The absolute amount of memory which can be used for off-heap allocation.
+ This setting has no impact on heap memory usage, so if your executors' total memory consumption must fit within some hard limit then be sure to shrink your JVM heap size accordingly.
+ This must be set to a positive value when <code>spark.memory.offHeap.enabled=true</code>.
+ </td>
+</tr>
+<tr>
<td><code>spark.memory.useLegacyMode</code></td>
<td>false</td>
<td>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index aebfea5832..8c7099ab5a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -334,7 +334,11 @@ private[joins] final class UnsafeHashedRelation(
// so that tests compile:
val taskMemoryManager = new TaskMemoryManager(
new StaticMemoryManager(
- new SparkConf().set("spark.unsafe.offHeap", "false"), Long.MaxValue, Long.MaxValue, 1), 0)
+ new SparkConf().set("spark.memory.offHeap.enabled", "false"),
+ Long.MaxValue,
+ Long.MaxValue,
+ 1),
+ 0)
val pageSizeBytes = Option(SparkEnv.get).map(_.memoryManager.pageSizeBytes)
.getOrElse(new SparkConf().getSizeAsBytes("spark.buffer.pageSize", "16m"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
index 7ceaee38d1..5a8406789a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeFixedWidthAggregationMapSuite.scala
@@ -61,7 +61,7 @@ class UnsafeFixedWidthAggregationMapSuite
}
test(name) {
- val conf = new SparkConf().set("spark.unsafe.offHeap", "false")
+ val conf = new SparkConf().set("spark.memory.offHeap.enabled", "false")
memoryManager = new TestMemoryManager(conf)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 7b80963ec8..29027a664b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -109,7 +109,7 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
pageSize: Long,
spill: Boolean): Unit = {
val memoryManager =
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false"))
+ new TestMemoryManager(new SparkConf().set("spark.memory.offHeap.enabled", "false"))
val taskMemMgr = new TaskMemoryManager(memoryManager, 0)
TaskContext.setTaskContext(new TaskContextImpl(
stageId = 0,