 core/src/main/java/org/apache/spark/memory/MemoryConsumer.java                   |   7
 core/src/main/java/org/apache/spark/memory/MemoryMode.java                       |  26
 core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java                |  72
 core/src/main/scala/org/apache/spark/SparkEnv.scala                              |   2
 core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala            | 153
 core/src/main/scala/org/apache/spark/memory/MemoryManager.scala                  | 246
 core/src/main/scala/org/apache/spark/memory/MemoryPool.scala                     |  71
 core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala            |  75
 core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala              | 138
 core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala           | 138
 core/src/main/scala/org/apache/spark/memory/package.scala                        |  75
 core/src/main/scala/org/apache/spark/util/collection/Spillable.scala             |   8
 core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java           |   8
 core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java               |  10
 core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java   |   2
 core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java |   4
 core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala             | 104
 core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala       |  39
 core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala              |  20
 core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala      |  93
 core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala             |   2
 21 files changed, 828 insertions(+), 465 deletions(-)
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
index 8fbdb72832..36138cc9a2 100644
--- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
+++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java
@@ -17,15 +17,15 @@
package org.apache.spark.memory;
-
import java.io.IOException;
import org.apache.spark.unsafe.array.LongArray;
import org.apache.spark.unsafe.memory.MemoryBlock;
-
/**
 * A memory consumer of TaskMemoryManager, which supports spilling.
+ *
+ * Note: this only supports allocation / spilling of Tungsten memory.
*/
public abstract class MemoryConsumer {
@@ -36,7 +36,6 @@ public abstract class MemoryConsumer {
protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) {
this.taskMemoryManager = taskMemoryManager;
this.pageSize = pageSize;
- this.used = 0;
}
protected MemoryConsumer(TaskMemoryManager taskMemoryManager) {
@@ -67,6 +66,8 @@ public abstract class MemoryConsumer {
*
* Note: In order to avoid possible deadlock, should not call acquireMemory() from spill().
*
+ * Note: today, this only frees Tungsten-managed pages.
+ *
   * @param size the amount of memory that should be released
* @param trigger the MemoryConsumer that trigger this spilling
* @return the amount of released memory in bytes
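To make the consumer contract concrete, here is a minimal illustrative Scala sketch (not part of this patch) of a spillable consumer. It relies only on the constructor, the protected `used` counter, and the TaskMemoryManager page methods that appear elsewhere in this diff; the class name and buffering scheme are hypothetical.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.memory.{MemoryConsumer, TaskMemoryManager}
import org.apache.spark.unsafe.memory.MemoryBlock

// Hypothetical consumer that buffers data in Tungsten pages and gives whole pages
// back when the TaskMemoryManager asks it to spill.
class PageBufferConsumer(tmm: TaskMemoryManager, pageSize: Long)
    extends MemoryConsumer(tmm, pageSize) {

  private val pages = ArrayBuffer[MemoryBlock]()

  /** Tries to grow the buffer by one page; returns false if no memory was granted. */
  def addPage(): Boolean = {
    val page = tmm.allocatePage(pageSize, this)   // may return null when memory is exhausted
    if (page == null) return false
    pages += page
    used += page.size()                           // keep the consumer's accounting current
    true
  }

  override def spill(size: Long, trigger: MemoryConsumer): Long = {
    // Free whole pages until at least `size` bytes are released or nothing is left.
    // Per the note above, spill() must never call acquire methods, to avoid deadlock.
    var released = 0L
    while (released < size && pages.nonEmpty) {
      val page = pages.remove(pages.length - 1)
      released += page.size()
      used -= page.size()
      tmm.freePage(page, this)
    }
    released
  }
}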
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
new file mode 100644
index 0000000000..3a5e72d8aa
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory;
+
+import org.apache.spark.annotation.Private;
+
+@Private
+public enum MemoryMode {
+ ON_HEAP,
+ OFF_HEAP
+}
diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
index 6440f9c0f3..5f743b2885 100644
--- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
+++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java
@@ -103,10 +103,10 @@ public class TaskMemoryManager {
* without doing any masking or lookups. Since this branching should be well-predicted by the JIT,
* this extra layer of indirection / abstraction hopefully shouldn't be too expensive.
*/
- private final boolean inHeap;
+ final MemoryMode tungstenMemoryMode;
/**
- * The size of memory granted to each consumer.
+ * Tracks spillable memory consumers.
*/
@GuardedBy("this")
private final HashSet<MemoryConsumer> consumers;
@@ -115,7 +115,7 @@ public class TaskMemoryManager {
* Construct a new TaskMemoryManager.
*/
public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) {
- this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap();
+ this.tungstenMemoryMode = memoryManager.tungstenMemoryMode();
this.memoryManager = memoryManager;
this.taskAttemptId = taskAttemptId;
this.consumers = new HashSet<>();
@@ -127,12 +127,19 @@ public class TaskMemoryManager {
*
* @return number of bytes successfully granted (<= N).
*/
- public long acquireExecutionMemory(long required, MemoryConsumer consumer) {
+ public long acquireExecutionMemory(
+ long required,
+ MemoryMode mode,
+ MemoryConsumer consumer) {
assert(required >= 0);
+ // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap
+ // memory here, then it may not make sense to spill since that would only end up freeing
+ // off-heap memory. This is subject to change, though, so it may be risky to make this
+    // optimization now in case we forget to undo it later when making changes.
synchronized (this) {
- long got = memoryManager.acquireExecutionMemory(required, taskAttemptId);
+ long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode);
- // try to release memory from other consumers first, then we can reduce the frequency of
+ // Try to release memory from other consumers first, then we can reduce the frequency of
      // spilling, avoiding too many spilled files.
if (got < required) {
// Call spill() on other consumers to release memory
@@ -140,10 +147,10 @@ public class TaskMemoryManager {
if (c != consumer && c.getUsed() > 0) {
try {
long released = c.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from {} for {}", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from {} for {}", taskAttemptId,
Utils.bytesToString(released), c, consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
if (got >= required) {
break;
}
@@ -161,10 +168,10 @@ public class TaskMemoryManager {
if (got < required && consumer != null) {
try {
long released = consumer.spill(required - got, consumer);
- if (released > 0) {
- logger.info("Task {} released {} from itself ({})", taskAttemptId,
+ if (released > 0 && mode == tungstenMemoryMode) {
+ logger.debug("Task {} released {} from itself ({})", taskAttemptId,
Utils.bytesToString(released), consumer);
- got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId);
+ got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode);
}
} catch (IOException e) {
logger.error("error while calling spill() on " + consumer, e);
@@ -184,9 +191,9 @@ public class TaskMemoryManager {
/**
* Release N bytes of execution memory for a MemoryConsumer.
*/
- public void releaseExecutionMemory(long size, MemoryConsumer consumer) {
+ public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) {
logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer);
- memoryManager.releaseExecutionMemory(size, taskAttemptId);
+ memoryManager.releaseExecutionMemory(size, taskAttemptId, mode);
}
/**
@@ -195,11 +202,19 @@ public class TaskMemoryManager {
public void showMemoryUsage() {
logger.info("Memory used in task " + taskAttemptId);
synchronized (this) {
+ long memoryAccountedForByConsumers = 0;
for (MemoryConsumer c: consumers) {
- if (c.getUsed() > 0) {
- logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed()));
+ long totalMemUsage = c.getUsed();
+ memoryAccountedForByConsumers += totalMemUsage;
+ if (totalMemUsage > 0) {
+ logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage));
}
}
+ long memoryNotAccountedFor =
+ memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers;
+ logger.info(
+ "{} bytes of memory were used by task {} but are not associated with specific consumers",
+ memoryNotAccountedFor, taskAttemptId);
}
}
@@ -214,7 +229,8 @@ public class TaskMemoryManager {
* Allocate a block of memory that will be tracked in the MemoryManager's page table; this is
* intended for allocating large blocks of Tungsten memory that will be shared between operators.
*
- * Returns `null` if there was not enough memory to allocate the page.
+ * Returns `null` if there was not enough memory to allocate the page. May return a page that
+ * contains fewer bytes than requested, so callers should verify the size of returned pages.
*/
public MemoryBlock allocatePage(long size, MemoryConsumer consumer) {
if (size > MAXIMUM_PAGE_SIZE_BYTES) {
@@ -222,7 +238,7 @@ public class TaskMemoryManager {
"Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes");
}
- long acquired = acquireExecutionMemory(size, consumer);
+ long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer);
if (acquired <= 0) {
return null;
}
@@ -231,7 +247,7 @@ public class TaskMemoryManager {
synchronized (this) {
pageNumber = allocatedPages.nextClearBit(0);
if (pageNumber >= PAGE_TABLE_SIZE) {
- releaseExecutionMemory(acquired, consumer);
+ releaseExecutionMemory(acquired, tungstenMemoryMode, consumer);
throw new IllegalStateException(
"Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages");
}
@@ -262,7 +278,7 @@ public class TaskMemoryManager {
}
long pageSize = page.size();
memoryManager.tungstenMemoryAllocator().free(page);
- releaseExecutionMemory(pageSize, consumer);
+ releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer);
}
/**
@@ -276,7 +292,7 @@ public class TaskMemoryManager {
* @return an encoded page address.
*/
public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) {
- if (!inHeap) {
+ if (tungstenMemoryMode == MemoryMode.OFF_HEAP) {
// In off-heap mode, an offset is an absolute address that may require a full 64 bits to
// encode. Due to our page size limitation, though, we can convert this into an offset that's
// relative to the page's base offset; this relative offset will fit in 51 bits.
@@ -305,7 +321,7 @@ public class TaskMemoryManager {
* {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)}
*/
public Object getPage(long pagePlusOffsetAddress) {
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
final int pageNumber = decodePageNumber(pagePlusOffsetAddress);
assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE);
final MemoryBlock page = pageTable[pageNumber];
@@ -323,7 +339,7 @@ public class TaskMemoryManager {
*/
public long getOffsetInPage(long pagePlusOffsetAddress) {
final long offsetInPage = decodeOffset(pagePlusOffsetAddress);
- if (inHeap) {
+ if (tungstenMemoryMode == MemoryMode.ON_HEAP) {
return offsetInPage;
} else {
// In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we
@@ -351,11 +367,19 @@ public class TaskMemoryManager {
}
consumers.clear();
}
+
+ for (MemoryBlock page : pageTable) {
+ if (page != null) {
+ memoryManager.tungstenMemoryAllocator().free(page);
+ }
+ }
+ Arrays.fill(pageTable, null);
+
return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId);
}
/**
- * Returns the memory consumption, in bytes, for the current task
+ * Returns the memory consumption, in bytes, for the current task.
*/
public long getMemoryConsumptionForThisTask() {
return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId);
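The following sketch (not from this patch) shows how a caller would use the new mode-aware acquire/release methods introduced in the hunks above; everything except the TaskMemoryManager methods and MemoryMode values shown in this diff is hypothetical.

import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

// Request `bytes` of on-heap execution memory for `consumer`; the manager may spill
// other consumers (or `consumer` itself) and may grant fewer bytes than requested.
def reserveOnHeap(tmm: TaskMemoryManager, consumer: MemoryConsumer, bytes: Long): Long = {
  val granted = tmm.acquireExecutionMemory(bytes, MemoryMode.ON_HEAP, consumer)
  if (granted < bytes) {
    // Give back a partial grant we cannot use; the release must name the same MemoryMode
    // so the bookkeeping is charged against the correct pool.
    tmm.releaseExecutionMemory(granted, MemoryMode.ON_HEAP, consumer)
    0L
  } else {
    granted
  }
}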
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 23ae9360f6..4474a83bed 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -341,7 +341,7 @@ object SparkEnv extends Logging {
if (useLegacyMemoryManager) {
new StaticMemoryManager(conf, numUsableCores)
} else {
- new UnifiedMemoryManager(conf, numUsableCores)
+ UnifiedMemoryManager(conf, numUsableCores)
}
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
new file mode 100644
index 0000000000..7825bae425
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+
+import org.apache.spark.Logging
+
+/**
+ * Implements policies and bookkeeping for sharing an adjustable-sized pool of memory between tasks.
+ *
+ * Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up
+ * to a large amount first and then causing others to spill to disk repeatedly.
+ *
+ * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
+ * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
+ * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this
+ * set changes. This is all done by synchronizing access to mutable state and using wait() and
+ * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
+ * tasks was performed by the ShuffleMemoryManager.
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ * @param poolName a human-readable name for this pool, for use in log messages
+ */
+class ExecutionMemoryPool(
+ lock: Object,
+ poolName: String
+ ) extends MemoryPool(lock) with Logging {
+
+ /**
+ * Map from taskAttemptId -> memory consumption in bytes
+ */
+ @GuardedBy("lock")
+ private val memoryForTask = new mutable.HashMap[Long, Long]()
+
+ override def memoryUsed: Long = lock.synchronized {
+ memoryForTask.values.sum
+ }
+
+ /**
+ * Returns the memory consumption, in bytes, for the given task.
+ */
+ def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized {
+ memoryForTask.getOrElse(taskAttemptId, 0L)
+ }
+
+ /**
+ * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes
+ * obtained, or 0 if none can be allocated.
+ *
+ * This call may block until there is enough free memory in some situations, to make sure each
+ * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
+   * active tasks) before it is forced to spill. This can happen if the number of tasks increases
+ * but an older task had a lot of memory already.
+ *
+ * @return the number of bytes granted to the task.
+ */
+ def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized {
+ assert(numBytes > 0, s"invalid number of bytes requested: $numBytes")
+
+ // Add this task to the taskMemory map just so we can keep an accurate count of the number
+ // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory`
+ if (!memoryForTask.contains(taskAttemptId)) {
+ memoryForTask(taskAttemptId) = 0L
+ // This will later cause waiting tasks to wake up and check numTasks again
+ lock.notifyAll()
+ }
+
+ // Keep looping until we're either sure that we don't want to grant this request (because this
+ // task would have more than 1 / numActiveTasks of the memory) or we have enough free
+ // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
+ // TODO: simplify this to limit each task to its own slot
+ while (true) {
+ val numActiveTasks = memoryForTask.keys.size
+ val curMem = memoryForTask(taskAttemptId)
+
+ // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
+ // don't let it be negative
+ val maxToGrant =
+ math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem))
+ // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
+ val toGrant = math.min(maxToGrant, memoryFree)
+
+ if (curMem < poolSize / (2 * numActiveTasks)) {
+ // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
+ // if we can't give it this much now, wait for other tasks to free up memory
+ // (this happens if older tasks allocated lots of memory before N grew)
+ if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) {
+ memoryForTask(taskAttemptId) += toGrant
+ return toGrant
+ } else {
+ logInfo(
+ s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
+ lock.wait()
+ }
+ } else {
+ memoryForTask(taskAttemptId) += toGrant
+ return toGrant
+ }
+ }
+ 0L // Never reached
+ }
+
+ /**
+ * Release `numBytes` of memory acquired by the given task.
+ */
+ def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
+ val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
+ var memoryToFree = if (curMem < numBytes) {
+ logWarning(
+ s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " +
+ s"of memory from the $poolName pool")
+ curMem
+ } else {
+ numBytes
+ }
+ if (memoryForTask.contains(taskAttemptId)) {
+ memoryForTask(taskAttemptId) -= memoryToFree
+ if (memoryForTask(taskAttemptId) <= 0) {
+ memoryForTask.remove(taskAttemptId)
+ }
+ }
+ lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed
+ }
+
+ /**
+ * Release all memory for the given task and mark it as inactive (e.g. when a task ends).
+ * @return the number of bytes freed.
+ */
+ def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized {
+ val numBytesToFree = getMemoryUsageForTask(taskAttemptId)
+ releaseMemory(numBytesToFree, taskAttemptId)
+ numBytesToFree
+ }
+
+}
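A small worked example (illustrative numbers only) of the 1 / 2N and 1 / N bounds that acquireMemory enforces:

// With a fixed pool size, each active task is guaranteed at least poolSize / (2 * N)
// before it can be forced to spill, and is never granted more than poolSize / N in total.
val poolSize = 1000L
for (numActiveTasks <- Seq(1, 2, 4, 8)) {
  val minGuaranteed = poolSize / (2 * numActiveTasks) // below this, the task waits rather than spills
  val maxGrant = poolSize / numActiveTasks            // per-task cap enforced via maxToGrant
  println(s"N=$numActiveTasks: guaranteed >= $minGuaranteed bytes, capped at $maxGrant bytes")
}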
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index b0cf2696a3..ceb8ea434e 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -20,12 +20,8 @@ package org.apache.spark.memory
import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-import com.google.common.annotations.VisibleForTesting
-
-import org.apache.spark.util.Utils
-import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging}
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
import org.apache.spark.unsafe.array.ByteArrayMethods
import org.apache.spark.unsafe.memory.MemoryAllocator
@@ -36,53 +32,40 @@ import org.apache.spark.unsafe.memory.MemoryAllocator
* In this context, execution memory refers to that used for computation in shuffles, joins,
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one MemoryManager per JVM.
- *
- * The MemoryManager abstract base class itself implements policies for sharing execution memory
- * between tasks; it tries to ensure that each task gets a reasonable share of memory, instead of
- * some task ramping up to a large amount first and then causing others to spill to disk repeatedly.
- * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
- * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
- * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
- * this set changes. This is all done by synchronizing access to mutable state and using wait() and
- * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across
- * tasks was performed by the ShuffleMemoryManager.
*/
-private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) extends Logging {
+private[spark] abstract class MemoryManager(
+ conf: SparkConf,
+ numCores: Int,
+ storageMemory: Long,
+ onHeapExecutionMemory: Long) extends Logging {
// -- Methods related to memory allocation policies and bookkeeping ------------------------------
- // The memory store used to evict cached blocks
- private var _memoryStore: MemoryStore = _
- protected def memoryStore: MemoryStore = {
- if (_memoryStore == null) {
- throw new IllegalArgumentException("memory store not initialized yet")
- }
- _memoryStore
- }
+ @GuardedBy("this")
+ protected val storageMemoryPool = new StorageMemoryPool(this)
+ @GuardedBy("this")
+ protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution")
+ @GuardedBy("this")
+ protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution")
- // Amount of execution/storage memory in use, accesses must be synchronized on `this`
- @GuardedBy("this") protected var _executionMemoryUsed: Long = 0
- @GuardedBy("this") protected var _storageMemoryUsed: Long = 0
- // Map from taskAttemptId -> memory consumption in bytes
- @GuardedBy("this") private val executionMemoryForTask = new mutable.HashMap[Long, Long]()
-
- /**
- * Set the [[MemoryStore]] used by this manager to evict cached blocks.
- * This must be set after construction due to initialization ordering constraints.
- */
- final def setMemoryStore(store: MemoryStore): Unit = {
- _memoryStore = store
- }
+ storageMemoryPool.incrementPoolSize(storageMemory)
+ onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory)
+ offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0))
/**
- * Total available memory for execution, in bytes.
+ * Total available memory for storage, in bytes. This amount can vary over time, depending on
+ * the MemoryManager implementation.
+ * In this model, this is equivalent to the amount of memory not occupied by execution.
*/
- def maxExecutionMemory: Long
+ def maxStorageMemory: Long
/**
- * Total available memory for storage, in bytes.
+ * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+ * This must be set after construction due to initialization ordering constraints.
*/
- def maxStorageMemory: Long
+ final def setMemoryStore(store: MemoryStore): Unit = synchronized {
+ storageMemoryPool.setMemoryStore(store)
+ }
// TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
@@ -94,7 +77,9 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
+ }
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
@@ -109,103 +94,25 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes, evictedBlocks)
- }
-
- /**
- * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return number of bytes successfully granted (<= N).
- */
- @VisibleForTesting
- private[memory] def doAcquireExecutionMemory(
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
/**
- * Try to acquire up to `numBytes` of execution memory for the current task and return the number
- * of bytes obtained, or 0 if none can be allocated.
+ * Try to acquire up to `numBytes` of execution memory for the current task and return the
+ * number of bytes obtained, or 0 if none can be allocated.
*
* This call may block until there is enough free memory in some situations, to make sure each
* task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
   * active tasks) before it is forced to spill. This can happen if the number of tasks increases
* but an older task had a lot of memory already.
- *
- * Subclasses should override `doAcquireExecutionMemory` in order to customize the policies
- * that control global sharing of memory between execution and storage.
*/
private[memory]
- final def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized {
- assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
-
- // Add this task to the taskMemory map just so we can keep an accurate count of the number
- // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
- if (!executionMemoryForTask.contains(taskAttemptId)) {
- executionMemoryForTask(taskAttemptId) = 0L
- // This will later cause waiting tasks to wake up and check numTasks again
- notifyAll()
- }
-
- // Once the cross-task memory allocation policy has decided to grant more memory to a task,
- // this method is called in order to actually obtain that execution memory, potentially
- // triggering eviction of storage memory:
- def acquire(toGrant: Long): Long = synchronized {
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
- val acquired = doAcquireExecutionMemory(toGrant, evictedBlocks)
- // Register evicted blocks, if any, with the active task metrics
- Option(TaskContext.get()).foreach { tc =>
- val metrics = tc.taskMetrics()
- val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
- metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
- }
- executionMemoryForTask(taskAttemptId) += acquired
- acquired
- }
-
- // Keep looping until we're either sure that we don't want to grant this request (because this
- // task would have more than 1 / numActiveTasks of the memory) or we have enough free
- // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
- // TODO: simplify this to limit each task to its own slot
- while (true) {
- val numActiveTasks = executionMemoryForTask.keys.size
- val curMem = executionMemoryForTask(taskAttemptId)
- val freeMemory = maxExecutionMemory - executionMemoryForTask.values.sum
-
- // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
- // don't let it be negative
- val maxToGrant =
- math.min(numBytes, math.max(0, (maxExecutionMemory / numActiveTasks) - curMem))
- // Only give it as much memory as is free, which might be none if it reached 1 / numTasks
- val toGrant = math.min(maxToGrant, freeMemory)
-
- if (curMem < maxExecutionMemory / (2 * numActiveTasks)) {
- // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
- // if we can't give it this much now, wait for other tasks to free up memory
- // (this happens if older tasks allocated lots of memory before N grew)
- if (
- freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * numActiveTasks) - curMem)) {
- return acquire(toGrant)
- } else {
- logInfo(
- s"TID $taskAttemptId waiting for at least 1/2N of execution memory pool to be free")
- wait()
- }
- } else {
- return acquire(toGrant)
- }
- }
- 0L // Never reached
- }
-
- @VisibleForTesting
- private[memory] def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _executionMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of execution " +
- s"memory when we only have ${_executionMemoryUsed} bytes")
- _executionMemoryUsed = 0
- } else {
- _executionMemoryUsed -= numBytes
+ def acquireExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
}
}
@@ -213,24 +120,14 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* Release numBytes of execution memory belonging to the given task.
*/
private[memory]
- final def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized {
- val curMem = executionMemoryForTask.getOrElse(taskAttemptId, 0L)
- if (curMem < numBytes) {
- if (Utils.isTesting) {
- throw new SparkException(
- s"Internal error: release called on $numBytes bytes but task only has $curMem")
- } else {
- logWarning(s"Internal error: release called on $numBytes bytes but task only has $curMem")
- }
- }
- if (executionMemoryForTask.contains(taskAttemptId)) {
- executionMemoryForTask(taskAttemptId) -= numBytes
- if (executionMemoryForTask(taskAttemptId) <= 0) {
- executionMemoryForTask.remove(taskAttemptId)
- }
- releaseExecutionMemory(numBytes)
+ def releaseExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Unit = synchronized {
+ memoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId)
}
- notifyAll() // Notify waiters in acquireExecutionMemory() that memory has been freed
}
/**
@@ -238,35 +135,28 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* @return the number of bytes freed.
*/
private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized {
- val numBytesToFree = getExecutionMemoryUsageForTask(taskAttemptId)
- releaseExecutionMemory(numBytesToFree, taskAttemptId)
- numBytesToFree
+ onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) +
+ offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId)
}
/**
* Release N bytes of storage memory.
*/
def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _storageMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of storage " +
- s"memory when we only have ${_storageMemoryUsed} bytes")
- _storageMemoryUsed = 0
- } else {
- _storageMemoryUsed -= numBytes
- }
+ storageMemoryPool.releaseMemory(numBytes)
}
/**
* Release all storage memory acquired.
*/
- def releaseAllStorageMemory(): Unit = synchronized {
- _storageMemoryUsed = 0
+ final def releaseAllStorageMemory(): Unit = synchronized {
+ storageMemoryPool.releaseAllMemory()
}
/**
* Release N bytes of unroll memory.
*/
- def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+ final def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
releaseStorageMemory(numBytes)
}
@@ -274,26 +164,35 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
* Execution memory currently in use, in bytes.
*/
final def executionMemoryUsed: Long = synchronized {
- _executionMemoryUsed
+ onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed
}
/**
* Storage memory currently in use, in bytes.
*/
final def storageMemoryUsed: Long = synchronized {
- _storageMemoryUsed
+ storageMemoryPool.memoryUsed
}
/**
* Returns the execution memory consumption, in bytes, for the given task.
*/
private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized {
- executionMemoryForTask.getOrElse(taskAttemptId, 0L)
+ onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) +
+ offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId)
}
// -- Fields related to Tungsten managed memory -------------------------------------------------
/**
+ * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
+ * sun.misc.Unsafe.
+ */
+ final val tungstenMemoryMode: MemoryMode = {
+ if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
+ }
+
+ /**
* The default page size, in bytes.
*
   * If the user didn't explicitly set "spark.buffer.pageSize", we figure out the default value
@@ -306,21 +205,22 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte
val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors()
// Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case
val safetyFactor = 16
- val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / safetyFactor)
+ val maxTungstenMemory: Long = tungstenMemoryMode match {
+ case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize
+ case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize
+ }
+ val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
val default = math.min(maxPageSize, math.max(minPageSize, size))
conf.getSizeAsBytes("spark.buffer.pageSize", default)
}
/**
- * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using
- * sun.misc.Unsafe.
- */
- final val tungstenMemoryIsAllocatedInHeap: Boolean =
- !conf.getBoolean("spark.unsafe.offHeap", false)
-
- /**
* Allocates memory for use by Unsafe/Tungsten code.
*/
- private[memory] final val tungstenMemoryAllocator: MemoryAllocator =
- if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else MemoryAllocator.UNSAFE
+ private[memory] final val tungstenMemoryAllocator: MemoryAllocator = {
+ tungstenMemoryMode match {
+ case MemoryMode.ON_HEAP => MemoryAllocator.HEAP
+ case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE
+ }
+ }
}
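A sketch of the default Tungsten page-size calculation described above. The 1 MB / 64 MB bounds are assumptions (they come from surrounding code not shown in this hunk), and nextPowerOf2 mirrors ByteArrayMethods.nextPowerOf2.

def nextPowerOf2(num: Long): Long = {
  val highBit = java.lang.Long.highestOneBit(num)
  if (highBit == num) num else highBit << 1
}

def defaultPageSize(maxTungstenMemory: Long, cores: Int): Long = {
  val minPageSize = 1L * 1024 * 1024        // assumed 1 MB lower bound
  val maxPageSize = 64L * minPageSize       // assumed 64 MB upper bound
  val safetyFactor = 16                     // allows for rounding up to the next power of 2
  val size = nextPowerOf2(maxTungstenMemory / cores / safetyFactor)
  math.min(maxPageSize, math.max(minPageSize, size))
}

// Example: 4 GB of Tungsten memory across 8 cores gives 4 GB / 8 / 16 = 32 MB,
// already a power of two and within the [1 MB, 64 MB] bounds, so pages default to 32 MB.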
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
new file mode 100644
index 0000000000..bfeec47e38
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+/**
+ * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to
+ * the [[MemoryManager]]. See subclasses for more details.
+ *
+ * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type
+ * to `Object` to avoid programming errors, since this object should only be used for
+ * synchronization purposes.
+ */
+abstract class MemoryPool(lock: Object) {
+
+ @GuardedBy("lock")
+ private[this] var _poolSize: Long = 0
+
+ /**
+ * Returns the current size of the pool, in bytes.
+ */
+ final def poolSize: Long = lock.synchronized {
+ _poolSize
+ }
+
+ /**
+ * Returns the amount of free memory in the pool, in bytes.
+ */
+ final def memoryFree: Long = lock.synchronized {
+ _poolSize - memoryUsed
+ }
+
+ /**
+ * Expands the pool by `delta` bytes.
+ */
+ final def incrementPoolSize(delta: Long): Unit = lock.synchronized {
+ require(delta >= 0)
+ _poolSize += delta
+ }
+
+ /**
+ * Shrinks the pool by `delta` bytes.
+ */
+ final def decrementPoolSize(delta: Long): Unit = lock.synchronized {
+ require(delta >= 0)
+ require(delta <= _poolSize)
+ require(_poolSize - delta >= memoryUsed)
+ _poolSize -= delta
+ }
+
+ /**
+ * Returns the amount of used memory in this pool (in bytes).
+ */
+ def memoryUsed: Long
+}
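A minimal, hypothetical concrete pool built on the bookkeeping above (in the real code the lock is the owning MemoryManager and the subclasses are ExecutionMemoryPool and StorageMemoryPool; the sketch assumes it lives in org.apache.spark.memory so it can extend MemoryPool):

class SimplePool(lock: Object) extends MemoryPool(lock) {
  private var _used = 0L
  override def memoryUsed: Long = lock.synchronized { _used }

  // Grant at most what is currently free; never exceed the pool's capacity.
  def acquire(bytes: Long): Long = lock.synchronized {
    val granted = math.min(bytes, memoryFree)
    _used += granted
    granted
  }
}

val pool = new SimplePool(new Object)
pool.incrementPoolSize(100)                 // grow capacity to 100 bytes
assert(pool.acquire(60) == 60)              // 60 bytes granted, 40 remain free
pool.decrementPoolSize(40)                  // legal: remaining capacity still covers memoryUsed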
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 9c2c2e90a2..12a0943068 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
-
/**
* A [[MemoryManager]] that statically partitions the heap space into disjoint regions.
*
@@ -32,10 +31,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus}
*/
private[spark] class StaticMemoryManager(
conf: SparkConf,
- override val maxExecutionMemory: Long,
+ maxOnHeapExecutionMemory: Long,
override val maxStorageMemory: Long,
numCores: Int)
- extends MemoryManager(conf, numCores) {
+ extends MemoryManager(
+ conf,
+ numCores,
+ maxStorageMemory,
+ maxOnHeapExecutionMemory) {
def this(conf: SparkConf, numCores: Int) {
this(
@@ -50,76 +53,15 @@ private[spark] class StaticMemoryManager(
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
}
- /**
- * Acquire N bytes of memory for execution.
- * @return number of bytes successfully granted (<= N).
- */
- override def doAcquireExecutionMemory(
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
- assert(numBytes >= 0)
- assert(_executionMemoryUsed <= maxExecutionMemory)
- val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
- _executionMemoryUsed += bytesToGrant
- bytesToGrant
- }
-
- /**
- * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return whether all N bytes were successfully granted.
- */
- override def acquireStorageMemory(
- blockId: BlockId,
- numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
- }
-
- /**
- * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
- *
- * This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage
- * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any,
- * are added to `evictedBlocks`.
- *
- * @return whether all N bytes were successfully granted.
- */
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- val currentUnrollMemory = memoryStore.currentUnrollMemory
+ val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
- acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
+ storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks)
}
-
- /**
- * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
- *
- * @param blockId the ID of the block we are acquiring storage memory for
- * @param numBytesToAcquire the size of this block
- * @param numBytesToFree the size of space to be freed through evicting blocks
- * @param evictedBlocks a holder for blocks evicted in the process
- * @return whether all N bytes were successfully granted.
- */
- private def acquireStorageMemory(
- blockId: BlockId,
- numBytesToAcquire: Long,
- numBytesToFree: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
- assert(numBytesToAcquire >= 0)
- assert(numBytesToFree >= 0)
- memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- assert(_storageMemoryUsed <= maxStorageMemory)
- val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
- if (enoughMemory) {
- _storageMemoryUsed += numBytesToAcquire
- }
- enoughMemory
- }
-
}
@@ -135,7 +77,6 @@ private[spark] object StaticMemoryManager {
(systemMaxMemory * memoryFraction * safetyFraction).toLong
}
-
/**
* Return the total amount of memory available for the execution region, in bytes.
*/
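A worked example of the static sizing used by StaticMemoryManager, with the legacy default fractions assumed (spark.storage.memoryFraction = 0.6 with safety 0.9, spark.shuffle.memoryFraction = 0.2 with safety 0.8):

val systemMaxMemory = 1L << 30                                       // a 1 GiB executor heap
val maxStorageMemory = (systemMaxMemory * 0.6 * 0.9).toLong          // 579820584 bytes, ~553 MiB
val maxOnHeapExecutionMemory = (systemMaxMemory * 0.2 * 0.8).toLong  // 171798691 bytes, ~164 MiB
// The remainder (~307 MiB here) is left as headroom for other JVM allocations.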
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
new file mode 100644
index 0000000000..6a322eabf8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import javax.annotation.concurrent.GuardedBy
+
+import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.{TaskContext, Logging}
+import org.apache.spark.storage.{MemoryStore, BlockStatus, BlockId}
+
+/**
+ * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage
+ * (caching).
+ *
+ * @param lock a [[MemoryManager]] instance to synchronize on
+ */
+class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging {
+
+ @GuardedBy("lock")
+ private[this] var _memoryUsed: Long = 0L
+
+ override def memoryUsed: Long = lock.synchronized {
+ _memoryUsed
+ }
+
+ private var _memoryStore: MemoryStore = _
+ def memoryStore: MemoryStore = {
+ if (_memoryStore == null) {
+ throw new IllegalStateException("memory store not initialized yet")
+ }
+ _memoryStore
+ }
+
+ /**
+ * Set the [[MemoryStore]] used by this manager to evict cached blocks.
+ * This must be set after construction due to initialization ordering constraints.
+ */
+ final def setMemoryStore(store: MemoryStore): Unit = {
+ _memoryStore = store
+ }
+
+ /**
+ * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return whether all N bytes were successfully granted.
+ */
+ def acquireMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ acquireMemory(blockId, numBytes, numBytes, evictedBlocks)
+ }
+
+ /**
+ * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary.
+ *
+ * @param blockId the ID of the block we are acquiring storage memory for
+ * @param numBytesToAcquire the size of this block
+ * @param numBytesToFree the size of space to be freed through evicting blocks
+ * @return whether all N bytes were successfully granted.
+ */
+ def acquireMemory(
+ blockId: BlockId,
+ numBytesToAcquire: Long,
+ numBytesToFree: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized {
+ assert(numBytesToAcquire >= 0)
+ assert(numBytesToFree >= 0)
+ assert(memoryUsed <= poolSize)
+ memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
+ // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
+    // back into this StorageMemoryPool in order to free memory. Therefore, these variables should have
+ // been updated.
+ val enoughMemory = numBytesToAcquire <= memoryFree
+ if (enoughMemory) {
+ _memoryUsed += numBytesToAcquire
+ }
+ enoughMemory
+ }
+
+ def releaseMemory(size: Long): Unit = lock.synchronized {
+ if (size > _memoryUsed) {
+ logWarning(s"Attempted to release $size bytes of storage " +
+ s"memory when we only have ${_memoryUsed} bytes")
+ _memoryUsed = 0
+ } else {
+ _memoryUsed -= size
+ }
+ }
+
+ def releaseAllMemory(): Unit = lock.synchronized {
+ _memoryUsed = 0
+ }
+
+ /**
+ * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number
+ * of bytes removed from the pool's capacity.
+ */
+ def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized {
+ // First, shrink the pool by reclaiming free memory:
+ val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree)
+ decrementPoolSize(spaceFreedByReleasingUnusedMemory)
+ if (spaceFreedByReleasingUnusedMemory == spaceToFree) {
+ spaceFreedByReleasingUnusedMemory
+ } else {
+ // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
+ val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks)
+ val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum
+ _memoryUsed -= spaceFreedByEviction
+ decrementPoolSize(spaceFreedByEviction)
+ spaceFreedByReleasingUnusedMemory + spaceFreedByEviction
+ }
+ }
+}
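A worked example (illustrative numbers) of the two-step shrink policy above, assuming eviction is able to free the full remaining amount:

val poolCapacity = 100L                 // current storage pool size
val memoryUsed = 70L                    // bytes held by cached blocks
val spaceToFree = 50L                   // capacity the execution pool wants back
val freedFromUnused = math.min(spaceToFree, poolCapacity - memoryUsed) // 30 bytes, no eviction needed
val freedByEviction = spaceToFree - freedFromUnused                    // 20 bytes of blocks evicted
val capacityReturned = freedFromUnused + freedByEviction               // 50 bytes handed to execution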
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
index a3093030a0..8be5b05419 100644
--- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -22,7 +22,6 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockStatus, BlockId}
-
/**
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
* either side can borrow memory from the other.
@@ -41,98 +40,105 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
* The implication is that attempts to cache blocks may fail if execution has already eaten
* up most of the storage space, in which case the new blocks will be evicted immediately
* according to their respective storage levels.
+ *
+ * @param storageRegionSize Size of the storage region, in bytes.
+ * This region is not statically reserved; execution can borrow from
+ * it if necessary. Cached blocks can be evicted only if actual
+ * storage memory usage exceeds this region.
*/
-private[spark] class UnifiedMemoryManager(
+private[spark] class UnifiedMemoryManager private[memory] (
conf: SparkConf,
maxMemory: Long,
+ private val storageRegionSize: Long,
numCores: Int)
- extends MemoryManager(conf, numCores) {
-
- def this(conf: SparkConf, numCores: Int) {
- this(conf, UnifiedMemoryManager.getMaxMemory(conf), numCores)
- }
-
- /**
- * Size of the storage region, in bytes.
- *
- * This region is not statically reserved; execution can borrow from it if necessary.
- * Cached blocks can be evicted only if actual storage memory usage exceeds this region.
- */
- private val storageRegionSize: Long = {
- (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
- }
-
- /**
- * Total amount of memory, in bytes, not currently occupied by either execution or storage.
- */
- private def totalFreeMemory: Long = synchronized {
- assert(_executionMemoryUsed <= maxMemory)
- assert(_storageMemoryUsed <= maxMemory)
- assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
- maxMemory - _executionMemoryUsed - _storageMemoryUsed
- }
+ extends MemoryManager(
+ conf,
+ numCores,
+ storageRegionSize,
+ maxMemory - storageRegionSize) {
- /**
- * Total available memory for execution, in bytes.
- * In this model, this is equivalent to the amount of memory not occupied by storage.
- */
- override def maxExecutionMemory: Long = synchronized {
- maxMemory - _storageMemoryUsed
- }
+ // We always maintain this invariant:
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
- /**
- * Total available memory for storage, in bytes.
- * In this model, this is equivalent to the amount of memory not occupied by execution.
- */
override def maxStorageMemory: Long = synchronized {
- maxMemory - _executionMemoryUsed
+ maxMemory - onHeapExecutionMemoryPool.memoryUsed
}
/**
- * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ * Try to acquire up to `numBytes` of execution memory for the current task and return the
+ * number of bytes obtained, or 0 if none can be allocated.
*
- * This method evicts blocks only up to the amount of memory borrowed by storage.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return number of bytes successfully granted (<= N).
+ * This call may block until there is enough free memory in some situations, to make sure each
+ * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of
+   * active tasks) before it is forced to spill. This can happen if the number of tasks increases
+ * but an older task had a lot of memory already.
*/
- private[memory] override def doAcquireExecutionMemory(
+ override private[memory] def acquireExecutionMemory(
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = synchronized {
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
- val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize)
- // If there is not enough free memory AND storage has borrowed some execution memory,
- // then evict as much memory borrowed by storage as needed to grant this request
- val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0
- if (shouldEvictStorage) {
- val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
- memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+ memoryMode match {
+ case MemoryMode.ON_HEAP =>
+ if (numBytes > onHeapExecutionMemoryPool.memoryFree) {
+ val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree
+ // There is not enough free memory in the execution pool, so try to reclaim memory from
+ // storage. We can reclaim any free memory from the storage pool. If the storage pool
+ // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim
+ // the memory that storage has borrowed from execution.
+ val memoryReclaimableFromStorage =
+ math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize)
+ if (memoryReclaimableFromStorage > 0) {
+ // Only reclaim as much space as is necessary and available:
+ val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace(
+ math.min(extraMemoryNeeded, memoryReclaimableFromStorage))
+ onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed)
+ }
+ }
+ onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId)
+ case MemoryMode.OFF_HEAP =>
+ // For now, we only support on-heap caching of data, so we do not need to interact with
+ // the storage pool when allocating off-heap memory. This will change in the future, though.
+ super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode)
}
- val bytesToGrant = math.min(numBytes, totalFreeMemory)
- _executionMemoryUsed += bytesToGrant
- bytesToGrant
}
- /**
- * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
- * Blocks evicted in the process, if any, are added to `evictedBlocks`.
- * @return whether all N bytes were successfully granted.
- */
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory)
assert(numBytes >= 0)
- memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
- val enoughMemory = totalFreeMemory >= numBytes
- if (enoughMemory) {
- _storageMemoryUsed += numBytes
+ if (numBytes > storageMemoryPool.memoryFree) {
+ // There is not enough free memory in the storage pool, so try to borrow free memory from
+ // the execution pool.
+ val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes)
+ onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution)
+ storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution)
}
- enoughMemory
+ storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks)
}
+ override def acquireUnrollMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, evictedBlocks)
+ }
}
-private object UnifiedMemoryManager {
+object UnifiedMemoryManager {
+
+ def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
+ val maxMemory = getMaxMemory(conf)
+ new UnifiedMemoryManager(
+ conf,
+ maxMemory = maxMemory,
+ storageRegionSize =
+ (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong,
+ numCores = numCores)
+ }
/**
* Return the total amount of memory shared between execution and storage, in bytes.
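A worked example (illustrative numbers) of the borrowing rules above: with maxMemory = 1000 and storageRegionSize = 500, storage may grow past 500 by borrowing free execution memory, while execution can evict cached blocks only to reclaim what storage borrowed beyond its region.

val maxMemory = 1000L
val storageRegionSize = 500L
val storagePoolSize = 800L                          // storage has borrowed 300 bytes
val storageMemoryFree = 50L                         // unused capacity inside the storage pool
val executionPoolSize = maxMemory - storagePoolSize // 200 bytes (the invariant asserted above)
// Execution may reclaim free storage memory plus anything storage borrowed beyond its region:
val memoryReclaimableFromStorage =
  math.max(storageMemoryFree, storagePoolSize - storageRegionSize)     // max(50, 300) = 300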
diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala
new file mode 100644
index 0000000000..564e30d2ff
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/package.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+/**
+ * This package implements Spark's memory management system. This system consists of two main
+ * components, a JVM-wide memory manager and a per-task manager:
+ *
+ * - [[org.apache.spark.memory.MemoryManager]] manages Spark's overall memory usage within a JVM.
+ * This component implements the policies for dividing the available memory across tasks and for
+ * allocating memory between storage (memory used for caching and data transfer) and execution (memory
+ * used by computations, such as shuffles, joins, sorts, and aggregations).
+ * - [[org.apache.spark.memory.TaskMemoryManager]] manages the memory allocated by individual tasks.
+ * Tasks interact with TaskMemoryManager and never directly interact with the JVM-wide
+ * MemoryManager.
+ *
+ * Internally, each of these components has additional abstractions for memory bookkeeping:
+ *
+ * - [[org.apache.spark.memory.MemoryConsumer]]s are clients of the TaskMemoryManager and
+ * correspond to individual operators and data structures within a task. The TaskMemoryManager
+ * receives memory allocation requests from MemoryConsumers and issues callbacks to consumers
+ * in order to trigger spilling when running low on memory.
+ * - [[org.apache.spark.memory.MemoryPool]]s are a bookkeeping abstraction used by the
+ * MemoryManager to track the division of memory between storage and execution.
+ *
+ * Diagrammatically:
+ *
+ * {{{
+ *       +-------------+
+ *       | MemConsumer |----+                                   +------------------------+
+ *       +-------------+    |    +-------------------+          |     MemoryManager      |
+ *                          +--->| TaskMemoryManager |----+     |                        |
+ *       +-------------+    |    +-------------------+    |     |  +------------------+  |
+ *       | MemConsumer |----+                             |     |  | StorageMemPool   |  |
+ *       +-------------+         +-------------------+    |     |  +------------------+  |
+ *                               | TaskMemoryManager |----+     |                        |
+ *                               +-------------------+    |     |  +------------------+  |
+ *                                                        +---->|  |OnHeapExecMemPool |  |
+ *                                         *              |     |  +------------------+  |
+ *                                         *              |     |                        |
+ *       +-------------+                   *              |     |  +------------------+  |
+ *       | MemConsumer |----+                             |     |  |OffHeapExecMemPool|  |
+ *       +-------------+    |    +-------------------+    |     |  +------------------+  |
+ *                          +--->| TaskMemoryManager |----+     |                        |
+ *                               +-------------------+          +------------------------+
+ * }}}
+ *
+ *
+ * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how
+ * they handle the sizing of their memory pools:
+ *
+ * - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft
+ * boundaries between storage and execution memory, allowing requests for memory in one region
+ * to be fulfilled by borrowing memory from the other.
+ * - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage
+ * and execution memory by statically partitioning Spark's memory and preventing storage and
+ * execution from borrowing memory from each other. This mode is retained only for legacy
+ * compatibility purposes.
+ */
+package object memory
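To make the wiring described in the package doc concrete, here is a small illustrative sketch, not part of the patch: it assumes it lives in the org.apache.spark.memory package (as the suites below do, since UnifiedMemoryManager is not public API), and the config values are arbitrary test-sized numbers matching those used in UnifiedMemoryManagerSuite.

{{{
package org.apache.spark.memory

import org.apache.spark.SparkConf

object MemoryWiringSketch {
  def main(args: Array[String]): Unit = {
    // JVM-wide manager (normally constructed by SparkEnv).
    val conf = new SparkConf()
      .set("spark.memory.fraction", "1")
      .set("spark.testing.memory", "1000")
      .set("spark.memory.storageFraction", "0.5")
    val memoryManager = UnifiedMemoryManager(conf, numCores = 1)

    // Per-task manager: consumers and operators talk to this, never to the
    // MemoryManager directly.
    val taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
    val granted = taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null)
    taskMemoryManager.releaseExecutionMemory(granted, MemoryMode.ON_HEAP, null)
  }
}
}}}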
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index 9e002621a6..3a48af82b1 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -17,7 +17,7 @@
package org.apache.spark.util.collection
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
import org.apache.spark.{Logging, SparkEnv}
/**
@@ -78,7 +78,8 @@ private[spark] trait Spillable[C] extends Logging {
if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) {
// Claim up to double our current memory from the shuffle memory pool
val amountToRequest = 2 * currentMemory - myMemoryThreshold
- val granted = taskMemoryManager.acquireExecutionMemory(amountToRequest, null)
+ val granted =
+ taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null)
myMemoryThreshold += granted
// If we were granted too little memory to grow further (either tryToAcquire returned 0,
// or we already had more memory than myMemoryThreshold), spill the current collection
@@ -107,7 +108,8 @@ private[spark] trait Spillable[C] extends Logging {
*/
def releaseMemory(): Unit = {
// The amount we requested does not include the initial memory tracking threshold
- taskMemoryManager.releaseExecutionMemory(myMemoryThreshold - initialMemoryThreshold, null)
+ taskMemoryManager.releaseExecutionMemory(
+ myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null)
myMemoryThreshold = initialMemoryThreshold
}
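The Spillable change above only threads MemoryMode.ON_HEAP through the existing acquire and release calls; the growth heuristic itself is unchanged. For reference, an illustrative sketch of that heuristic (not part of the patch) with the acquire call abstracted into a parameter:

{{{
// tryAcquire stands in for taskMemoryManager.acquireExecutionMemory(_, MemoryMode.ON_HEAP, null).
// Returns the new memory threshold and whether the caller should spill its collection.
def maybeGrowOrSpill(
    currentMemory: Long,
    myMemoryThreshold: Long,
    tryAcquire: Long => Long): (Long, Boolean) = {
  if (currentMemory >= myMemoryThreshold) {
    // Claim up to double the current footprint from the execution memory pool.
    val amountToRequest = 2 * currentMemory - myMemoryThreshold
    val granted = tryAcquire(amountToRequest)
    val newThreshold = myMemoryThreshold + granted
    // If the grant was too small to cover what is already held, it is time to spill.
    (newThreshold, currentMemory >= newThreshold)
  } else {
    (myMemoryThreshold, false)
  }
}
}}}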
diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
index c731317395..711eed0193 100644
--- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
+++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java
@@ -28,8 +28,14 @@ public class TaskMemoryManagerSuite {
@Test
public void leakedPageMemoryIsDetected() {
final TaskMemoryManager manager = new TaskMemoryManager(
- new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0);
+ new StaticMemoryManager(
+ new SparkConf().set("spark.unsafe.offHeap", "false"),
+ Long.MAX_VALUE,
+ Long.MAX_VALUE,
+ 1),
+ 0);
manager.allocatePage(4096, null); // leak memory
+ Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask());
Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory());
}
diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
index 8ae3642738..e6e16fff80 100644
--- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
+++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java
@@ -32,13 +32,19 @@ public class TestMemoryConsumer extends MemoryConsumer {
}
void use(long size) {
- long got = taskMemoryManager.acquireExecutionMemory(size, this);
+ long got = taskMemoryManager.acquireExecutionMemory(
+ size,
+ taskMemoryManager.tungstenMemoryMode,
+ this);
used += got;
}
void free(long size) {
used -= size;
- taskMemoryManager.releaseExecutionMemory(size, this);
+ taskMemoryManager.releaseExecutionMemory(
+ size,
+ taskMemoryManager.tungstenMemoryMode,
+ this);
}
}
diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
index 4763395d7d..0e0eca515a 100644
--- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
+++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java
@@ -423,7 +423,7 @@ public class UnsafeShuffleWriterSuite {
memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16);
final UnsafeShuffleWriter<Object, Object> writer = createWriter(false);
final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>();
- for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) {
+ for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) {
dataToWrite.add(new Tuple2<Object, Object>(i, i));
}
writer.write(dataToWrite.iterator());
diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
index 92bd45e5fa..3bca790f30 100644
--- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
+++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java
@@ -83,7 +83,9 @@ public abstract class AbstractBytesToBytesMapSuite {
public void setup() {
memoryManager =
new TestMemoryManager(
- new SparkConf().set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator()));
+ new SparkConf()
+ .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())
+ .set("spark.memory.offHeapSize", "256mb"));
taskMemoryManager = new TaskMemoryManager(memoryManager, 0);
tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test");
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index 4a9479cf49..f55d435fa3 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import java.util.concurrent.atomic.AtomicLong
+import scala.collection.mutable
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}
@@ -29,7 +30,7 @@ import org.mockito.stubbing.Answer
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
-import org.apache.spark.storage.MemoryStore
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel}
/**
@@ -78,7 +79,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
val numBytes = args(numBytesPos).asInstanceOf[Long]
- mockEnsureFreeSpace(mm, numBytes)
+ val success = mockEnsureFreeSpace(mm, numBytes)
+ if (success) {
+ args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append(
+ (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L)))
+ }
+ success
}
}
}
@@ -132,93 +138,95 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
}
/**
- * Create a MemoryManager with the specified execution memory limit and no storage memory.
+ * Create a MemoryManager with the specified execution memory limits and no storage memory.
*/
- protected def createMemoryManager(maxExecutionMemory: Long): MemoryManager
+ protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long = 0L): MemoryManager
// -- Tests of sharing of execution memory between tasks ----------------------------------------
// Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite.
implicit val ec = ExecutionContext.global
- test("single task requesting execution memory") {
+ test("single task requesting on-heap execution memory") {
val manager = createMemoryManager(1000L)
val taskMemoryManager = new TaskMemoryManager(manager, 0)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 100L)
- assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
- assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L)
- assert(taskMemoryManager.acquireExecutionMemory(200L, null) === 100L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L)
+ assert(taskMemoryManager.acquireExecutionMemory(200L, MemoryMode.ON_HEAP, null) === 100L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
- taskMemoryManager.releaseExecutionMemory(500L, null)
- assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 300L)
- assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 200L)
+ taskMemoryManager.releaseExecutionMemory(500L, MemoryMode.ON_HEAP, null)
+ assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 300L)
+ assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 200L)
taskMemoryManager.cleanUpAllAllocatedMemory()
- assert(taskMemoryManager.acquireExecutionMemory(1000L, null) === 1000L)
- assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L)
+ assert(taskMemoryManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) === 1000L)
+ assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L)
}
- test("two tasks requesting full execution memory") {
+ test("two tasks requesting full on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// Have both tasks request 500 bytes, then wait until both requests have been granted:
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 500L)
assert(Await.result(t2Result1, futureTimeout) === 500L)
// Have both tasks each request 500 bytes more; both should immediately return 0 as they are
// both now at 1 / N
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, 200.millis) === 0L)
assert(Await.result(t2Result2, 200.millis) === 0L)
}
- test("two tasks cannot grow past 1 / N of execution memory") {
+ test("two tasks cannot grow past 1 / N of on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// Have both tasks request 250 bytes, then wait until both requests have been granted:
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, null) }
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 250L)
assert(Await.result(t2Result1, futureTimeout) === 250L)
// Have both tasks each request 500 bytes more.
// We should only grant 250 bytes to each of them on this second request
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) }
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, futureTimeout) === 250L)
assert(Await.result(t2Result2, futureTimeout) === 250L)
}
- test("tasks can block to get at least 1 / 2N of execution memory") {
+ test("tasks can block to get at least 1 / 2N of on-heap execution memory") {
val memoryManager = createMemoryManager(1000L)
val t1MemManager = new TaskMemoryManager(memoryManager, 1)
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 1000L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
- t1MemManager.releaseExecutionMemory(250L, null)
+ t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null)
// The memory freed from t1 should now be granted to t2.
assert(Await.result(t2Result1, futureTimeout) === 250L)
// Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory.
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result2, 200.millis) === 0L)
}
@@ -229,18 +237,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
val futureTimeout: Duration = 20.seconds
// t1 grabs 1000 bytes and then waits until t2 is ready to make a request.
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 1000L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
// Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult
// to make sure the other thread blocks for some time otherwise.
Thread.sleep(300)
// t1 releases all of its memory, so t2 should be able to grab all of the memory
t1MemManager.cleanUpAllAllocatedMemory()
assert(Await.result(t2Result1, futureTimeout) === 500L)
- val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result2, futureTimeout) === 500L)
- val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, null) }
+ val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result3, 200.millis) === 0L)
}
@@ -251,15 +259,35 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite {
val t2MemManager = new TaskMemoryManager(memoryManager, 2)
val futureTimeout: Duration = 20.seconds
- val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, null) }
+ val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result1, futureTimeout) === 700L)
- val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, null) }
+ val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t2Result1, futureTimeout) === 300L)
- val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, null) }
+ val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) }
assert(Await.result(t1Result2, 200.millis) === 0L)
}
+
+ test("off-heap execution allocations cannot exceed limit") {
+ val memoryManager = createMemoryManager(
+ maxOnHeapExecutionMemory = 0L,
+ maxOffHeapExecutionMemory = 1000L)
+
+ val tMemManager = new TaskMemoryManager(memoryManager, 1)
+ val result1 = Future { tMemManager.acquireExecutionMemory(1000L, MemoryMode.OFF_HEAP, null) }
+ assert(Await.result(result1, 200.millis) === 1000L)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+
+ val result2 = Future { tMemManager.acquireExecutionMemory(300L, MemoryMode.OFF_HEAP, null) }
+ assert(Await.result(result2, 200.millis) === 0L)
+
+ assert(tMemManager.getMemoryConsumptionForThisTask === 1000L)
+ tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 500L)
+ tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null)
+ assert(tMemManager.getMemoryConsumptionForThisTask === 0L)
+ }
}
private object MemoryManagerSuite {
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index 885c450d6d..54cb28c389 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -24,7 +24,6 @@ import org.mockito.Mockito.when
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-
class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
@@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
maxExecutionMem: Long,
maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1)
+ conf,
+ maxOnHeapExecutionMemory = maxExecutionMem,
+ maxStorageMemory = maxStorageMem,
+ numCores = 1)
val ms = makeMemoryStore(mm)
(mm, ms)
}
- override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
+ override protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long): StaticMemoryManager = {
new StaticMemoryManager(
- conf,
- maxExecutionMemory = maxMemory,
+ conf.clone
+ .set("spark.memory.fraction", "1")
+ .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+ .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString),
+ maxOnHeapExecutionMemory = maxOnHeapExecutionMemory,
maxStorageMemory = 0,
numCores = 1)
}
test("basic execution memory") {
val maxExecutionMem = 1000L
+ val taskAttemptId = 0L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
// Acquire up to the max
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- mm.releaseExecutionMemory(800L)
+ mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxExecutionMem)
+ mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 0L)
}
@@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
test("execution and storage isolation") {
val maxExecutionMem = 200L
val maxStorageMem = 1000L
+ val taskAttemptId = 0L
val dummyBlock = TestBlockId("ain't nobody love like you do")
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
@@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite {
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
- mm.releaseExecutionMemory(133L)
+ mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
index 77e43554ee..0706a6e45d 100644
--- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
+++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala
@@ -22,19 +22,20 @@ import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockStatus, BlockId}
-class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = 1) {
- private[memory] override def doAcquireExecutionMemory(
+class TestMemoryManager(conf: SparkConf)
+ extends MemoryManager(conf, numCores = 1, Long.MaxValue, Long.MaxValue) {
+
+ override private[memory] def acquireExecutionMemory(
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Long = {
if (oomOnce) {
oomOnce = false
0
} else if (available >= numBytes) {
- _executionMemoryUsed += numBytes // To suppress warnings when freeing unallocated memory
available -= numBytes
numBytes
} else {
- _executionMemoryUsed += available
val grant = available
available = 0
grant
@@ -48,12 +49,13 @@ class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores =
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def releaseExecutionMemory(numBytes: Long): Unit = {
+ override def releaseStorageMemory(numBytes: Long): Unit = {}
+ override private[memory] def releaseExecutionMemory(
+ numBytes: Long,
+ taskAttemptId: Long,
+ memoryMode: MemoryMode): Unit = {
available += numBytes
- _executionMemoryUsed -= numBytes
}
- override def releaseStorageMemory(numBytes: Long): Unit = {}
- override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
private var oomOnce = false
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
index 0c97f2bd89..8cebe81c3b 100644
--- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -24,57 +24,52 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-
class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
- private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
private val dummyBlock = TestBlockId("--")
private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ private val storageFraction: Double = 0.5
+
/**
* Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
*/
private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
- val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
+ val mm = createMemoryManager(maxMemory)
val ms = makeMemoryStore(mm)
(mm, ms)
}
- override protected def createMemoryManager(maxMemory: Long): MemoryManager = {
- new UnifiedMemoryManager(conf, maxMemory, numCores = 1)
- }
-
- private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
- mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
- }
-
- test("storage region size") {
- val maxMemory = 1000L
- val (mm, _) = makeThings(maxMemory)
- val storageFraction = conf.get("spark.memory.storageFraction").toDouble
- val expectedStorageRegionSize = maxMemory * storageFraction
- val actualStorageRegionSize = getStorageRegionSize(mm)
- assert(expectedStorageRegionSize === actualStorageRegionSize)
+ override protected def createMemoryManager(
+ maxOnHeapExecutionMemory: Long,
+ maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = {
+ val conf = new SparkConf()
+ .set("spark.memory.fraction", "1")
+ .set("spark.testing.memory", maxOnHeapExecutionMemory.toString)
+ .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString)
+ .set("spark.memory.storageFraction", storageFraction.toString)
+ UnifiedMemoryManager(conf, numCores = 1)
}
test("basic execution memory") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, _) = makeThings(maxMemory)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
// Acquire up to the max
- assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L)
assert(mm.executionMemoryUsed === maxMemory)
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L)
assert(mm.executionMemoryUsed === maxMemory)
- mm.releaseExecutionMemory(800L)
+ mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
- mm.releaseExecutionMemory(maxMemory)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
assert(mm.executionMemoryUsed === 0L)
}
@@ -118,44 +113,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("execution evicts storage") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
- // First, ensure the test classes are set up as expected
- val expectedStorageRegionSize = 500L
- val expectedExecutionRegionSize = 500L
- val storageRegionSize = getStorageRegionSize(mm)
- val executionRegionSize = maxMemory - expectedStorageRegionSize
- require(storageRegionSize === expectedStorageRegionSize,
- "bad test: storage region size is unexpected")
- require(executionRegionSize === expectedExecutionRegionSize,
- "bad test: storage region size is unexpected")
// Acquire enough storage memory to exceed the storage region
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 750L)
assert(mm.executionMemoryUsed === 0L)
assert(mm.storageMemoryUsed === 750L)
- require(mm.storageMemoryUsed > storageRegionSize,
- s"bad test: storage memory used should exceed the storage region")
// Execution needs to request 250 bytes to evict storage memory
- assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L)
assert(mm.executionMemoryUsed === 100L)
assert(mm.storageMemoryUsed === 750L)
assertEnsureFreeSpaceNotCalled(ms)
// Execution wants 200 bytes but only 150 are free, so storage is evicted
- assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
- assertEnsureFreeSpaceCalled(ms, 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
+ assert(mm.executionMemoryUsed === 300L)
+ assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.executionMemoryUsed === 300L)
mm.releaseAllStorageMemory()
- require(mm.executionMemoryUsed < executionRegionSize,
- s"bad test: execution memory used should be within the execution region")
+ require(mm.executionMemoryUsed === 300L)
require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
// Acquire some storage memory again, but this time keep it within the storage region
assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
assertEnsureFreeSpaceCalled(ms, 400L)
- require(mm.storageMemoryUsed < storageRegionSize,
- s"bad test: storage memory used should be within the storage region")
+ assert(mm.storageMemoryUsed === 400L)
+ assert(mm.executionMemoryUsed === 300L)
// Execution cannot evict storage because the latter is within the storage fraction,
// so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
- assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L)
assert(mm.executionMemoryUsed === 600L)
assert(mm.storageMemoryUsed === 400L)
assertEnsureFreeSpaceNotCalled(ms)
@@ -163,23 +148,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
test("storage does not evict execution") {
val maxMemory = 1000L
+ val taskAttemptId = 0L
val (mm, ms) = makeThings(maxMemory)
- // First, ensure the test classes are set up as expected
- val expectedStorageRegionSize = 500L
- val expectedExecutionRegionSize = 500L
- val storageRegionSize = getStorageRegionSize(mm)
- val executionRegionSize = maxMemory - expectedStorageRegionSize
- require(storageRegionSize === expectedStorageRegionSize,
- "bad test: storage region size is unexpected")
- require(executionRegionSize === expectedExecutionRegionSize,
- "bad test: storage region size is unexpected")
// Acquire enough execution memory to exceed the execution region
- assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L)
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
- require(mm.executionMemoryUsed > executionRegionSize,
- s"bad test: execution memory used should exceed the execution region")
// Storage should not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
assert(mm.executionMemoryUsed === 800L)
@@ -189,15 +164,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
assert(mm.executionMemoryUsed === 800L)
assert(mm.storageMemoryUsed === 100L)
assertEnsureFreeSpaceCalled(ms, 250L)
- mm.releaseExecutionMemory(maxMemory)
+ mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP)
mm.releaseStorageMemory(maxMemory)
// Acquire some execution memory again, but this time keep it within the execution region
- assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L)
assert(mm.executionMemoryUsed === 200L)
assert(mm.storageMemoryUsed === 0L)
assertEnsureFreeSpaceNotCalled(ms)
- require(mm.executionMemoryUsed < executionRegionSize,
- s"bad test: execution memory used should be within the execution region")
// Storage should still not be able to evict execution
assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
assert(mm.executionMemoryUsed === 200L)
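Taken together, "execution evicts storage" and "storage does not evict execution" pin down the asymmetry of the unified model: execution may reclaim storage memory that has grown past storageRegionSize, while storage may only use memory that execution is not currently holding. A rough illustrative model of that policy (not the patch's implementation; names are stand-ins and per-block eviction bookkeeping is omitted):

{{{
final case class Regions(
    maxMemory: Long,
    storageRegionSize: Long,
    var executionUsed: Long = 0L,
    var storageUsed: Long = 0L) {
  def free: Long = maxMemory - executionUsed - storageUsed
}

def grantExecution(r: Regions, numBytes: Long): Long = {
  // Storage usage above the protected storage region is reclaimable by execution.
  val evictable = math.max(r.storageUsed - r.storageRegionSize, 0L)
  val granted = math.min(numBytes, r.free + evictable)
  val evicted = math.max(granted - r.free, 0L)
  r.storageUsed -= evicted        // cached blocks dropped to make room
  r.executionUsed += granted
  granted
}

def grantStorage(r: Regions, numBytes: Long): Boolean = {
  // Storage never evicts execution: it can only use memory that is currently free.
  if (numBytes <= r.free) { r.storageUsed += numBytes; true } else false
}
}}}

Replaying the numbers from the first test against this model (750 bytes of storage, then execution requests of 100 and 200 against maxMemory = 1000 and storageRegionSize = 500) reproduces the 50-byte eviction asserted above.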
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index d49015afcd..53991d8a1a 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -825,7 +825,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
val memoryManager = new StaticMemoryManager(
conf,
- maxExecutionMemory = Long.MaxValue,
+ maxOnHeapExecutionMemory = Long.MaxValue,
maxStorageMemory = 1200,
numCores = 1)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,