diff options
21 files changed, 828 insertions, 465 deletions
diff --git a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java index 8fbdb72832..36138cc9a2 100644 --- a/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java +++ b/core/src/main/java/org/apache/spark/memory/MemoryConsumer.java @@ -17,15 +17,15 @@ package org.apache.spark.memory; - import java.io.IOException; import org.apache.spark.unsafe.array.LongArray; import org.apache.spark.unsafe.memory.MemoryBlock; - /** * An memory consumer of TaskMemoryManager, which support spilling. + * + * Note: this only supports allocation / spilling of Tungsten memory. */ public abstract class MemoryConsumer { @@ -36,7 +36,6 @@ public abstract class MemoryConsumer { protected MemoryConsumer(TaskMemoryManager taskMemoryManager, long pageSize) { this.taskMemoryManager = taskMemoryManager; this.pageSize = pageSize; - this.used = 0; } protected MemoryConsumer(TaskMemoryManager taskMemoryManager) { @@ -67,6 +66,8 @@ public abstract class MemoryConsumer { * * Note: In order to avoid possible deadlock, should not call acquireMemory() from spill(). * + * Note: today, this only frees Tungsten-managed pages. + * * @param size the amount of memory should be released * @param trigger the MemoryConsumer that trigger this spilling * @return the amount of released memory in bytes diff --git a/core/src/main/java/org/apache/spark/memory/MemoryMode.java b/core/src/main/java/org/apache/spark/memory/MemoryMode.java new file mode 100644 index 0000000000..3a5e72d8aa --- /dev/null +++ b/core/src/main/java/org/apache/spark/memory/MemoryMode.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory; + +import org.apache.spark.annotation.Private; + +@Private +public enum MemoryMode { + ON_HEAP, + OFF_HEAP +} diff --git a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java index 6440f9c0f3..5f743b2885 100644 --- a/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java +++ b/core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java @@ -103,10 +103,10 @@ public class TaskMemoryManager { * without doing any masking or lookups. Since this branching should be well-predicted by the JIT, * this extra layer of indirection / abstraction hopefully shouldn't be too expensive. */ - private final boolean inHeap; + final MemoryMode tungstenMemoryMode; /** - * The size of memory granted to each consumer. + * Tracks spillable memory consumers. */ @GuardedBy("this") private final HashSet<MemoryConsumer> consumers; @@ -115,7 +115,7 @@ public class TaskMemoryManager { * Construct a new TaskMemoryManager. */ public TaskMemoryManager(MemoryManager memoryManager, long taskAttemptId) { - this.inHeap = memoryManager.tungstenMemoryIsAllocatedInHeap(); + this.tungstenMemoryMode = memoryManager.tungstenMemoryMode(); this.memoryManager = memoryManager; this.taskAttemptId = taskAttemptId; this.consumers = new HashSet<>(); @@ -127,12 +127,19 @@ public class TaskMemoryManager { * * @return number of bytes successfully granted (<= N). */ - public long acquireExecutionMemory(long required, MemoryConsumer consumer) { + public long acquireExecutionMemory( + long required, + MemoryMode mode, + MemoryConsumer consumer) { assert(required >= 0); + // If we are allocating Tungsten pages off-heap and receive a request to allocate on-heap + // memory here, then it may not make sense to spill since that would only end up freeing + // off-heap memory. This is subject to change, though, so it may be risky to make this + // optimization now in case we forget to undo it late when making changes. synchronized (this) { - long got = memoryManager.acquireExecutionMemory(required, taskAttemptId); + long got = memoryManager.acquireExecutionMemory(required, taskAttemptId, mode); - // try to release memory from other consumers first, then we can reduce the frequency of + // Try to release memory from other consumers first, then we can reduce the frequency of // spilling, avoid to have too many spilled files. if (got < required) { // Call spill() on other consumers to release memory @@ -140,10 +147,10 @@ public class TaskMemoryManager { if (c != consumer && c.getUsed() > 0) { try { long released = c.spill(required - got, consumer); - if (released > 0) { - logger.info("Task {} released {} from {} for {}", taskAttemptId, + if (released > 0 && mode == tungstenMemoryMode) { + logger.debug("Task {} released {} from {} for {}", taskAttemptId, Utils.bytesToString(released), c, consumer); - got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId); + got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); if (got >= required) { break; } @@ -161,10 +168,10 @@ public class TaskMemoryManager { if (got < required && consumer != null) { try { long released = consumer.spill(required - got, consumer); - if (released > 0) { - logger.info("Task {} released {} from itself ({})", taskAttemptId, + if (released > 0 && mode == tungstenMemoryMode) { + logger.debug("Task {} released {} from itself ({})", taskAttemptId, Utils.bytesToString(released), consumer); - got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId); + got += memoryManager.acquireExecutionMemory(required - got, taskAttemptId, mode); } } catch (IOException e) { logger.error("error while calling spill() on " + consumer, e); @@ -184,9 +191,9 @@ public class TaskMemoryManager { /** * Release N bytes of execution memory for a MemoryConsumer. */ - public void releaseExecutionMemory(long size, MemoryConsumer consumer) { + public void releaseExecutionMemory(long size, MemoryMode mode, MemoryConsumer consumer) { logger.debug("Task {} release {} from {}", taskAttemptId, Utils.bytesToString(size), consumer); - memoryManager.releaseExecutionMemory(size, taskAttemptId); + memoryManager.releaseExecutionMemory(size, taskAttemptId, mode); } /** @@ -195,11 +202,19 @@ public class TaskMemoryManager { public void showMemoryUsage() { logger.info("Memory used in task " + taskAttemptId); synchronized (this) { + long memoryAccountedForByConsumers = 0; for (MemoryConsumer c: consumers) { - if (c.getUsed() > 0) { - logger.info("Acquired by " + c + ": " + Utils.bytesToString(c.getUsed())); + long totalMemUsage = c.getUsed(); + memoryAccountedForByConsumers += totalMemUsage; + if (totalMemUsage > 0) { + logger.info("Acquired by " + c + ": " + Utils.bytesToString(totalMemUsage)); } } + long memoryNotAccountedFor = + memoryManager.getExecutionMemoryUsageForTask(taskAttemptId) - memoryAccountedForByConsumers; + logger.info( + "{} bytes of memory were used by task {} but are not associated with specific consumers", + memoryNotAccountedFor, taskAttemptId); } } @@ -214,7 +229,8 @@ public class TaskMemoryManager { * Allocate a block of memory that will be tracked in the MemoryManager's page table; this is * intended for allocating large blocks of Tungsten memory that will be shared between operators. * - * Returns `null` if there was not enough memory to allocate the page. + * Returns `null` if there was not enough memory to allocate the page. May return a page that + * contains fewer bytes than requested, so callers should verify the size of returned pages. */ public MemoryBlock allocatePage(long size, MemoryConsumer consumer) { if (size > MAXIMUM_PAGE_SIZE_BYTES) { @@ -222,7 +238,7 @@ public class TaskMemoryManager { "Cannot allocate a page with more than " + MAXIMUM_PAGE_SIZE_BYTES + " bytes"); } - long acquired = acquireExecutionMemory(size, consumer); + long acquired = acquireExecutionMemory(size, tungstenMemoryMode, consumer); if (acquired <= 0) { return null; } @@ -231,7 +247,7 @@ public class TaskMemoryManager { synchronized (this) { pageNumber = allocatedPages.nextClearBit(0); if (pageNumber >= PAGE_TABLE_SIZE) { - releaseExecutionMemory(acquired, consumer); + releaseExecutionMemory(acquired, tungstenMemoryMode, consumer); throw new IllegalStateException( "Have already allocated a maximum of " + PAGE_TABLE_SIZE + " pages"); } @@ -262,7 +278,7 @@ public class TaskMemoryManager { } long pageSize = page.size(); memoryManager.tungstenMemoryAllocator().free(page); - releaseExecutionMemory(pageSize, consumer); + releaseExecutionMemory(pageSize, tungstenMemoryMode, consumer); } /** @@ -276,7 +292,7 @@ public class TaskMemoryManager { * @return an encoded page address. */ public long encodePageNumberAndOffset(MemoryBlock page, long offsetInPage) { - if (!inHeap) { + if (tungstenMemoryMode == MemoryMode.OFF_HEAP) { // In off-heap mode, an offset is an absolute address that may require a full 64 bits to // encode. Due to our page size limitation, though, we can convert this into an offset that's // relative to the page's base offset; this relative offset will fit in 51 bits. @@ -305,7 +321,7 @@ public class TaskMemoryManager { * {@link TaskMemoryManager#encodePageNumberAndOffset(MemoryBlock, long)} */ public Object getPage(long pagePlusOffsetAddress) { - if (inHeap) { + if (tungstenMemoryMode == MemoryMode.ON_HEAP) { final int pageNumber = decodePageNumber(pagePlusOffsetAddress); assert (pageNumber >= 0 && pageNumber < PAGE_TABLE_SIZE); final MemoryBlock page = pageTable[pageNumber]; @@ -323,7 +339,7 @@ public class TaskMemoryManager { */ public long getOffsetInPage(long pagePlusOffsetAddress) { final long offsetInPage = decodeOffset(pagePlusOffsetAddress); - if (inHeap) { + if (tungstenMemoryMode == MemoryMode.ON_HEAP) { return offsetInPage; } else { // In off-heap mode, an offset is an absolute address. In encodePageNumberAndOffset, we @@ -351,11 +367,19 @@ public class TaskMemoryManager { } consumers.clear(); } + + for (MemoryBlock page : pageTable) { + if (page != null) { + memoryManager.tungstenMemoryAllocator().free(page); + } + } + Arrays.fill(pageTable, null); + return memoryManager.releaseAllExecutionMemoryForTask(taskAttemptId); } /** - * Returns the memory consumption, in bytes, for the current task + * Returns the memory consumption, in bytes, for the current task. */ public long getMemoryConsumptionForThisTask() { return memoryManager.getExecutionMemoryUsageForTask(taskAttemptId); diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 23ae9360f6..4474a83bed 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -341,7 +341,7 @@ object SparkEnv extends Logging { if (useLegacyMemoryManager) { new StaticMemoryManager(conf, numUsableCores) } else { - new UnifiedMemoryManager(conf, numUsableCores) + UnifiedMemoryManager(conf, numUsableCores) } val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) diff --git a/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala new file mode 100644 index 0000000000..7825bae425 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/memory/ExecutionMemoryPool.scala @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable + +import org.apache.spark.Logging + +/** + * Implements policies and bookkeeping for sharing a adjustable-sized pool of memory between tasks. + * + * Tries to ensure that each task gets a reasonable share of memory, instead of some task ramping up + * to a large amount first and then causing others to spill to disk repeatedly. + * + * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory + * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the + * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever this + * set changes. This is all done by synchronizing access to mutable state and using wait() and + * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across + * tasks was performed by the ShuffleMemoryManager. + * + * @param lock a [[MemoryManager]] instance to synchronize on + * @param poolName a human-readable name for this pool, for use in log messages + */ +class ExecutionMemoryPool( + lock: Object, + poolName: String + ) extends MemoryPool(lock) with Logging { + + /** + * Map from taskAttemptId -> memory consumption in bytes + */ + @GuardedBy("lock") + private val memoryForTask = new mutable.HashMap[Long, Long]() + + override def memoryUsed: Long = lock.synchronized { + memoryForTask.values.sum + } + + /** + * Returns the memory consumption, in bytes, for the given task. + */ + def getMemoryUsageForTask(taskAttemptId: Long): Long = lock.synchronized { + memoryForTask.getOrElse(taskAttemptId, 0L) + } + + /** + * Try to acquire up to `numBytes` of memory for the given task and return the number of bytes + * obtained, or 0 if none can be allocated. + * + * This call may block until there is enough free memory in some situations, to make sure each + * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of + * active tasks) before it is forced to spill. This can happen if the number of tasks increase + * but an older task had a lot of memory already. + * + * @return the number of bytes granted to the task. + */ + def acquireMemory(numBytes: Long, taskAttemptId: Long): Long = lock.synchronized { + assert(numBytes > 0, s"invalid number of bytes requested: $numBytes") + + // Add this task to the taskMemory map just so we can keep an accurate count of the number + // of active tasks, to let other tasks ramp down their memory in calls to `acquireMemory` + if (!memoryForTask.contains(taskAttemptId)) { + memoryForTask(taskAttemptId) = 0L + // This will later cause waiting tasks to wake up and check numTasks again + lock.notifyAll() + } + + // Keep looping until we're either sure that we don't want to grant this request (because this + // task would have more than 1 / numActiveTasks of the memory) or we have enough free + // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). + // TODO: simplify this to limit each task to its own slot + while (true) { + val numActiveTasks = memoryForTask.keys.size + val curMem = memoryForTask(taskAttemptId) + + // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks; + // don't let it be negative + val maxToGrant = + math.min(numBytes, math.max(0, (poolSize / numActiveTasks) - curMem)) + // Only give it as much memory as is free, which might be none if it reached 1 / numTasks + val toGrant = math.min(maxToGrant, memoryFree) + + if (curMem < poolSize / (2 * numActiveTasks)) { + // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; + // if we can't give it this much now, wait for other tasks to free up memory + // (this happens if older tasks allocated lots of memory before N grew) + if (memoryFree >= math.min(maxToGrant, poolSize / (2 * numActiveTasks) - curMem)) { + memoryForTask(taskAttemptId) += toGrant + return toGrant + } else { + logInfo( + s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free") + lock.wait() + } + } else { + memoryForTask(taskAttemptId) += toGrant + return toGrant + } + } + 0L // Never reached + } + + /** + * Release `numBytes` of memory acquired by the given task. + */ + def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { + val curMem = memoryForTask.getOrElse(taskAttemptId, 0L) + var memoryToFree = if (curMem < numBytes) { + logWarning( + s"Internal error: release called on $numBytes bytes but task only has $curMem bytes " + + s"of memory from the $poolName pool") + curMem + } else { + numBytes + } + if (memoryForTask.contains(taskAttemptId)) { + memoryForTask(taskAttemptId) -= memoryToFree + if (memoryForTask(taskAttemptId) <= 0) { + memoryForTask.remove(taskAttemptId) + } + } + lock.notifyAll() // Notify waiters in acquireMemory() that memory has been freed + } + + /** + * Release all memory for the given task and mark it as inactive (e.g. when a task ends). + * @return the number of bytes freed. + */ + def releaseAllMemoryForTask(taskAttemptId: Long): Long = lock.synchronized { + val numBytesToFree = getMemoryUsageForTask(taskAttemptId) + releaseMemory(numBytesToFree, taskAttemptId) + numBytesToFree + } + +} diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index b0cf2696a3..ceb8ea434e 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -20,12 +20,8 @@ package org.apache.spark.memory import javax.annotation.concurrent.GuardedBy import scala.collection.mutable -import scala.collection.mutable.ArrayBuffer -import com.google.common.annotations.VisibleForTesting - -import org.apache.spark.util.Utils -import org.apache.spark.{SparkException, TaskContext, SparkConf, Logging} +import org.apache.spark.{SparkConf, Logging} import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} import org.apache.spark.unsafe.array.ByteArrayMethods import org.apache.spark.unsafe.memory.MemoryAllocator @@ -36,53 +32,40 @@ import org.apache.spark.unsafe.memory.MemoryAllocator * In this context, execution memory refers to that used for computation in shuffles, joins, * sorts and aggregations, while storage memory refers to that used for caching and propagating * internal data across the cluster. There exists one MemoryManager per JVM. - * - * The MemoryManager abstract base class itself implements policies for sharing execution memory - * between tasks; it tries to ensure that each task gets a reasonable share of memory, instead of - * some task ramping up to a large amount first and then causing others to spill to disk repeatedly. - * If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory - * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the - * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever - * this set changes. This is all done by synchronizing access to mutable state and using wait() and - * notifyAll() to signal changes to callers. Prior to Spark 1.6, this arbitration of memory across - * tasks was performed by the ShuffleMemoryManager. */ -private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) extends Logging { +private[spark] abstract class MemoryManager( + conf: SparkConf, + numCores: Int, + storageMemory: Long, + onHeapExecutionMemory: Long) extends Logging { // -- Methods related to memory allocation policies and bookkeeping ------------------------------ - // The memory store used to evict cached blocks - private var _memoryStore: MemoryStore = _ - protected def memoryStore: MemoryStore = { - if (_memoryStore == null) { - throw new IllegalArgumentException("memory store not initialized yet") - } - _memoryStore - } + @GuardedBy("this") + protected val storageMemoryPool = new StorageMemoryPool(this) + @GuardedBy("this") + protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "on-heap execution") + @GuardedBy("this") + protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, "off-heap execution") - // Amount of execution/storage memory in use, accesses must be synchronized on `this` - @GuardedBy("this") protected var _executionMemoryUsed: Long = 0 - @GuardedBy("this") protected var _storageMemoryUsed: Long = 0 - // Map from taskAttemptId -> memory consumption in bytes - @GuardedBy("this") private val executionMemoryForTask = new mutable.HashMap[Long, Long]() - - /** - * Set the [[MemoryStore]] used by this manager to evict cached blocks. - * This must be set after construction due to initialization ordering constraints. - */ - final def setMemoryStore(store: MemoryStore): Unit = { - _memoryStore = store - } + storageMemoryPool.incrementPoolSize(storageMemory) + onHeapExecutionMemoryPool.incrementPoolSize(onHeapExecutionMemory) + offHeapExecutionMemoryPool.incrementPoolSize(conf.getSizeAsBytes("spark.memory.offHeapSize", 0)) /** - * Total available memory for execution, in bytes. + * Total available memory for storage, in bytes. This amount can vary over time, depending on + * the MemoryManager implementation. + * In this model, this is equivalent to the amount of memory not occupied by execution. */ - def maxExecutionMemory: Long + def maxStorageMemory: Long /** - * Total available memory for storage, in bytes. + * Set the [[MemoryStore]] used by this manager to evict cached blocks. + * This must be set after construction due to initialization ordering constraints. */ - def maxStorageMemory: Long + final def setMemoryStore(store: MemoryStore): Unit = synchronized { + storageMemoryPool.setMemoryStore(store) + } // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) @@ -94,7 +77,9 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) + } /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. @@ -109,103 +94,25 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes, evictedBlocks) - } - - /** - * Acquire N bytes of memory for execution, evicting cached blocks if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return number of bytes successfully granted (<= N). - */ - @VisibleForTesting - private[memory] def doAcquireExecutionMemory( - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean /** - * Try to acquire up to `numBytes` of execution memory for the current task and return the number - * of bytes obtained, or 0 if none can be allocated. + * Try to acquire up to `numBytes` of execution memory for the current task and return the + * number of bytes obtained, or 0 if none can be allocated. * * This call may block until there is enough free memory in some situations, to make sure each * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of * active tasks) before it is forced to spill. This can happen if the number of tasks increase * but an older task had a lot of memory already. - * - * Subclasses should override `doAcquireExecutionMemory` in order to customize the policies - * that control global sharing of memory between execution and storage. */ private[memory] - final def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long): Long = synchronized { - assert(numBytes > 0, "invalid number of bytes requested: " + numBytes) - - // Add this task to the taskMemory map just so we can keep an accurate count of the number - // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire - if (!executionMemoryForTask.contains(taskAttemptId)) { - executionMemoryForTask(taskAttemptId) = 0L - // This will later cause waiting tasks to wake up and check numTasks again - notifyAll() - } - - // Once the cross-task memory allocation policy has decided to grant more memory to a task, - // this method is called in order to actually obtain that execution memory, potentially - // triggering eviction of storage memory: - def acquire(toGrant: Long): Long = synchronized { - val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] - val acquired = doAcquireExecutionMemory(toGrant, evictedBlocks) - // Register evicted blocks, if any, with the active task metrics - Option(TaskContext.get()).foreach { tc => - val metrics = tc.taskMetrics() - val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) - metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) - } - executionMemoryForTask(taskAttemptId) += acquired - acquired - } - - // Keep looping until we're either sure that we don't want to grant this request (because this - // task would have more than 1 / numActiveTasks of the memory) or we have enough free - // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). - // TODO: simplify this to limit each task to its own slot - while (true) { - val numActiveTasks = executionMemoryForTask.keys.size - val curMem = executionMemoryForTask(taskAttemptId) - val freeMemory = maxExecutionMemory - executionMemoryForTask.values.sum - - // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks; - // don't let it be negative - val maxToGrant = - math.min(numBytes, math.max(0, (maxExecutionMemory / numActiveTasks) - curMem)) - // Only give it as much memory as is free, which might be none if it reached 1 / numTasks - val toGrant = math.min(maxToGrant, freeMemory) - - if (curMem < maxExecutionMemory / (2 * numActiveTasks)) { - // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking; - // if we can't give it this much now, wait for other tasks to free up memory - // (this happens if older tasks allocated lots of memory before N grew) - if ( - freeMemory >= math.min(maxToGrant, maxExecutionMemory / (2 * numActiveTasks) - curMem)) { - return acquire(toGrant) - } else { - logInfo( - s"TID $taskAttemptId waiting for at least 1/2N of execution memory pool to be free") - wait() - } - } else { - return acquire(toGrant) - } - } - 0L // Never reached - } - - @VisibleForTesting - private[memory] def releaseExecutionMemory(numBytes: Long): Unit = synchronized { - if (numBytes > _executionMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of execution " + - s"memory when we only have ${_executionMemoryUsed} bytes") - _executionMemoryUsed = 0 - } else { - _executionMemoryUsed -= numBytes + def acquireExecutionMemory( + numBytes: Long, + taskAttemptId: Long, + memoryMode: MemoryMode): Long = synchronized { + memoryMode match { + case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) } } @@ -213,24 +120,14 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte * Release numBytes of execution memory belonging to the given task. */ private[memory] - final def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long): Unit = synchronized { - val curMem = executionMemoryForTask.getOrElse(taskAttemptId, 0L) - if (curMem < numBytes) { - if (Utils.isTesting) { - throw new SparkException( - s"Internal error: release called on $numBytes bytes but task only has $curMem") - } else { - logWarning(s"Internal error: release called on $numBytes bytes but task only has $curMem") - } - } - if (executionMemoryForTask.contains(taskAttemptId)) { - executionMemoryForTask(taskAttemptId) -= numBytes - if (executionMemoryForTask(taskAttemptId) <= 0) { - executionMemoryForTask.remove(taskAttemptId) - } - releaseExecutionMemory(numBytes) + def releaseExecutionMemory( + numBytes: Long, + taskAttemptId: Long, + memoryMode: MemoryMode): Unit = synchronized { + memoryMode match { + case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) + case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.releaseMemory(numBytes, taskAttemptId) } - notifyAll() // Notify waiters in acquireExecutionMemory() that memory has been freed } /** @@ -238,35 +135,28 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte * @return the number of bytes freed. */ private[memory] def releaseAllExecutionMemoryForTask(taskAttemptId: Long): Long = synchronized { - val numBytesToFree = getExecutionMemoryUsageForTask(taskAttemptId) - releaseExecutionMemory(numBytesToFree, taskAttemptId) - numBytesToFree + onHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) + + offHeapExecutionMemoryPool.releaseAllMemoryForTask(taskAttemptId) } /** * Release N bytes of storage memory. */ def releaseStorageMemory(numBytes: Long): Unit = synchronized { - if (numBytes > _storageMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of storage " + - s"memory when we only have ${_storageMemoryUsed} bytes") - _storageMemoryUsed = 0 - } else { - _storageMemoryUsed -= numBytes - } + storageMemoryPool.releaseMemory(numBytes) } /** * Release all storage memory acquired. */ - def releaseAllStorageMemory(): Unit = synchronized { - _storageMemoryUsed = 0 + final def releaseAllStorageMemory(): Unit = synchronized { + storageMemoryPool.releaseAllMemory() } /** * Release N bytes of unroll memory. */ - def releaseUnrollMemory(numBytes: Long): Unit = synchronized { + final def releaseUnrollMemory(numBytes: Long): Unit = synchronized { releaseStorageMemory(numBytes) } @@ -274,26 +164,35 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte * Execution memory currently in use, in bytes. */ final def executionMemoryUsed: Long = synchronized { - _executionMemoryUsed + onHeapExecutionMemoryPool.memoryUsed + offHeapExecutionMemoryPool.memoryUsed } /** * Storage memory currently in use, in bytes. */ final def storageMemoryUsed: Long = synchronized { - _storageMemoryUsed + storageMemoryPool.memoryUsed } /** * Returns the execution memory consumption, in bytes, for the given task. */ private[memory] def getExecutionMemoryUsageForTask(taskAttemptId: Long): Long = synchronized { - executionMemoryForTask.getOrElse(taskAttemptId, 0L) + onHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) + + offHeapExecutionMemoryPool.getMemoryUsageForTask(taskAttemptId) } // -- Fields related to Tungsten managed memory ------------------------------------------------- /** + * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using + * sun.misc.Unsafe. + */ + final val tungstenMemoryMode: MemoryMode = { + if (conf.getBoolean("spark.unsafe.offHeap", false)) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP + } + + /** * The default page size, in bytes. * * If user didn't explicitly set "spark.buffer.pageSize", we figure out the default value @@ -306,21 +205,22 @@ private[spark] abstract class MemoryManager(conf: SparkConf, numCores: Int) exte val cores = if (numCores > 0) numCores else Runtime.getRuntime.availableProcessors() // Because of rounding to next power of 2, we may have safetyFactor as 8 in worst case val safetyFactor = 16 - val size = ByteArrayMethods.nextPowerOf2(maxExecutionMemory / cores / safetyFactor) + val maxTungstenMemory: Long = tungstenMemoryMode match { + case MemoryMode.ON_HEAP => onHeapExecutionMemoryPool.poolSize + case MemoryMode.OFF_HEAP => offHeapExecutionMemoryPool.poolSize + } + val size = ByteArrayMethods.nextPowerOf2(maxTungstenMemory / cores / safetyFactor) val default = math.min(maxPageSize, math.max(minPageSize, size)) conf.getSizeAsBytes("spark.buffer.pageSize", default) } /** - * Tracks whether Tungsten memory will be allocated on the JVM heap or off-heap using - * sun.misc.Unsafe. - */ - final val tungstenMemoryIsAllocatedInHeap: Boolean = - !conf.getBoolean("spark.unsafe.offHeap", false) - - /** * Allocates memory for use by Unsafe/Tungsten code. */ - private[memory] final val tungstenMemoryAllocator: MemoryAllocator = - if (tungstenMemoryIsAllocatedInHeap) MemoryAllocator.HEAP else MemoryAllocator.UNSAFE + private[memory] final val tungstenMemoryAllocator: MemoryAllocator = { + tungstenMemoryMode match { + case MemoryMode.ON_HEAP => MemoryAllocator.HEAP + case MemoryMode.OFF_HEAP => MemoryAllocator.UNSAFE + } + } } diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala new file mode 100644 index 0000000000..bfeec47e38 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/memory/MemoryPool.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import javax.annotation.concurrent.GuardedBy + +/** + * Manages bookkeeping for an adjustable-sized region of memory. This class is internal to + * the [[MemoryManager]]. See subclasses for more details. + * + * @param lock a [[MemoryManager]] instance, used for synchronization. We purposely erase the type + * to `Object` to avoid programming errors, since this object should only be used for + * synchronization purposes. + */ +abstract class MemoryPool(lock: Object) { + + @GuardedBy("lock") + private[this] var _poolSize: Long = 0 + + /** + * Returns the current size of the pool, in bytes. + */ + final def poolSize: Long = lock.synchronized { + _poolSize + } + + /** + * Returns the amount of free memory in the pool, in bytes. + */ + final def memoryFree: Long = lock.synchronized { + _poolSize - memoryUsed + } + + /** + * Expands the pool by `delta` bytes. + */ + final def incrementPoolSize(delta: Long): Unit = lock.synchronized { + require(delta >= 0) + _poolSize += delta + } + + /** + * Shrinks the pool by `delta` bytes. + */ + final def decrementPoolSize(delta: Long): Unit = lock.synchronized { + require(delta >= 0) + require(delta <= _poolSize) + require(_poolSize - delta >= memoryUsed) + _poolSize -= delta + } + + /** + * Returns the amount of used memory in this pool (in bytes). + */ + def memoryUsed: Long +} diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 9c2c2e90a2..12a0943068 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -22,7 +22,6 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus} - /** * A [[MemoryManager]] that statically partitions the heap space into disjoint regions. * @@ -32,10 +31,14 @@ import org.apache.spark.storage.{BlockId, BlockStatus} */ private[spark] class StaticMemoryManager( conf: SparkConf, - override val maxExecutionMemory: Long, + maxOnHeapExecutionMemory: Long, override val maxStorageMemory: Long, numCores: Int) - extends MemoryManager(conf, numCores) { + extends MemoryManager( + conf, + numCores, + maxStorageMemory, + maxOnHeapExecutionMemory) { def this(conf: SparkConf, numCores: Int) { this( @@ -50,76 +53,15 @@ private[spark] class StaticMemoryManager( (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong } - /** - * Acquire N bytes of memory for execution. - * @return number of bytes successfully granted (<= N). - */ - override def doAcquireExecutionMemory( - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { - assert(numBytes >= 0) - assert(_executionMemoryUsed <= maxExecutionMemory) - val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed) - _executionMemoryUsed += bytesToGrant - bytesToGrant - } - - /** - * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return whether all N bytes were successfully granted. - */ - override def acquireStorageMemory( - blockId: BlockId, - numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks) - } - - /** - * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. - * - * This evicts at most M bytes worth of existing blocks, where M is a fraction of the storage - * space specified by `spark.storage.unrollFraction`. Blocks evicted in the process, if any, - * are added to `evictedBlocks`. - * - * @return whether all N bytes were successfully granted. - */ override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - val currentUnrollMemory = memoryStore.currentUnrollMemory + val currentUnrollMemory = storageMemoryPool.memoryStore.currentUnrollMemory val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) val numBytesToFree = math.min(numBytes, maxNumBytesToFree) - acquireStorageMemory(blockId, numBytes, numBytesToFree, evictedBlocks) + storageMemoryPool.acquireMemory(blockId, numBytes, numBytesToFree, evictedBlocks) } - - /** - * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary. - * - * @param blockId the ID of the block we are acquiring storage memory for - * @param numBytesToAcquire the size of this block - * @param numBytesToFree the size of space to be freed through evicting blocks - * @param evictedBlocks a holder for blocks evicted in the process - * @return whether all N bytes were successfully granted. - */ - private def acquireStorageMemory( - blockId: BlockId, - numBytesToAcquire: Long, - numBytesToFree: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { - assert(numBytesToAcquire >= 0) - assert(numBytesToFree >= 0) - memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) - assert(_storageMemoryUsed <= maxStorageMemory) - val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory - if (enoughMemory) { - _storageMemoryUsed += numBytesToAcquire - } - enoughMemory - } - } @@ -135,7 +77,6 @@ private[spark] object StaticMemoryManager { (systemMaxMemory * memoryFraction * safetyFraction).toLong } - /** * Return the total amount of memory available for the execution region, in bytes. */ diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala new file mode 100644 index 0000000000..6a322eabf8 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import javax.annotation.concurrent.GuardedBy + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{TaskContext, Logging} +import org.apache.spark.storage.{MemoryStore, BlockStatus, BlockId} + +/** + * Performs bookkeeping for managing an adjustable-size pool of memory that is used for storage + * (caching). + * + * @param lock a [[MemoryManager]] instance to synchronize on + */ +class StorageMemoryPool(lock: Object) extends MemoryPool(lock) with Logging { + + @GuardedBy("lock") + private[this] var _memoryUsed: Long = 0L + + override def memoryUsed: Long = lock.synchronized { + _memoryUsed + } + + private var _memoryStore: MemoryStore = _ + def memoryStore: MemoryStore = { + if (_memoryStore == null) { + throw new IllegalStateException("memory store not initialized yet") + } + _memoryStore + } + + /** + * Set the [[MemoryStore]] used by this manager to evict cached blocks. + * This must be set after construction due to initialization ordering constraints. + */ + final def setMemoryStore(store: MemoryStore): Unit = { + _memoryStore = store + } + + /** + * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return whether all N bytes were successfully granted. + */ + def acquireMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + acquireMemory(blockId, numBytes, numBytes, evictedBlocks) + } + + /** + * Acquire N bytes of storage memory for the given block, evicting existing ones if necessary. + * + * @param blockId the ID of the block we are acquiring storage memory for + * @param numBytesToAcquire the size of this block + * @param numBytesToFree the size of space to be freed through evicting blocks + * @return whether all N bytes were successfully granted. + */ + def acquireMemory( + blockId: BlockId, + numBytesToAcquire: Long, + numBytesToFree: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = lock.synchronized { + assert(numBytesToAcquire >= 0) + assert(numBytesToFree >= 0) + assert(memoryUsed <= poolSize) + memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) + // Register evicted blocks, if any, with the active task metrics + Option(TaskContext.get()).foreach { tc => + val metrics = tc.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) + } + // NOTE: If the memory store evicts blocks, then those evictions will synchronously call + // back into this StorageMemoryPool in order to free. Therefore, these variables should have + // been updated. + val enoughMemory = numBytesToAcquire <= memoryFree + if (enoughMemory) { + _memoryUsed += numBytesToAcquire + } + enoughMemory + } + + def releaseMemory(size: Long): Unit = lock.synchronized { + if (size > _memoryUsed) { + logWarning(s"Attempted to release $size bytes of storage " + + s"memory when we only have ${_memoryUsed} bytes") + _memoryUsed = 0 + } else { + _memoryUsed -= size + } + } + + def releaseAllMemory(): Unit = lock.synchronized { + _memoryUsed = 0 + } + + /** + * Try to shrink the size of this storage memory pool by `spaceToFree` bytes. Return the number + * of bytes removed from the pool's capacity. + */ + def shrinkPoolToFreeSpace(spaceToFree: Long): Long = lock.synchronized { + // First, shrink the pool by reclaiming free memory: + val spaceFreedByReleasingUnusedMemory = Math.min(spaceToFree, memoryFree) + decrementPoolSize(spaceFreedByReleasingUnusedMemory) + if (spaceFreedByReleasingUnusedMemory == spaceToFree) { + spaceFreedByReleasingUnusedMemory + } else { + // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks: + val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + memoryStore.ensureFreeSpace(spaceToFree - spaceFreedByReleasingUnusedMemory, evictedBlocks) + val spaceFreedByEviction = evictedBlocks.map(_._2.memSize).sum + _memoryUsed -= spaceFreedByEviction + decrementPoolSize(spaceFreedByEviction) + spaceFreedByReleasingUnusedMemory + spaceFreedByEviction + } + } +} diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index a3093030a0..8be5b05419 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -22,7 +22,6 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockStatus, BlockId} - /** * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that * either side can borrow memory from the other. @@ -41,98 +40,105 @@ import org.apache.spark.storage.{BlockStatus, BlockId} * The implication is that attempts to cache blocks may fail if execution has already eaten * up most of the storage space, in which case the new blocks will be evicted immediately * according to their respective storage levels. + * + * @param storageRegionSize Size of the storage region, in bytes. + * This region is not statically reserved; execution can borrow from + * it if necessary. Cached blocks can be evicted only if actual + * storage memory usage exceeds this region. */ -private[spark] class UnifiedMemoryManager( +private[spark] class UnifiedMemoryManager private[memory] ( conf: SparkConf, maxMemory: Long, + private val storageRegionSize: Long, numCores: Int) - extends MemoryManager(conf, numCores) { - - def this(conf: SparkConf, numCores: Int) { - this(conf, UnifiedMemoryManager.getMaxMemory(conf), numCores) - } - - /** - * Size of the storage region, in bytes. - * - * This region is not statically reserved; execution can borrow from it if necessary. - * Cached blocks can be evicted only if actual storage memory usage exceeds this region. - */ - private val storageRegionSize: Long = { - (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong - } - - /** - * Total amount of memory, in bytes, not currently occupied by either execution or storage. - */ - private def totalFreeMemory: Long = synchronized { - assert(_executionMemoryUsed <= maxMemory) - assert(_storageMemoryUsed <= maxMemory) - assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory) - maxMemory - _executionMemoryUsed - _storageMemoryUsed - } + extends MemoryManager( + conf, + numCores, + storageRegionSize, + maxMemory - storageRegionSize) { - /** - * Total available memory for execution, in bytes. - * In this model, this is equivalent to the amount of memory not occupied by storage. - */ - override def maxExecutionMemory: Long = synchronized { - maxMemory - _storageMemoryUsed - } + // We always maintain this invariant: + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) - /** - * Total available memory for storage, in bytes. - * In this model, this is equivalent to the amount of memory not occupied by execution. - */ override def maxStorageMemory: Long = synchronized { - maxMemory - _executionMemoryUsed + maxMemory - onHeapExecutionMemoryPool.memoryUsed } /** - * Acquire N bytes of memory for execution, evicting cached blocks if necessary. + * Try to acquire up to `numBytes` of execution memory for the current task and return the + * number of bytes obtained, or 0 if none can be allocated. * - * This method evicts blocks only up to the amount of memory borrowed by storage. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return number of bytes successfully granted (<= N). + * This call may block until there is enough free memory in some situations, to make sure each + * task has a chance to ramp up to at least 1 / 2N of the total memory pool (where N is the # of + * active tasks) before it is forced to spill. This can happen if the number of tasks increase + * but an older task had a lot of memory already. */ - private[memory] override def doAcquireExecutionMemory( + override private[memory] def acquireExecutionMemory( numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { + taskAttemptId: Long, + memoryMode: MemoryMode): Long = synchronized { + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) - val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize) - // If there is not enough free memory AND storage has borrowed some execution memory, - // then evict as much memory borrowed by storage as needed to grant this request - val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0 - if (shouldEvictStorage) { - val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage) - memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks) + memoryMode match { + case MemoryMode.ON_HEAP => + if (numBytes > onHeapExecutionMemoryPool.memoryFree) { + val extraMemoryNeeded = numBytes - onHeapExecutionMemoryPool.memoryFree + // There is not enough free memory in the execution pool, so try to reclaim memory from + // storage. We can reclaim any free memory from the storage pool. If the storage pool + // has grown to become larger than `storageRegionSize`, we can evict blocks and reclaim + // the memory that storage has borrowed from execution. + val memoryReclaimableFromStorage = + math.max(storageMemoryPool.memoryFree, storageMemoryPool.poolSize - storageRegionSize) + if (memoryReclaimableFromStorage > 0) { + // Only reclaim as much space as is necessary and available: + val spaceReclaimed = storageMemoryPool.shrinkPoolToFreeSpace( + math.min(extraMemoryNeeded, memoryReclaimableFromStorage)) + onHeapExecutionMemoryPool.incrementPoolSize(spaceReclaimed) + } + } + onHeapExecutionMemoryPool.acquireMemory(numBytes, taskAttemptId) + case MemoryMode.OFF_HEAP => + // For now, we only support on-heap caching of data, so we do not need to interact with + // the storage pool when allocating off-heap memory. This will change in the future, though. + super.acquireExecutionMemory(numBytes, taskAttemptId, memoryMode) } - val bytesToGrant = math.min(numBytes, totalFreeMemory) - _executionMemoryUsed += bytesToGrant - bytesToGrant } - /** - * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. - * Blocks evicted in the process, if any, are added to `evictedBlocks`. - * @return whether all N bytes were successfully granted. - */ override def acquireStorageMemory( blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + assert(onHeapExecutionMemoryPool.poolSize + storageMemoryPool.poolSize == maxMemory) assert(numBytes >= 0) - memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks) - val enoughMemory = totalFreeMemory >= numBytes - if (enoughMemory) { - _storageMemoryUsed += numBytes + if (numBytes > storageMemoryPool.memoryFree) { + // There is not enough free memory in the storage pool, so try to borrow free memory from + // the execution pool. + val memoryBorrowedFromExecution = Math.min(onHeapExecutionMemoryPool.memoryFree, numBytes) + onHeapExecutionMemoryPool.decrementPoolSize(memoryBorrowedFromExecution) + storageMemoryPool.incrementPoolSize(memoryBorrowedFromExecution) } - enoughMemory + storageMemoryPool.acquireMemory(blockId, numBytes, evictedBlocks) } + override def acquireUnrollMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes, evictedBlocks) + } } -private object UnifiedMemoryManager { +object UnifiedMemoryManager { + + def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = { + val maxMemory = getMaxMemory(conf) + new UnifiedMemoryManager( + conf, + maxMemory = maxMemory, + storageRegionSize = + (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong, + numCores = numCores) + } /** * Return the total amount of memory shared between execution and storage, in bytes. diff --git a/core/src/main/scala/org/apache/spark/memory/package.scala b/core/src/main/scala/org/apache/spark/memory/package.scala new file mode 100644 index 0000000000..564e30d2ff --- /dev/null +++ b/core/src/main/scala/org/apache/spark/memory/package.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +/** + * This package implements Spark's memory management system. This system consists of two main + * components, a JVM-wide memory manager and a per-task manager: + * + * - [[org.apache.spark.memory.MemoryManager]] manages Spark's overall memory usage within a JVM. + * This component implements the policies for dividing the available memory across tasks and for + * allocating memory between storage (memory used caching and data transfer) and execution (memory + * used by computations, such as shuffles, joins, sorts, and aggregations). + * - [[org.apache.spark.memory.TaskMemoryManager]] manages the memory allocated by individual tasks. + * Tasks interact with TaskMemoryManager and never directly interact with the JVM-wide + * MemoryManager. + * + * Internally, each of these components have additional abstractions for memory bookkeeping: + * + * - [[org.apache.spark.memory.MemoryConsumer]]s are clients of the TaskMemoryManager and + * correspond to individual operators and data structures within a task. The TaskMemoryManager + * receives memory allocation requests from MemoryConsumers and issues callbacks to consumers + * in order to trigger spilling when running low on memory. + * - [[org.apache.spark.memory.MemoryPool]]s are a bookkeeping abstraction used by the + * MemoryManager to track the division of memory between storage and execution. + * + * Diagrammatically: + * + * {{{ + * +-------------+ + * | MemConsumer |----+ +------------------------+ + * +-------------+ | +-------------------+ | MemoryManager | + * +--->| TaskMemoryManager |----+ | | + * +-------------+ | +-------------------+ | | +------------------+ | + * | MemConsumer |----+ | | | StorageMemPool | | + * +-------------+ +-------------------+ | | +------------------+ | + * | TaskMemoryManager |----+ | | + * +-------------------+ | | +------------------+ | + * +---->| |OnHeapExecMemPool | | + * * | | +------------------+ | + * * | | | + * +-------------+ * | | +------------------+ | + * | MemConsumer |----+ | | |OffHeapExecMemPool| | + * +-------------+ | +-------------------+ | | +------------------+ | + * +--->| TaskMemoryManager |----+ | | + * +-------------------+ +------------------------+ + * }}} + * + * + * There are two implementations of [[org.apache.spark.memory.MemoryManager]] which vary in how + * they handle the sizing of their memory pools: + * + * - [[org.apache.spark.memory.UnifiedMemoryManager]], the default in Spark 1.6+, enforces soft + * boundaries between storage and execution memory, allowing requests for memory in one region + * to be fulfilled by borrowing memory from the other. + * - [[org.apache.spark.memory.StaticMemoryManager]] enforces hard boundaries between storage + * and execution memory by statically partitioning Spark's memory and preventing storage and + * execution from borrowing memory from each other. This mode is retained only for legacy + * compatibility purposes. + */ +package object memory diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala index 9e002621a6..3a48af82b1 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import org.apache.spark.memory.TaskMemoryManager +import org.apache.spark.memory.{MemoryMode, TaskMemoryManager} import org.apache.spark.{Logging, SparkEnv} /** @@ -78,7 +78,8 @@ private[spark] trait Spillable[C] extends Logging { if (elementsRead % 32 == 0 && currentMemory >= myMemoryThreshold) { // Claim up to double our current memory from the shuffle memory pool val amountToRequest = 2 * currentMemory - myMemoryThreshold - val granted = taskMemoryManager.acquireExecutionMemory(amountToRequest, null) + val granted = + taskMemoryManager.acquireExecutionMemory(amountToRequest, MemoryMode.ON_HEAP, null) myMemoryThreshold += granted // If we were granted too little memory to grow further (either tryToAcquire returned 0, // or we already had more memory than myMemoryThreshold), spill the current collection @@ -107,7 +108,8 @@ private[spark] trait Spillable[C] extends Logging { */ def releaseMemory(): Unit = { // The amount we requested does not include the initial memory tracking threshold - taskMemoryManager.releaseExecutionMemory(myMemoryThreshold - initialMemoryThreshold, null) + taskMemoryManager.releaseExecutionMemory( + myMemoryThreshold - initialMemoryThreshold, MemoryMode.ON_HEAP, null) myMemoryThreshold = initialMemoryThreshold } diff --git a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java index c731317395..711eed0193 100644 --- a/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java +++ b/core/src/test/java/org/apache/spark/memory/TaskMemoryManagerSuite.java @@ -28,8 +28,14 @@ public class TaskMemoryManagerSuite { @Test public void leakedPageMemoryIsDetected() { final TaskMemoryManager manager = new TaskMemoryManager( - new TestMemoryManager(new SparkConf().set("spark.unsafe.offHeap", "false")), 0); + new StaticMemoryManager( + new SparkConf().set("spark.unsafe.offHeap", "false"), + Long.MAX_VALUE, + Long.MAX_VALUE, + 1), + 0); manager.allocatePage(4096, null); // leak memory + Assert.assertEquals(4096, manager.getMemoryConsumptionForThisTask()); Assert.assertEquals(4096, manager.cleanUpAllAllocatedMemory()); } diff --git a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java index 8ae3642738..e6e16fff80 100644 --- a/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java +++ b/core/src/test/java/org/apache/spark/memory/TestMemoryConsumer.java @@ -32,13 +32,19 @@ public class TestMemoryConsumer extends MemoryConsumer { } void use(long size) { - long got = taskMemoryManager.acquireExecutionMemory(size, this); + long got = taskMemoryManager.acquireExecutionMemory( + size, + taskMemoryManager.tungstenMemoryMode, + this); used += got; } void free(long size) { used -= size; - taskMemoryManager.releaseExecutionMemory(size, this); + taskMemoryManager.releaseExecutionMemory( + size, + taskMemoryManager.tungstenMemoryMode, + this); } } diff --git a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java index 4763395d7d..0e0eca515a 100644 --- a/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java +++ b/core/src/test/java/org/apache/spark/shuffle/sort/UnsafeShuffleWriterSuite.java @@ -423,7 +423,7 @@ public class UnsafeShuffleWriterSuite { memoryManager.limit(UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE * 16); final UnsafeShuffleWriter<Object, Object> writer = createWriter(false); final ArrayList<Product2<Object, Object>> dataToWrite = new ArrayList<>(); - for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE; i++) { + for (int i = 0; i < UnsafeShuffleWriter.INITIAL_SORT_BUFFER_SIZE + 1; i++) { dataToWrite.add(new Tuple2<Object, Object>(i, i)); } writer.write(dataToWrite.iterator()); diff --git a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java index 92bd45e5fa..3bca790f30 100644 --- a/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java +++ b/core/src/test/java/org/apache/spark/unsafe/map/AbstractBytesToBytesMapSuite.java @@ -83,7 +83,9 @@ public abstract class AbstractBytesToBytesMapSuite { public void setup() { memoryManager = new TestMemoryManager( - new SparkConf().set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator())); + new SparkConf() + .set("spark.unsafe.offHeap", "" + useOffHeapMemoryAllocator()) + .set("spark.memory.offHeapSize", "256mb")); taskMemoryManager = new TaskMemoryManager(memoryManager, 0); tempDir = Utils.createTempDir(System.getProperty("java.io.tmpdir"), "unsafe-test"); diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala index 4a9479cf49..f55d435fa3 100644 --- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.memory import java.util.concurrent.atomic.AtomicLong +import scala.collection.mutable import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Future} @@ -29,7 +30,7 @@ import org.mockito.stubbing.Answer import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite -import org.apache.spark.storage.MemoryStore +import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, StorageLevel} /** @@ -78,7 +79,12 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " + s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}") val numBytes = args(numBytesPos).asInstanceOf[Long] - mockEnsureFreeSpace(mm, numBytes) + val success = mockEnsureFreeSpace(mm, numBytes) + if (success) { + args.last.asInstanceOf[mutable.Buffer[(BlockId, BlockStatus)]].append( + (null, BlockStatus(StorageLevel.MEMORY_ONLY, numBytes, 0L, 0L))) + } + success } } } @@ -132,93 +138,95 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { } /** - * Create a MemoryManager with the specified execution memory limit and no storage memory. + * Create a MemoryManager with the specified execution memory limits and no storage memory. */ - protected def createMemoryManager(maxExecutionMemory: Long): MemoryManager + protected def createMemoryManager( + maxOnHeapExecutionMemory: Long, + maxOffHeapExecutionMemory: Long = 0L): MemoryManager // -- Tests of sharing of execution memory between tasks ---------------------------------------- // Prior to Spark 1.6, these tests were part of ShuffleMemoryManagerSuite. implicit val ec = ExecutionContext.global - test("single task requesting execution memory") { + test("single task requesting on-heap execution memory") { val manager = createMemoryManager(1000L) val taskMemoryManager = new TaskMemoryManager(manager, 0) - assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 100L) - assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L) - assert(taskMemoryManager.acquireExecutionMemory(400L, null) === 400L) - assert(taskMemoryManager.acquireExecutionMemory(200L, null) === 100L) - assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L) - assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L) + assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 100L) + assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L) + assert(taskMemoryManager.acquireExecutionMemory(400L, MemoryMode.ON_HEAP, null) === 400L) + assert(taskMemoryManager.acquireExecutionMemory(200L, MemoryMode.ON_HEAP, null) === 100L) + assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L) + assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L) - taskMemoryManager.releaseExecutionMemory(500L, null) - assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 300L) - assert(taskMemoryManager.acquireExecutionMemory(300L, null) === 200L) + taskMemoryManager.releaseExecutionMemory(500L, MemoryMode.ON_HEAP, null) + assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 300L) + assert(taskMemoryManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) === 200L) taskMemoryManager.cleanUpAllAllocatedMemory() - assert(taskMemoryManager.acquireExecutionMemory(1000L, null) === 1000L) - assert(taskMemoryManager.acquireExecutionMemory(100L, null) === 0L) + assert(taskMemoryManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) === 1000L) + assert(taskMemoryManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) === 0L) } - test("two tasks requesting full execution memory") { + test("two tasks requesting full on-heap execution memory") { val memoryManager = createMemoryManager(1000L) val t1MemManager = new TaskMemoryManager(memoryManager, 1) val t2MemManager = new TaskMemoryManager(memoryManager, 2) val futureTimeout: Duration = 20.seconds // Have both tasks request 500 bytes, then wait until both requests have been granted: - val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, null) } - val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t1Result1 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } + val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result1, futureTimeout) === 500L) assert(Await.result(t2Result1, futureTimeout) === 500L) // Have both tasks each request 500 bytes more; both should immediately return 0 as they are // both now at 1 / N - val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) } - val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } + val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result2, 200.millis) === 0L) assert(Await.result(t2Result2, 200.millis) === 0L) } - test("two tasks cannot grow past 1 / N of execution memory") { + test("two tasks cannot grow past 1 / N of on-heap execution memory") { val memoryManager = createMemoryManager(1000L) val t1MemManager = new TaskMemoryManager(memoryManager, 1) val t2MemManager = new TaskMemoryManager(memoryManager, 2) val futureTimeout: Duration = 20.seconds // Have both tasks request 250 bytes, then wait until both requests have been granted: - val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, null) } - val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) } + val t1Result1 = Future { t1MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } + val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result1, futureTimeout) === 250L) assert(Await.result(t2Result1, futureTimeout) === 250L) // Have both tasks each request 500 bytes more. // We should only grant 250 bytes to each of them on this second request - val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, null) } - val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t1Result2 = Future { t1MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } + val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result2, futureTimeout) === 250L) assert(Await.result(t2Result2, futureTimeout) === 250L) } - test("tasks can block to get at least 1 / 2N of execution memory") { + test("tasks can block to get at least 1 / 2N of on-heap execution memory") { val memoryManager = createMemoryManager(1000L) val t1MemManager = new TaskMemoryManager(memoryManager, 1) val t2MemManager = new TaskMemoryManager(memoryManager, 2) val futureTimeout: Duration = 20.seconds // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. - val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) } + val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result1, futureTimeout) === 1000L) - val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, null) } + val t2Result1 = Future { t2MemManager.acquireExecutionMemory(250L, MemoryMode.ON_HEAP, null) } // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult // to make sure the other thread blocks for some time otherwise. Thread.sleep(300) - t1MemManager.releaseExecutionMemory(250L, null) + t1MemManager.releaseExecutionMemory(250L, MemoryMode.ON_HEAP, null) // The memory freed from t1 should now be granted to t2. assert(Await.result(t2Result1, futureTimeout) === 250L) // Further requests by t2 should be denied immediately because it now has 1 / 2N of the memory. - val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, null) } + val t2Result2 = Future { t2MemManager.acquireExecutionMemory(100L, MemoryMode.ON_HEAP, null) } assert(Await.result(t2Result2, 200.millis) === 0L) } @@ -229,18 +237,18 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { val futureTimeout: Duration = 20.seconds // t1 grabs 1000 bytes and then waits until t2 is ready to make a request. - val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, null) } + val t1Result1 = Future { t1MemManager.acquireExecutionMemory(1000L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result1, futureTimeout) === 1000L) - val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t2Result1 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } // Make sure that t2 didn't grab the memory right away. This is hacky but it would be difficult // to make sure the other thread blocks for some time otherwise. Thread.sleep(300) // t1 releases all of its memory, so t2 should be able to grab all of the memory t1MemManager.cleanUpAllAllocatedMemory() assert(Await.result(t2Result1, futureTimeout) === 500L) - val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t2Result2 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } assert(Await.result(t2Result2, futureTimeout) === 500L) - val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, null) } + val t2Result3 = Future { t2MemManager.acquireExecutionMemory(500L, MemoryMode.ON_HEAP, null) } assert(Await.result(t2Result3, 200.millis) === 0L) } @@ -251,15 +259,35 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite { val t2MemManager = new TaskMemoryManager(memoryManager, 2) val futureTimeout: Duration = 20.seconds - val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, null) } + val t1Result1 = Future { t1MemManager.acquireExecutionMemory(700L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result1, futureTimeout) === 700L) - val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, null) } + val t2Result1 = Future { t2MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) } assert(Await.result(t2Result1, futureTimeout) === 300L) - val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, null) } + val t1Result2 = Future { t1MemManager.acquireExecutionMemory(300L, MemoryMode.ON_HEAP, null) } assert(Await.result(t1Result2, 200.millis) === 0L) } + + test("off-heap execution allocations cannot exceed limit") { + val memoryManager = createMemoryManager( + maxOnHeapExecutionMemory = 0L, + maxOffHeapExecutionMemory = 1000L) + + val tMemManager = new TaskMemoryManager(memoryManager, 1) + val result1 = Future { tMemManager.acquireExecutionMemory(1000L, MemoryMode.OFF_HEAP, null) } + assert(Await.result(result1, 200.millis) === 1000L) + assert(tMemManager.getMemoryConsumptionForThisTask === 1000L) + + val result2 = Future { tMemManager.acquireExecutionMemory(300L, MemoryMode.OFF_HEAP, null) } + assert(Await.result(result2, 200.millis) === 0L) + + assert(tMemManager.getMemoryConsumptionForThisTask === 1000L) + tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null) + assert(tMemManager.getMemoryConsumptionForThisTask === 500L) + tMemManager.releaseExecutionMemory(500L, MemoryMode.OFF_HEAP, null) + assert(tMemManager.getMemoryConsumptionForThisTask === 0L) + } } private object MemoryManagerSuite { diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index 885c450d6d..54cb28c389 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -24,7 +24,6 @@ import org.mockito.Mockito.when import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} - class StaticMemoryManagerSuite extends MemoryManagerSuite { private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] @@ -36,38 +35,47 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { maxExecutionMem: Long, maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { val mm = new StaticMemoryManager( - conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem, numCores = 1) + conf, + maxOnHeapExecutionMemory = maxExecutionMem, + maxStorageMemory = maxStorageMem, + numCores = 1) val ms = makeMemoryStore(mm) (mm, ms) } - override protected def createMemoryManager(maxMemory: Long): MemoryManager = { + override protected def createMemoryManager( + maxOnHeapExecutionMemory: Long, + maxOffHeapExecutionMemory: Long): StaticMemoryManager = { new StaticMemoryManager( - conf, - maxExecutionMemory = maxMemory, + conf.clone + .set("spark.memory.fraction", "1") + .set("spark.testing.memory", maxOnHeapExecutionMemory.toString) + .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString), + maxOnHeapExecutionMemory = maxOnHeapExecutionMemory, maxStorageMemory = 0, numCores = 1) } test("basic execution memory") { val maxExecutionMem = 1000L + val taskAttemptId = 0L val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) assert(mm.executionMemoryUsed === 0L) - assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L) + assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) assert(mm.executionMemoryUsed === 10L) - assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) // Acquire up to the max - assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L) assert(mm.executionMemoryUsed === maxExecutionMem) - assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) assert(mm.executionMemoryUsed === maxExecutionMem) - mm.releaseExecutionMemory(800L) + mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 200L) // Acquire after release - assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired - mm.releaseExecutionMemory(maxExecutionMem) + mm.releaseExecutionMemory(maxExecutionMem, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 0L) } @@ -113,13 +121,14 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { test("execution and storage isolation") { val maxExecutionMem = 200L val maxStorageMem = 1000L + val taskAttemptId = 0L val dummyBlock = TestBlockId("ain't nobody love like you do") val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) // Only execution memory should increase - assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 100L) - assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase @@ -128,7 +137,7 @@ class StaticMemoryManagerSuite extends MemoryManagerSuite { assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released - mm.releaseExecutionMemory(133L) + mm.releaseExecutionMemory(133L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 67L) // Only storage memory should be released diff --git a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala index 77e43554ee..0706a6e45d 100644 --- a/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala +++ b/core/src/test/scala/org/apache/spark/memory/TestMemoryManager.scala @@ -22,19 +22,20 @@ import scala.collection.mutable import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockStatus, BlockId} -class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = 1) { - private[memory] override def doAcquireExecutionMemory( +class TestMemoryManager(conf: SparkConf) + extends MemoryManager(conf, numCores = 1, Long.MaxValue, Long.MaxValue) { + + override private[memory] def acquireExecutionMemory( numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { + taskAttemptId: Long, + memoryMode: MemoryMode): Long = { if (oomOnce) { oomOnce = false 0 } else if (available >= numBytes) { - _executionMemoryUsed += numBytes // To suppress warnings when freeing unallocated memory available -= numBytes numBytes } else { - _executionMemoryUsed += available val grant = available available = 0 grant @@ -48,12 +49,13 @@ class TestMemoryManager(conf: SparkConf) extends MemoryManager(conf, numCores = blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def releaseExecutionMemory(numBytes: Long): Unit = { + override def releaseStorageMemory(numBytes: Long): Unit = {} + override private[memory] def releaseExecutionMemory( + numBytes: Long, + taskAttemptId: Long, + memoryMode: MemoryMode): Unit = { available += numBytes - _executionMemoryUsed -= numBytes } - override def releaseStorageMemory(numBytes: Long): Unit = {} - override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue private var oomOnce = false diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala index 0c97f2bd89..8cebe81c3b 100644 --- a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -24,57 +24,52 @@ import org.scalatest.PrivateMethodTester import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} - class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester { - private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5") private val dummyBlock = TestBlockId("--") private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + private val storageFraction: Double = 0.5 + /** * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies. */ private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = { - val mm = new UnifiedMemoryManager(conf, maxMemory, numCores = 1) + val mm = createMemoryManager(maxMemory) val ms = makeMemoryStore(mm) (mm, ms) } - override protected def createMemoryManager(maxMemory: Long): MemoryManager = { - new UnifiedMemoryManager(conf, maxMemory, numCores = 1) - } - - private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = { - mm invokePrivate PrivateMethod[Long]('storageRegionSize)() - } - - test("storage region size") { - val maxMemory = 1000L - val (mm, _) = makeThings(maxMemory) - val storageFraction = conf.get("spark.memory.storageFraction").toDouble - val expectedStorageRegionSize = maxMemory * storageFraction - val actualStorageRegionSize = getStorageRegionSize(mm) - assert(expectedStorageRegionSize === actualStorageRegionSize) + override protected def createMemoryManager( + maxOnHeapExecutionMemory: Long, + maxOffHeapExecutionMemory: Long): UnifiedMemoryManager = { + val conf = new SparkConf() + .set("spark.memory.fraction", "1") + .set("spark.testing.memory", maxOnHeapExecutionMemory.toString) + .set("spark.memory.offHeapSize", maxOffHeapExecutionMemory.toString) + .set("spark.memory.storageFraction", storageFraction.toString) + UnifiedMemoryManager(conf, numCores = 1) } test("basic execution memory") { val maxMemory = 1000L + val taskAttemptId = 0L val (mm, _) = makeThings(maxMemory) assert(mm.executionMemoryUsed === 0L) - assert(mm.doAcquireExecutionMemory(10L, evictedBlocks) === 10L) + assert(mm.acquireExecutionMemory(10L, taskAttemptId, MemoryMode.ON_HEAP) === 10L) assert(mm.executionMemoryUsed === 10L) - assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) // Acquire up to the max - assert(mm.doAcquireExecutionMemory(1000L, evictedBlocks) === 890L) + assert(mm.acquireExecutionMemory(1000L, taskAttemptId, MemoryMode.ON_HEAP) === 890L) assert(mm.executionMemoryUsed === maxMemory) - assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 0L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 0L) assert(mm.executionMemoryUsed === maxMemory) - mm.releaseExecutionMemory(800L) + mm.releaseExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 200L) // Acquire after release - assert(mm.doAcquireExecutionMemory(1L, evictedBlocks) === 1L) + assert(mm.acquireExecutionMemory(1L, taskAttemptId, MemoryMode.ON_HEAP) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired - mm.releaseExecutionMemory(maxMemory) + mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) assert(mm.executionMemoryUsed === 0L) } @@ -118,44 +113,34 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes test("execution evicts storage") { val maxMemory = 1000L + val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) - // First, ensure the test classes are set up as expected - val expectedStorageRegionSize = 500L - val expectedExecutionRegionSize = 500L - val storageRegionSize = getStorageRegionSize(mm) - val executionRegionSize = maxMemory - expectedStorageRegionSize - require(storageRegionSize === expectedStorageRegionSize, - "bad test: storage region size is unexpected") - require(executionRegionSize === expectedExecutionRegionSize, - "bad test: storage region size is unexpected") // Acquire enough storage memory to exceed the storage region assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, 750L) assert(mm.executionMemoryUsed === 0L) assert(mm.storageMemoryUsed === 750L) - require(mm.storageMemoryUsed > storageRegionSize, - s"bad test: storage memory used should exceed the storage region") // Execution needs to request 250 bytes to evict storage memory - assert(mm.doAcquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.acquireExecutionMemory(100L, taskAttemptId, MemoryMode.ON_HEAP) === 100L) assert(mm.executionMemoryUsed === 100L) assert(mm.storageMemoryUsed === 750L) assertEnsureFreeSpaceNotCalled(ms) // Execution wants 200 bytes but only 150 are free, so storage is evicted - assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L) - assertEnsureFreeSpaceCalled(ms, 200L) + assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) + assert(mm.executionMemoryUsed === 300L) + assertEnsureFreeSpaceCalled(ms, 50L) assert(mm.executionMemoryUsed === 300L) mm.releaseAllStorageMemory() - require(mm.executionMemoryUsed < executionRegionSize, - s"bad test: execution memory used should be within the execution region") + require(mm.executionMemoryUsed === 300L) require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") // Acquire some storage memory again, but this time keep it within the storage region assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks)) assertEnsureFreeSpaceCalled(ms, 400L) - require(mm.storageMemoryUsed < storageRegionSize, - s"bad test: storage memory used should be within the storage region") + assert(mm.storageMemoryUsed === 400L) + assert(mm.executionMemoryUsed === 300L) // Execution cannot evict storage because the latter is within the storage fraction, // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300 - assert(mm.doAcquireExecutionMemory(400L, evictedBlocks) === 300L) + assert(mm.acquireExecutionMemory(400L, taskAttemptId, MemoryMode.ON_HEAP) === 300L) assert(mm.executionMemoryUsed === 600L) assert(mm.storageMemoryUsed === 400L) assertEnsureFreeSpaceNotCalled(ms) @@ -163,23 +148,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes test("storage does not evict execution") { val maxMemory = 1000L + val taskAttemptId = 0L val (mm, ms) = makeThings(maxMemory) - // First, ensure the test classes are set up as expected - val expectedStorageRegionSize = 500L - val expectedExecutionRegionSize = 500L - val storageRegionSize = getStorageRegionSize(mm) - val executionRegionSize = maxMemory - expectedStorageRegionSize - require(storageRegionSize === expectedStorageRegionSize, - "bad test: storage region size is unexpected") - require(executionRegionSize === expectedExecutionRegionSize, - "bad test: storage region size is unexpected") // Acquire enough execution memory to exceed the execution region - assert(mm.doAcquireExecutionMemory(800L, evictedBlocks) === 800L) + assert(mm.acquireExecutionMemory(800L, taskAttemptId, MemoryMode.ON_HEAP) === 800L) assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 0L) assertEnsureFreeSpaceNotCalled(ms) - require(mm.executionMemoryUsed > executionRegionSize, - s"bad test: execution memory used should exceed the execution region") // Storage should not be able to evict execution assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) assert(mm.executionMemoryUsed === 800L) @@ -189,15 +164,13 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes assert(mm.executionMemoryUsed === 800L) assert(mm.storageMemoryUsed === 100L) assertEnsureFreeSpaceCalled(ms, 250L) - mm.releaseExecutionMemory(maxMemory) + mm.releaseExecutionMemory(maxMemory, taskAttemptId, MemoryMode.ON_HEAP) mm.releaseStorageMemory(maxMemory) // Acquire some execution memory again, but this time keep it within the execution region - assert(mm.doAcquireExecutionMemory(200L, evictedBlocks) === 200L) + assert(mm.acquireExecutionMemory(200L, taskAttemptId, MemoryMode.ON_HEAP) === 200L) assert(mm.executionMemoryUsed === 200L) assert(mm.storageMemoryUsed === 0L) assertEnsureFreeSpaceNotCalled(ms) - require(mm.executionMemoryUsed < executionRegionSize, - s"bad test: execution memory used should be within the execution region") // Storage should still not be able to evict execution assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) assert(mm.executionMemoryUsed === 200L) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala index d49015afcd..53991d8a1a 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala @@ -825,7 +825,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) val memoryManager = new StaticMemoryManager( conf, - maxExecutionMemory = Long.MaxValue, + maxOnHeapExecutionMemory = Long.MaxValue, maxStorageMemory = 1200, numCores = 1) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, |