aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@databricks.com>2015-11-06 18:17:34 -0800
committerJosh Rosen <joshrosen@databricks.com>2015-11-06 18:17:34 -0800
commit30b706b7b36482921ec04145a0121ca147984fa8 (patch)
tree79c8309fa8255ac238d2e9dc5c4e4f0349e9da22
parent105732dcc6b651b9779f4a5773a759c5b4fbd21d (diff)
downloadspark-30b706b7b36482921ec04145a0121ca147984fa8.tar.gz
spark-30b706b7b36482921ec04145a0121ca147984fa8.tar.bz2
spark-30b706b7b36482921ec04145a0121ca147984fa8.zip
[SPARK-11389][CORE] Add support for off-heap memory to MemoryManager
In order to lay the groundwork for proper off-heap memory support in SQL / Tungsten, we need to extend our MemoryManager to perform bookkeeping for off-heap memory. ## User-facing changes This PR introduces a new configuration, `spark.memory.offHeapSize` (name subject to change), which specifies the absolute amount of off-heap memory that Spark and Spark SQL can use. If Tungsten is configured to use off-heap execution memory for allocating data pages, then all data page allocations must fit within this size limit. ## Internals changes This PR contains a lot of internal refactoring of the MemoryManager. The key change at the heart of this patch is the introduction of a `MemoryPool` class (name subject to change) to manage the bookkeeping for a particular category of memory (storage, on-heap execution, and off-heap execution). These MemoryPools are not fixed-size; they can be dynamically grown and shrunk according to the MemoryManager's policies. In StaticMemoryManager, these pools have fixed sizes, proportional to the legacy `[storage|shuffle].memoryFraction`. In the new UnifiedMemoryManager, the sizes of these pools are dynamically adjusted according to its policies. There are two subclasses of `MemoryPool`: `StorageMemoryPool` manages storage memory and `ExecutionMemoryPool` manages execution memory. The MemoryManager creates two execution pools, one for on-heap memory and one for off-heap. Instances of `ExecutionMemoryPool` manage the logic for fair sharing of their pooled memory across running tasks (in other words, the ShuffleMemoryManager-like logic has been moved out of MemoryManager and pushed into these ExecutionMemoryPool instances). I think that this design is substantially easier to understand and reason about than the previous design, where most of these responsibilities were handled by MemoryManager and its subclasses. To see this, take at look at how simple the logic in `UnifiedMemoryManager` has become: it's now very easy to see when memory is dynamically shifted between storage and execution. ## TODOs - [x] Fix handful of test failures in the MemoryManagerSuites. - [x] Fix remaining TODO comments in code. - [ ] Document new configuration. - [x] Fix commented-out tests / asserts: - [x] UnifiedMemoryManagerSuite. - [x] Write tests that exercise the new off-heap memory management policies. Author: Josh Rosen <joshrosen@databricks.com> Closes #9344 from JoshRosen/offheap-memory-accounting.
-rw-r--r--core/src/main/java/org/apache/spark/memory/MemoryConsumer.java7
-rw-r--r--core/src/main/java/org/apache/spark/memory/MemoryMode.java26
-rw-r--r--core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java72
-rw-r--r--core/src/main/scala/org/apache/spark/SparkEnv.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala153
-rw-r--r--core/src/main/scala/org/apache/spark/memory/MemoryManager.scala246
-rw-r--r--core/src/main/scala/org/apache/spark/memory/MemoryPool.scala71
-rw-r--r--core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala75
-rw-r--r--core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala138
-rw-r--r--core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala138
-rw-r--r--core/src/main/scala/org/apache/spark/memory/package.scala75
-rw-r--r--core/src/main/scala/org/apache/spark/util/collection/Spillable.scala8
-rw-r--r--core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java8
-rw-r--r--core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java10
-rw-r--r--core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java2
-rw-r--r--core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java4
-rw-r--r--core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala104
-rw-r--r--core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala39
-rw-r--r--core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala20
-rw-r--r--core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala93
-rw-r--r--core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala2
21 files changed, 828 insertions, 465 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72832..36138cc9a2 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,15 +17,15 @@
package org.apache.spark.memory;
-
import java.io.IOException;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
-
/**
* An memory consumer of TaskMemoryManager, which support spilling.
+ *
+ * Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {
@@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
- this.used = 0;
}
protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
@@ -67,6 +66,8 @@ public abstract class MemoryConsumer {
*
* Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
*
+ * Note: today, this only frees Tungsten-managed pages.
+ *
* @param size the amount of memory should be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
new file mode 100644
index 0000000000..3a5e72d8aa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+@Private
+public enum MemoryMode {
+ ON_HEAP,
+ OFF_HEAP
+}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 6440f9c0f3..5f743b2885 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -103,10 +103,10 @@ public class TaskMemoryManager {
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
- private final boolean inHeap;
+ final MemoryMode tungstenMemoryMode;
/**
- * The size of memory granted to each consumer.
+ * Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
- this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
+ this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
@@ -127,12 +127,19 @@ public class TaskMemoryManager {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ public long acquireExecutionMemory(
+ long required,
+ MemoryMode mode,
+ MemoryConsumer consumer) {
assert(required >= 0);
+ // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
+ // memory here, then it may not make sense to spill since that would only end up freeing
+ // off-heap memory. This is subject to change, though, so it may be risky to make this
+ // optimization now in case we forget to undo it late when making changes.
synchronized (this) {
- long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+ long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
- // try to release memory from other consumers first, then we can reduce the frequency of
+ // Try to release memory from other consumers first, then we can reduce the frequency of
// spilling, avoid to have too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
@@ -140,10 +147,10 @@ public class TaskMemoryManager {
if (c != consumer && c.getUsed() > 0) {
try {
long released = c.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from {} for {}", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
@@ -161,10 +168,10 @@ public class TaskMemoryManager {
if (got < required && consumer != null) {
try {
long released = consumer.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from itself ({})", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
@@ -184,9 +191,9 @@ public class TaskMemoryManager {
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
- public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+ public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
- memoryManager.releaseExecutionMemory(size, taskAttemptId);
+ memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
}
/**
@@ -195,11 +202,19 @@ public class TaskMemoryManager {
public void showMemoryUsage() {
logger.info("Memory used in task " + taskAttemptId);
synchronized (this) {
+ long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
- if (c.getUsed() > 0) {
- logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
+ long totalMemUsage = c.getUsed();
+ memoryAccountedForByConsumers += totalMemUsage;
+ if (totalMemUsage > 0) {
+ logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
}
}
+ long memoryNotAccountedFor =
+ memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
+ logger.info(
+ "{} bytes of memory were used by task {} but are not associated with specific consumers",
+ memoryNotAccountedFor, taskAttemptId);
}
}
@@ -214,7 +229,8 @@ public class TaskMemoryManager {
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
- * Returns `null` if there was not enough memory to allocate the page.
+ * Returns `null` if there was not enough memory to allocate the page. May return a page that
+ * contains fewer bytes than requested, so callers should verify the size of returned pages.
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
@@ -222,7 +238,7 @@ public class TaskMemoryManager {
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
- long acquired = acquireExecutionMemory(size, consumer);
+ long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
if (acquired <= 0) {
return null;
}
@@ -231,7 +247,7 @@ public class TaskMemoryManager {
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
- releaseExecutionMemory(acquired, consumer);
+ releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
@@ -262,7 +278,7 @@ public class TaskMemoryManager {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize, consumer);
+ releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
}
/**
@@ -276,7 +292,7 @@ public class TaskMemoryManager {
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
- if (!inHeap) {
+ if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
@@ -305,7 +321,7 @@ public class TaskMemoryManager {
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
@@ -323,7 +339,7 @@ public class TaskMemoryManager {
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
@@ -351,11 +367,19 @@ public class TaskMemoryManager {
}
consumers.clear();
}
+
+ for (MemoryBlock page : pageTable) {
+ if (page != null) {
+ memoryManager.tungstenMemoryAllocator().free(page);
+ }
+ }
+ Arrays.fill(pageTable, null);
+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
/**
- * Returns the memory consumption, in bytes, for the current task
+ * Returns the memory consumption, in bytes, for the current task.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 23ae9360f6..4474a83bed 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -341,7 +341,7 @@ object SparkEnv extends Logging {
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
- new UnifiedMemoryManager(conf, numUsableCores)
+ UnifiedMemoryManager(conf, numUsableCores)
}
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
new file mode 100644
index 0000000000..7825bae425
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+
+/**
+ * Implements policies and bookkeeping for sharing a adjustable-sized pool of memory between tasks.
+ *
+ * Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up
+ * to a large amount first and then causing others to spill to disk repeatedly.
+ *
+ * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
+ * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
+ * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this
+ * set changes. This is all done by synchronizing access to mutable state and using wait() and
+ * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
+ * tasks was performed by the ShuffleMemoryManager.
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ * @param poolName a human-readable name for this pool, for use in log messages
+ */
+class ExecutionMemoryPool(
+ lock: Object,
+ poolName: String
+ ) extends MemoryPool(lock) with Logging {
+
+ /**
+ * Map from taskAttemptId -> memory consumption in bytes
+ */
+ @GuardedBy("lock")
+ private val memoryForTask = new mutable.HashMap[Long, Long]()
+
+ override def memoryUsed: Long = lock.synchronized {
+ memoryForTask.values.sum
+ }
+
+ /**
+ * Returns the memory consumption, in bytes, for the given task.
+ */
+ def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
+ memoryForTask.getOrElse(taskAttemptId, 0L)
+ }
+
+ /**
+ * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
+ * obtained, or 0 if none can be allocated.
+ *
+ * This call may block until there is enough free memory in some situations, to make sure each
+ * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
+ * active tasks) before it is forced to spill. This can happen if the number of tasks increase
+ * but an older task had a lot of memory already.
+ *
+ * @return the number of bytes granted to the task.
+ */
+ def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+ assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
+
+ // Add this task to the taskMemory map just so we can keep an accurate count of the number
+ // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
+ if (!memoryForTask.contains(taskAttemptId)) {
+ memoryForTask(taskAttemptId) = 0L
+ // This will later cause waiting tasks to wake up and check numTasks again
+ lock.notifyAll()
+ }
+
+ // Keep looping until we're either sure that we don't want to grant this request (because this
+ // task would have more than 1 / numActiveTasks of the memory) or we have enough free
+ // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
+ // TODO: simplify this to limit each task to its own slot
+ while (true) {
+ val numActiveTasks = memoryForTask.keys.size
+ val curMem = memoryForTask(taskAttemptId)
+
+ // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
+ // don't let it be negative
+ val maxToGrant =
+ math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+ // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
+ val toGrant = math.min(maxToGrant, memoryFree)
+
+ if (curMem < poolSize / (2 * numActiveTasks)) {
+ // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+ // if we can't give it this much now, wait for other tasks to free up memory
+ // (this happens if older tasks allocated lots of memory before N grew)
+ if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
+ memoryForTask(taskAttemptId) += toGrant
+ return toGrant
+ } else {
+ logInfo(
+ s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+ lock.wait()
+ }
+ } else {
+ memoryForTask(taskAttemptId) += toGrant
+ return toGrant
+ }
+ }
+ 0L // Never reached
+ }
+
+ /**
+ * Release `numBytes` of memory acquired by the given task.
+ */
+ def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
+ val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
+ var memoryToFree = if (curMem < numBytes) {
+ logWarning(
+ s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
+ s"of memory from the $poolName pool")
+ curMem
+ } else {
+ numBytes
+ }
+ if (memoryForTask.contains(taskAttemptId)) {
+ memoryForTask(taskAttemptId) -= memoryToFree
+ if (memoryForTask(taskAttemptId) <= 0) {
+ memoryForTask.remove(taskAttemptId)
+ }
+ }
+ lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
+ }
+
+ /**
+ * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+ * @return the number of bytes freed.
+ */
+ def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
+ val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
+ releaseMemory(numBytesToFree, taskAttemptId)
+ numBytesToFree
+ }
+
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b0cf2696a3..ceb8ea434e 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,12 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging}
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -36,53 +32,40 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
* In this context, execution memory refers to that used for computation in shuffles, joins,
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one MemoryManager per JVM.
- *
- * The MemoryManager abstract base class itself implements policies for sharing execution memory
- * between tasks; it tries to ensure that each task gets a reasonable share of memory, instead of
- * some task ramping up to a large amount first and then causing others to spill to disk repeatedly.
- * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
- * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
- * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
- * this set changes. This is all done by synchronizing access to mutable state and using wait() and
- * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
- * tasks was performed by the ShuffleMemoryManager.
*/
-private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) extends Logging {
+private[spark] abstract class MemoryManager(
+ conf: SparkConf,
+ numCores: Int,
+ storageMemory: Long,
+ onHeapExecutionMemory: Long) extends Logging {
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
- // The memory store used to evict cached blocks
- private var _memoryStore: MemoryStore = _
- protected def memoryStore: MemoryStore = {
- if (_memoryStore == null) {
- throw new IllegalArgumentException("memory store not initialized yet")
- }
- _memoryStore
- }
+ @GuardedBy("this")
+ protected val storageMemoryPool = new StorageMemoryPool(this)
+ @GuardedBy("this")
+ protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
+ @GuardedBy("this")
+ protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
- // Amount of execution/storage memory in use, accesses must be synchronized on `this`
- @GuardedBy("this") protected var _executionMemoryUsed: Long = 0
- @GuardedBy("this") protected var _storageMemoryUsed: Long = 0
- // Map from taskAttemptId -> memory consumption in bytes
- @GuardedBy("this") private val executionMemoryForTask = new mutable.HashMap[Long, Long]()
-
- /**
- * Set the [[MemoryStore]] used by this manager to evict cached blocks.
- * This must be set after construction due to initialization ordering constraints.
- */
- final def setMemoryStore(store: MemoryStore): Unit = {
- _memoryStore = store
- }
+ storageMemoryPool.incrementPoolSize(storageMemory)
+ onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
+ offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
/**
- * Total available memory for execution, in bytes.
+ * Total available memory for storage, in bytes. This amount can vary over time, depending on
+ * the MemoryManager implementation.
+ * In this model, this is equivalent to the amount of memory not occupied by execution.
*/
- def maxExecutionMemory: Long
+ def maxStorageMemory: Long
/**
- * Total available memory for storage, in bytes.
+ * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+ * This must be set after construction due to initialization ordering constraints.
*/
- def maxStorageMemory: Long
+ final def setMemoryStore(store: MemoryStore): Unit = synchronized {
+ storageMemoryPool.setMemoryStore(store)
+ }
// TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
@@ -94,7 +77,9 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ }
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -109,103 +94,25 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes, evictedBlocks)
- }
-
- /**
- * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return number of bytes successfully granted (<= N).
- */
- @VisibleForTesting
- private[memory] def doAcquireExecutionMemory(
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
/**
- * Try to acquire up to `numBytes` of execution memory for the current task and return the number
- * of bytes obtained, or 0 if none can be allocated.
+ * Try to acquire up to `numBytes` of execution memory for the current task and return the
+ * number of bytes obtained, or 0 if none can be allocated.
*
* This call may block until there is enough free memory in some situations, to make sure each
* task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
* active tasks) before it is forced to spill. This can happen if the number of tasks increase
* but an older task had a lot of memory already.
- *
- * Subclasses should override `doAcquireExecutionMemory` in order to customize the policies
- * that control global sharing of memory between execution and storage.
*/
private[memory]
- final def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized {
- assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
-
- // Add this task to the taskMemory map just so we can keep an accurate count of the number
- // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
- if (!executionMemoryForTask.contains(taskAttemptId)) {
- executionMemoryForTask(taskAttemptId) = 0L
- // This will later cause waiting tasks to wake up and check numTasks again
- notifyAll()
- }
-
- // Once the cross-task memory allocation policy has decided to grant more memory to a task,
- // this method is called in order to actually obtain that execution memory, potentially
- // triggering eviction of storage memory:
- def acquire(toGrant: Long): Long = synchronized {
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- val acquired = doAcquireExecutionMemory(toGrant, evictedBlocks)
- // Register evicted blocks, if any, with the active task metrics
- Option(TaskContext.get()).foreach { tc =>
- val metrics = tc.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
- }
- executionMemoryForTask(taskAttemptId) += acquired
- acquired
- }
-
- // Keep looping until we're either sure that we don't want to grant this request (because this
- // task would have more than 1 / numActiveTasks of the memory) or we have enough free
- // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
- // TODO: simplify this to limit each task to its own slot
- while (true) {
- val numActiveTasks = executionMemoryForTask.keys.size
- val curMem = executionMemoryForTask(taskAttemptId)
- val freeMemory = maxExecutionMemory - executionMemoryForTask.values.sum
-
- // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
- // don't let it be negative
- val maxToGrant =
- math.min(numBytes, math.max(0, (maxExecutionMemory / numActiveTasks) - curMem))
- // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
- val toGrant = math.min(maxToGrant, freeMemory)
-
- if (curMem < maxExecutionMemory / (2 * numActiveTasks)) {
- // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
- // if we can't give it this much now, wait for other tasks to free up memory
- // (this happens if older tasks allocated lots of memory before N grew)
- if (
- freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * numActiveTasks) - curMem)) {
- return acquire(toGrant)
- } else {
- logInfo(
- s"TID $taskAttemptId waiting for at least 1/2N of execution memory pool to be free")
- wait()
- }
- } else {
- return acquire(toGrant)
- }
- }
- 0L // Never reached
- }
-
- @VisibleForTesting
- private[memory] def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _executionMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of execution " +
- s"memory when we only have ${_executionMemoryUsed} bytes")
- _executionMemoryUsed = 0
- } else {
- _executionMemoryUsed -= numBytes
+ def acquireExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
@@ -213,24 +120,14 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* Release numBytes of execution memory belonging to the given task.
*/
private[memory]
- final def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
- val curMem = executionMemoryForTask.getOrElse(taskAttemptId, 0L)
- if (curMem < numBytes) {
- if (Utils.isTesting) {
- throw new SparkException(
- s"Internal error: release called on $numBytes bytes but task only has $curMem")
- } else {
- logWarning(s"Internal error: release called on $numBytes bytes but task only has $curMem")
- }
- }
- if (executionMemoryForTask.contains(taskAttemptId)) {
- executionMemoryForTask(taskAttemptId) -= numBytes
- if (executionMemoryForTask(taskAttemptId) <= 0) {
- executionMemoryForTask.remove(taskAttemptId)
- }
- releaseExecutionMemory(numBytes)
+ def releaseExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Unit = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
}
- notifyAll() // Notify waiters in acquireExecutionMemory() that memory has been freed
}
/**
@@ -238,35 +135,28 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* @return the number of bytes freed.
*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
- val numBytesToFree = getExecutionMemoryUsageForTask(taskAttemptId)
- releaseExecutionMemory(numBytesToFree, taskAttemptId)
- numBytesToFree
+ onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
+ offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _storageMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of storage " +
- s"memory when we only have ${_storageMemoryUsed} bytes")
- _storageMemoryUsed = 0
- } else {
- _storageMemoryUsed -= numBytes
- }
+ storageMemoryPool.releaseMemory(numBytes)
}
/**
* Release all storage memory acquired.
*/
- def releaseAllStorageMemory(): Unit = synchronized {
- _storageMemoryUsed = 0
+ final def releaseAllStorageMemory(): Unit = synchronized {
+ storageMemoryPool.releaseAllMemory()
}
/**
* Release N bytes of unroll memory.
*/
- def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+ final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
releaseStorageMemory(numBytes)
}
@@ -274,26 +164,35 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* Execution memory currently in use, in bytes.
*/
final def executionMemoryUsed: Long = synchronized {
- _executionMemoryUsed
+ onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed
}
/**
* Storage memory currently in use, in bytes.
*/
final def storageMemoryUsed: Long = synchronized {
- _storageMemoryUsed
+ storageMemoryPool.memoryUsed
}
/**
* Returns the execution memory consumption, in bytes, for the given task.
*/
private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {
- executionMemoryForTask.getOrElse(taskAttemptId, 0L)
+ onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
+ offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
}
// -- Fields related to Tungsten managed memory -------------------------------------------------
/**
+ * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
+ * sun.misc.Unsafe.
+ */
+ final val tungstenMemoryMode: MemoryMode = {
+ if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
+ }
+
+ /**
* The default page size, in bytes.
*
* If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
@@ -306,21 +205,22 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
// Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
val safetyFactor = 16
- val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
+ val maxTungstenMemory: Long = tungstenMemoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
+ }
+ val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.getSizeAsBytes("spark.buffer.pageSize", default)
}
/**
- * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
- * sun.misc.Unsafe.
- */
- final val tungstenMemoryIsAllocatedInHeap: Boolean =
- !conf.getBoolean("spark.unsafe.offHeap", false)
-
- /**
* Allocates memory for use by Unsafe/Tungsten code.
*/
- private[memory] final val tungstenMemoryAllocator: MemoryAllocator =
- if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else MemoryAllocator.UNSAFE
+ private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
+ tungstenMemoryMode match {
+ case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
+ case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
+ }
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
new file mode 100644
index 0000000000..bfeec47e38
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+/**
+ * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to
+ * the [[MemoryManager]]. See subclasses for more details.
+ *
+ * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type
+ * to `Object` to avoid programming errors, since this object should only be used for
+ * synchronization purposes.
+ */
+abstract class MemoryPool(lock: Object) {
+
+ @GuardedBy("lock")
+ private[this] var _poolSize: Long = 0
+
+ /**
+ * Returns the current size of the pool, in bytes.
+ */
+ final def poolSize: Long = lock.synchronized {
+ _poolSize
+ }
+
+ /**
+ * Returns the amount of free memory in the pool, in bytes.
+ */
+ final def memoryFree: Long = lock.synchronized {
+ _poolSize - memoryUsed
+ }
+
+ /**
+ * Expands the pool by `delta` bytes.
+ */
+ final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
+ require(delta >= 0)
+ _poolSize += delta
+ }
+
+ /**
+ * Shrinks the pool by `delta` bytes.
+ */
+ final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
+ require(delta >= 0)
+ require(delta <= _poolSize)
+ require(_poolSize - delta >= memoryUsed)
+ _poolSize -= delta
+ }
+
+ /**
+ * Returns the amount of used memory in this pool (in bytes).
+ */
+ def memoryUsed: Long
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 9c2c2e90a2..12a0943068 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
-
/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
*
@@ -32,10 +31,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
*/
private[spark] class StaticMemoryManager(
conf: SparkConf,
- override val maxExecutionMemory: Long,
+ maxOnHeapExecutionMemory: Long,
override val maxStorageMemory: Long,
numCores: Int)
- extends MemoryManager(conf, numCores) {
+ extends MemoryManager(
+ conf,
+ numCores,
+ maxStorageMemory,
+ maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
this(
@@ -50,76 +53,15 @@ private[spark] class StaticMemoryManager(
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
- /**
- * Acquire N bytes of memory for execution.
- * @return number of bytes successfully granted (<= N).
- */
- override def doAcquireExecutionMemory(
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
- assert(numBytes >= 0)
- assert(_executionMemoryUsed <= maxExecutionMemory)
- val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
- _executionMemoryUsed += bytesToGrant
- bytesToGrant
- }
-
- /**
- * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return whether all N bytes were successfully granted.
- */
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
- }
-
- /**
- * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
- *
- * This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
- * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
- * are added to `evictedBlocks`.
- *
- * @return whether all N bytes were successfully granted.
- */
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- val currentUnrollMemory = memoryStore.currentUnrollMemory
+ val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
- acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+ storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
-
- /**
- * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
- *
- * @param blockId the ID of the block we are acquiring storage memory for
- * @param numBytesToAcquire the size of this block
- * @param numBytesToFree the size of space to be freed through evicting blocks
- * @param evictedBlocks a holder for blocks evicted in the process
- * @return whether all N bytes were successfully granted.
- */
- private def acquireStorageMemory(
- blockId: BlockId,
- numBytesToAcquire: Long,
- numBytesToFree: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- assert(numBytesToAcquire >= 0)
- assert(numBytesToFree >= 0)
- memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- assert(_storageMemoryUsed <= maxStorageMemory)
- val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
- if (enoughMemory) {
- _storageMemoryUsed += numBytesToAcquire
- }
- enoughMemory
- }
-
}
@@ -135,7 +77,6 @@ private[spark] object StaticMemoryManager {
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
-
/**
* Return the total amount of memory available for the execution region, in bytes.
*/
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
new file mode 100644
index 0000000000..6a322eabf8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{TaskContext, Logging}
+import org.apache.spark.storage.{MemoryStore, BlockStatus, BlockId}
+
+/**
+ * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
+ * (caching).
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ */
+class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
+
+ @GuardedBy("lock")
+ private[this] var _memoryUsed: Long = 0L
+
+ override def memoryUsed: Long = lock.synchronized {
+ _memoryUsed
+ }
+
+ private var _memoryStore: MemoryStore = _
+ def memoryStore: MemoryStore = {
+ if (_memoryStore == null) {
+ throw new IllegalStateException("memory store not initialized yet")
+ }
+ _memoryStore
+ }
+
+ /**
+ * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+ * This must be set after construction due to initialization ordering constraints.
+ */
+ final def setMemoryStore(store: MemoryStore): Unit = {
+ _memoryStore = store
+ }
+
+ /**
+ * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return whether all N bytes were successfully granted.
+ */
+ def acquireMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+ }
+
+ /**
+ * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
+ *
+ * @param blockId the ID of the block we are acquiring storage memory for
+ * @param numBytesToAcquire the size of this block
+ * @param numBytesToFree the size of space to be freed through evicting blocks
+ * @return whether all N bytes were successfully granted.
+ */
+ def acquireMemory(
+ blockId: BlockId,
+ numBytesToAcquire: Long,
+ numBytesToFree: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ assert(numBytesToAcquire >= 0)
+ assert(numBytesToFree >= 0)
+ assert(memoryUsed <= poolSize)
+ memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
+ // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
+ // back into this StorageMemoryPool in order to free. Therefore, these variables should have
+ // been updated.
+ val enoughMemory = numBytesToAcquire <= memoryFree
+ if (enoughMemory) {
+ _memoryUsed += numBytesToAcquire
+ }
+ enoughMemory
+ }
+
+ def releaseMemory(size: Long): Unit = lock.synchronized {
+ if (size > _memoryUsed) {
+ logWarning(s"Attempted to release $size bytes of storage " +
+ s"memory when we only have ${_memoryUsed} bytes")
+ _memoryUsed = 0
+ } else {
+ _memoryUsed -= size
+ }
+ }
+
+ def releaseAllMemory(): Unit = lock.synchronized {
+ _memoryUsed = 0
+ }
+
+ /**
+ * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
+ * of bytes removed from the pool's capacity.
+ */
+ def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
+ // First, shrink the pool by reclaiming free memory:
+ val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+ decrementPoolSize(spaceFreedByReleasingUnusedMemory)
+ if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
+ spaceFreedByReleasingUnusedMemory
+ } else {
+ // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
+ val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
+ val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+ _memoryUsed -= spaceFreedByEviction
+ decrementPoolSize(spaceFreedByEviction)
+ spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a3093030a0..8be5b05419 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockStatus, BlockId}
-
/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
@@ -41,98 +40,105 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
* The implication is that attempts to cache blocks may fail if execution has already eaten
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
+ *
+ * @param storageRegionSize Size of the storage region, in bytes.
+ * This region is not statically reserved; execution can borrow from
+ * it if necessary. Cached blocks can be evicted only if actual
+ * storage memory usage exceeds this region.
*/
-private[spark] class UnifiedMemoryManager(
+private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
maxMemory: Long,
+ private val storageRegionSize: Long,
numCores: Int)
- extends MemoryManager(conf, numCores) {
-
- def this(conf: SparkConf, numCores: Int) {
- this(conf, UnifiedMemoryManager.getMaxMemory(conf), numCores)
- }
-
- /**
- * Size of the storage region, in bytes.
- *
- * This region is not statically reserved; execution can borrow from it if necessary.
- * Cached blocks can be evicted only if actual storage memory usage exceeds this region.
- */
- private val storageRegionSize: Long = {
- (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
- }
-
- /**
- * Total amount of memory, in bytes, not currently occupied by either execution or storage.
- */
- private def totalFreeMemory: Long = synchronized {
- assert(_executionMemoryUsed <= maxMemory)
- assert(_storageMemoryUsed <= maxMemory)
- assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
- maxMemory - _executionMemoryUsed - _storageMemoryUsed
- }
+ extends MemoryManager(
+ conf,
+ numCores,
+ storageRegionSize,
+ maxMemory - storageRegionSize) {
- /**
- * Total available memory for execution, in bytes.
- * In this model, this is equivalent to the amount of memory not occupied by storage.
- */
- override def maxExecutionMemory: Long = synchronized {
- maxMemory - _storageMemoryUsed
- }
+ // We always maintain this invariant:
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
- /**
- * Total available memory for storage, in bytes.
- * In this model, this is equivalent to the amount of memory not occupied by execution.
- */
override def maxStorageMemory: Long = synchronized {
- maxMemory - _executionMemoryUsed
+ maxMemory - onHeapExecutionMemoryPool.memoryUsed
}
/**
- * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ * Try to acquire up to `numBytes` of execution memory for the current task and return the
+ * number of bytes obtained, or 0 if none can be allocated.
*
- * This method evicts blocks only up to the amount of memory borrowed by storage.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return number of bytes successfully granted (<= N).
+ * This call may block until there is enough free memory in some situations, to make sure each
+ * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
+ * active tasks) before it is forced to spill. This can happen if the number of tasks increase
+ * but an older task had a lot of memory already.
*/
- private[memory] override def doAcquireExecutionMemory(
+ override private[memory] def acquireExecutionMemory(
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
- val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize)
- // If there is not enough free memory AND storage has borrowed some execution memory,
- // then evict as much memory borrowed by storage as needed to grant this request
- val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0
- if (shouldEvictStorage) {
- val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
- memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+ memoryMode match {
+ case MemoryMode.ON_HEAP =>
+ if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
+ val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage =
+ math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+ }
+ }
+ onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP =>
+ // For now, we only support on-heap caching of data, so we do not need to interact with
+ // the storage pool when allocating off-heap memory. This will change in the future, though.
+ super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
}
- val bytesToGrant = math.min(numBytes, totalFreeMemory)
- _executionMemoryUsed += bytesToGrant
- bytesToGrant
}
- /**
- * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return whether all N bytes were successfully granted.
- */
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
- memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
- val enoughMemory = totalFreeMemory >= numBytes
- if (enoughMemory) {
- _storageMemoryUsed += numBytes
+ if (numBytes > storageMemoryPool.memoryFree) {
+ // There is not enough free memory in the storage pool, so try to borrow free memory from
+ // the execution pool.
+ val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
+ onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
+ storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
- enoughMemory
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, evictedBlocks)
+ }
}
-private object UnifiedMemoryManager {
+object UnifiedMemoryManager {
+
+ def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
+ val maxMemory = getMaxMemory(conf)
+ new UnifiedMemoryManager(
+ conf,
+ maxMemory = maxMemory,
+ storageRegionSize =
+ (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
+ numCores = numCores)
+ }
/**
* Return the total amount of memory shared between execution and storage, in bytes.
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala
new file mode 100644
index 0000000000..564e30d2ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/package.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * This package implements Spark's memory management system. This system consists of two main
+ * components, a JVM-wide memory manager and a per-task manager:
+ *
+ * - [[org.apache.spark.memory.MemoryManager]] manages Spark's overall memory usage within a JVM.
+ * This component implements the policies for dividing the available memory across tasks and for
+ * allocating memory between storage (memory used caching and data transfer) and execution (memory
+ * used by computations, such as shuffles, joins, sorts, and aggregations).
+ * - [[org.apache.spark.memory.TaskMemoryManager]] manages the memory allocated by individual tasks.
+ * Tasks interact with TaskMemoryManager and never directly interact with the JVM-wide
+ * MemoryManager.
+ *
+ * Internally, each of these components have additional abstractions for memory bookkeeping:
+ *
+ * - [[org.apache.spark.memory.MemoryConsumer]]s are clients of the TaskMemoryManager and
+ * correspond to individual operators and data structures within a task. The TaskMemoryManager
+ * receives memory allocation requests from MemoryConsumers and issues callbacks to consumers
+ * in order to trigger spilling when running low on memory.
+ * - [[org.apache.spark.memory.MemoryPool]]s are a bookkeeping abstraction used by the
+ * MemoryManager to track the division of memory between storage and execution.
+ *
+ * Diagrammatically:
+ *
+ * {{{
+ * +-------------+
+ * | MemConsumer |----+ +------------------------+
+ * +-------------+ | +-------------------+ | MemoryManager |
+ * +--->| TaskMemoryManager |----+ | |
+ * +-------------+ | +-------------------+ | | +------------------+ |
+ * | MemConsumer |----+ | | | StorageMemPool | |
+ * +-------------+ +-------------------+ | | +------------------+ |
+ * | TaskMemoryManager |----+ | |
+ * +-------------------+ | | +------------------+ |
+ * +---->| |OnHeapExecMemPool | |
+ * * | | +------------------+ |
+ * * | | |
+ * +-------------+ * | | +------------------+ |
+ * | MemConsumer |----+ | | |OffHeapExecMemPool| |
+ * +-------------+ | +-------------------+ | | +------------------+ |
+ * +--->| TaskMemoryManager |----+ | |
+ * +-------------------+ +------------------------+
+ * }}}
+ *
+ *
+ * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how
+ * they handle the sizing of their memory pools:
+ *
+ * - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft
+ * boundaries between storage and execution memory, allowing requests for memory in one region
+ * to be fulfilled by borrowing memory from the other.
+ * - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage
+ * and execution memory by statically partitioning Spark's memory and preventing storage and
+ * execution from borrowing memory from each other. This mode is retained only for legacy
+ * compatibility purposes.
+ */
+package object memory
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9e002621a6..3a48af82b1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.collection
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.{Logging, SparkEnv}
/**
@@ -78,7 +78,8 @@ private[spark] trait Spillable[C] extends Logging {
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
- val granted = taskMemoryManager.acquireExecutionMemory(amountToRequest, null)
+ val granted =
+ taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
@@ -107,7 +108,8 @@ private[spark] trait Spillable[C] extends Logging {
*/
def releaseMemory(): Unit = {
// The amount we requested does not include the initial memory tracking threshold
- taskMemoryManager.releaseExecutionMemory(myMemoryThreshold - initialMemoryThreshold, null)
+ taskMemoryManager.releaseExecutionMemory(
+ myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
myMemoryThreshold = initialMemoryThreshold
}
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index c731317395..711eed0193 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -28,8 +28,14 @@ public class TaskMemoryManagerSuite {
@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new StaticMemoryManager(
+ new SparkConf().set("spark.unsafe.offHeap", "false"),
+ Long.MAX_VALUE,
+ Long.MAX_VALUE,
+ 1),
+ 0);
manager.allocatePage(4096, null); // leak memory
+ Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
}
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 8ae3642738..e6e16fff80 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -32,13 +32,19 @@ public class TestMemoryConsumer extends MemoryConsumer {
}
void use(long size) {
- long got = taskMemoryManager.acquireExecutionMemory(size, this);
+ long got = taskMemoryManager.acquireExecutionMemory(
+ size,
+ taskMemoryManager.tungstenMemoryMode,
+ this);
used += got;
}
void free(long size) {
used -= size;
- taskMemoryManager.releaseExecutionMemory(size, this);
+ taskMemoryManager.releaseExecutionMemory(
+ size,
+ taskMemoryManager.tungstenMemoryMode,
+ this);
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 4763395d7d..0e0eca515a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -423,7 +423,7 @@ public class UnsafeShuffleWriterSuite {
memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
- for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
+ for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
writer.write(dataToWrite.iterator());
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 92bd45e5fa..3bca790f30 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -83,7 +83,9 @@ public abstract class AbstractBytesToBytesMapSuite {
public void setup() {
memoryManager =
new TestMemoryManager(
- new SparkConf().set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator()));
+ new SparkConf()
+ .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
+ .set("spark.memory.offHeapSize", "256mb"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 4a9479cf49..f55d435fa3 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -29,7 +30,7 @@ import org.mockito.stubbing.Answer
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.MemoryStore
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
/**
@@ -78,7 +79,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
val numBytes = args(numBytesPos).asInstanceOf[Long]
- mockEnsureFreeSpace(mm, numBytes)
+ val success = mockEnsureFreeSpace(mm, numBytes)
+ if (success) {
+ args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+ }
+ success
}
}
}
@@ -132,93 +138,95 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
}
/**
- * Create a MemoryManager with the specified execution memory limit and no storage memory.
+ * Create a MemoryManager with the specified execution memory limits and no storage memory.
*/
- protected def createMemoryManager(maxExecutionMemory: Long): MemoryManager
+ protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long = 0L): MemoryManager
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
implicit val ec = ExecutionContext.global
- test("single task requesting execution memory") {
+ test("single task requesting on-heap execution memory") {
val manager = createMemoryManager(1000L)
val taskMemoryManager = new TaskMemoryManager(manager, 0)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 100L)
- assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
- assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
- assert(taskMemoryManager.acquireExecutionMemory(200L, null) === 100L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(200L, MemoryMode.ON_HEAP, null) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
- taskMemoryManager.releaseExecutionMemory(500L, null)
- assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 300L)
- assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 200L)
+ taskMemoryManager.releaseExecutionMemory(500L, MemoryMode.ON_HEAP, null)
+ assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 300L)
+ assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 200L)
taskMemoryManager.cleanUpAllAllocatedMemory()
- assert(taskMemoryManager.acquireExecutionMemory(1000L, null) === 1000L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) === 1000L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
}
- test("two tasks requesting full execution memory") {
+ test("two tasks requesting full on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// Have both tasks request 500 bytes, then wait until both requests have been granted:
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 500L)
assert(Await.result(t2Result1, futureTimeout) === 500L)
// Have both tasks each request 500 bytes more; both should immediately return 0 as they are
// both now at 1 / N
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, 200.millis) === 0L)
assert(Await.result(t2Result2, 200.millis) === 0L)
}
- test("two tasks cannot grow past 1 / N of execution memory") {
+ test("two tasks cannot grow past 1 / N of on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// Have both tasks request 250 bytes, then wait until both requests have been granted:
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, null) }
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 250L)
assert(Await.result(t2Result1, futureTimeout) === 250L)
// Have both tasks each request 500 bytes more.
// We should only grant 250 bytes to each of them on this second request
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, futureTimeout) === 250L)
assert(Await.result(t2Result2, futureTimeout) === 250L)
}
- test("tasks can block to get at least 1 / 2N of execution memory") {
+ test("tasks can block to get at least 1 / 2N of on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 1000L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
- t1MemManager.releaseExecutionMemory(250L, null)
+ t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null)
// The memory freed from t1 should now be granted to t2.
assert(Await.result(t2Result1, futureTimeout) === 250L)
// Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory.
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result2, 200.millis) === 0L)
}
@@ -229,18 +237,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
val futureTimeout: Duration = 20.seconds
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 1000L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
// t1 releases all of its memory, so t2 should be able to grab all of the memory
t1MemManager.cleanUpAllAllocatedMemory()
assert(Await.result(t2Result1, futureTimeout) === 500L)
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result2, futureTimeout) === 500L)
- val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result3, 200.millis) === 0L)
}
@@ -251,15 +259,35 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 700L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result1, futureTimeout) === 300L)
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, 200.millis) === 0L)
}
+
+ test("off-heap execution allocations cannot exceed limit") {
+ val memoryManager = createMemoryManager(
+ maxOnHeapExecutionMemory = 0L,
+ maxOffHeapExecutionMemory = 1000L)
+
+ val tMemManager = new TaskMemoryManager(memoryManager, 1)
+ val result1 = Future { tMemManager.acquireExecutionMemory(1000L, MemoryMode.OFF_HEAP, null) }
+ assert(Await.result(result1, 200.millis) === 1000L)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+
+ val result2 = Future { tMemManager.acquireExecutionMemory(300L, MemoryMode.OFF_HEAP, null) }
+ assert(Await.result(result2, 200.millis) === 0L)
+
+ assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+ tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 500L)
+ tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
+ }
}
private object MemoryManagerSuite {
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450d6d..54cb28c389 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
maxExecutionMem: Long,
maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1)
+ conf,
+ maxOnHeapExecutionMemory = maxExecutionMem,
+ maxStorageMemory = maxStorageMem,
+ numCores = 1)
val ms = makeMemoryStore(mm)
(mm, ms)
}
- override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
+ override protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
new StaticMemoryManager(
- conf,
- maxExecutionMemory = maxMemory,
+ conf.clone
+ .set("spark.memory.fraction", "1")
+ .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+ .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+ maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxStorageMemory = 0,
numCores = 1)
}
test("basic execution memory") {
val maxExecutionMem = 1000L
+ val taskAttemptId = 0L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
// Acquire up to the max
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- mm.releaseExecutionMemory(800L)
+ mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxExecutionMem)
+ mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 0L)
}
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
test("execution and storage isolation") {
val maxExecutionMem = 200L
val maxStorageMem = 1000L
+ val taskAttemptId = 0L
val dummyBlock = TestBlockId("ain't nobody love like you do")
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
- mm.releaseExecutionMemory(133L)
+ mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 77e43554ee..0706a6e45d 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -22,19 +22,20 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockStatus, BlockId}
-class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = 1) {
- private[memory] override def doAcquireExecutionMemory(
+class TestMemoryManager(conf: SparkConf)
+ extends MemoryManager(conf, numCores = 1, Long.MaxValue, Long.MaxValue) {
+
+ override private[memory] def acquireExecutionMemory(
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = {
if (oomOnce) {
oomOnce = false
0
} else if (available >= numBytes) {
- _executionMemoryUsed += numBytes // To suppress warnings when freeing unallocated memory
available -= numBytes
numBytes
} else {
- _executionMemoryUsed += available
val grant = available
available = 0
grant
@@ -48,12 +49,13 @@ class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores =
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def releaseExecutionMemory(numBytes: Long): Unit = {
+ override def releaseStorageMemory(numBytes: Long): Unit = {}
+ override private[memory] def releaseExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Unit = {
available += numBytes
- _executionMemoryUsed -= numBytes
}
- override def releaseStorageMemory(numBytes: Long): Unit = {}
- override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
private var oomOnce = false
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c97f2bd89..8cebe81c3b 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -24,57 +24,52 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
- private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
private val dummyBlock = TestBlockId("--")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ private val storageFraction: Double = 0.5
+
/**
* Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
*/
private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
- val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
+ val mm = createMemoryManager(maxMemory)
val ms = makeMemoryStore(mm)
(mm, ms)
}
- override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
- new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
- }
-
- private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
- mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
- }
-
- test("storage region size") {
- val maxMemory = 1000L
- val (mm, _) = makeThings(maxMemory)
- val storageFraction = conf.get("spark.memory.storageFraction").toDouble
- val expectedStorageRegionSize = maxMemory * storageFraction
- val actualStorageRegionSize = getStorageRegionSize(mm)
- assert(expectedStorageRegionSize === actualStorageRegionSize)
+ override protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", "1")
+ .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+ .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+ .set("spark.memory.storageFraction", storageFraction.toString)
+ UnifiedMemoryManager(conf, numCores = 1)
}
test("basic execution memory") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, _) = makeThings(maxMemory)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
// Acquire up to the max
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
assert(mm.executionMemoryUsed === maxMemory)
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
assert(mm.executionMemoryUsed === maxMemory)
- mm.releaseExecutionMemory(800L)
+ mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxMemory)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 0L)
}
@@ -118,44 +113,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("execution evicts storage") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
- // First, ensure the test classes are set up as expected
- val expectedStorageRegionSize = 500L
- val expectedExecutionRegionSize = 500L
- val storageRegionSize = getStorageRegionSize(mm)
- val executionRegionSize = maxMemory - expectedStorageRegionSize
- require(storageRegionSize === expectedStorageRegionSize,
- "bad test: storage region size is unexpected")
- require(executionRegionSize === expectedExecutionRegionSize,
- "bad test: storage region size is unexpected")
// Acquire enough storage memory to exceed the storage region
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 750L)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
- require(mm.storageMemoryUsed > storageRegionSize,
- s"bad test: storage memory used should exceed the storage region")
// Execution needs to request 250 bytes to evict storage memory
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
assertEnsureFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
- assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
- assertEnsureFreeSpaceCalled(ms, 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
+ assert(mm.executionMemoryUsed === 300L)
+ assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.executionMemoryUsed === 300L)
mm.releaseAllStorageMemory()
- require(mm.executionMemoryUsed < executionRegionSize,
- s"bad test: execution memory used should be within the execution region")
+ require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 400L)
- require(mm.storageMemoryUsed < storageRegionSize,
- s"bad test: storage memory used should be within the storage region")
+ assert(mm.storageMemoryUsed === 400L)
+ assert(mm.executionMemoryUsed === 300L)
// Execution cannot evict storage because the latter is within the storage fraction,
// so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
- assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
assertEnsureFreeSpaceNotCalled(ms)
@@ -163,23 +148,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("storage does not evict execution") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
- // First, ensure the test classes are set up as expected
- val expectedStorageRegionSize = 500L
- val expectedExecutionRegionSize = 500L
- val storageRegionSize = getStorageRegionSize(mm)
- val executionRegionSize = maxMemory - expectedStorageRegionSize
- require(storageRegionSize === expectedStorageRegionSize,
- "bad test: storage region size is unexpected")
- require(executionRegionSize === expectedExecutionRegionSize,
- "bad test: storage region size is unexpected")
// Acquire enough execution memory to exceed the execution region
- assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
- require(mm.executionMemoryUsed > executionRegionSize,
- s"bad test: execution memory used should exceed the execution region")
// Storage should not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
@@ -189,15 +164,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
assertEnsureFreeSpaceCalled(ms, 250L)
- mm.releaseExecutionMemory(maxMemory)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(maxMemory)
// Acquire some execution memory again, but this time keep it within the execution region
- assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
- require(mm.executionMemoryUsed < executionRegionSize,
- s"bad test: execution memory used should be within the execution region")
// Storage should still not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d49015afcd..53991d8a1a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,7 +825,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memoryManager = new StaticMemoryManager(
conf,
- maxExecutionMemory = Long.MaxValue,
+ maxOnHeapExecutionMemory = Long.MaxValue,
maxStorageMemory = 1200,
numCores = 1)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,