diff options
21 files changed, 840 insertions, 306 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index b344b5e173..1a0ac3d017 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -418,16 +418,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { } // Validate memory fractions - val memoryKeys = Seq( + val deprecatedMemoryKeys = Seq( "spark.storage.memoryFraction", "spark.shuffle.memoryFraction", "spark.shuffle.safetyFraction", "spark.storage.unrollFraction", "spark.storage.safetyFraction") + val memoryKeys = Seq( + "spark.memory.fraction", + "spark.memory.storageFraction") ++ + deprecatedMemoryKeys for (key <- memoryKeys) { val value = getDouble(key, 0.5) if (value > 1 || value < 0) { - throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').") + throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').") + } + } + + // Warn against deprecated memory fractions (unless legacy memory management mode is enabled) + val legacyMemoryManagementKey = "spark.memory.useLegacyMode" + val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false) + if (!legacyMemoryManagement) { + val keyset = deprecatedMemoryKeys.toSet + val detected = settings.keys().asScala.filter(keyset.contains) + if (detected.nonEmpty) { + logWarning("Detected deprecated memory fraction settings: " + + detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " + + "memory management are unified. All memory fractions used in the old model are " + + "now deprecated and no longer read. 
If you wish to use the old memory management, " + + s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).") } } diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index df3d84a1f0..c329983451 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem -import org.apache.spark.memory.{MemoryManager, StaticMemoryManager} +import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager} import org.apache.spark.network.BlockTransferService import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv} @@ -335,7 +335,14 @@ object SparkEnv extends Logging { val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) - val memoryManager = new StaticMemoryManager(conf) + val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false) + val memoryManager: MemoryManager = + if (useLegacyMemoryManager) { + new StaticMemoryManager(conf) + } else { + new UnifiedMemoryManager(conf) + } + val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores) val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores) diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala index 4bf73b6969..7168ac5491 100644 --- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala @@ -19,6 +19,7 
@@ package org.apache.spark.memory import scala.collection.mutable +import org.apache.spark.Logging import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} @@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore} * sorts and aggregations, while storage memory refers to that used for caching and propagating * internal data across the cluster. There exists one of these per JVM. */ -private[spark] abstract class MemoryManager { +private[spark] abstract class MemoryManager extends Logging { // The memory store used to evict cached blocks private var _memoryStore: MemoryStore = _ @@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager { _memoryStore } + // Amount of execution/storage memory in use, accesses must be synchronized on `this` + protected var _executionMemoryUsed: Long = 0 + protected var _storageMemoryUsed: Long = 0 + /** * Set the [[MemoryStore]] used by this manager to evict cached blocks. * This must be set after construction due to initialization ordering constraints. */ - def setMemoryStore(store: MemoryStore): Unit = { + final def setMemoryStore(store: MemoryStore): Unit = { _memoryStore = store } /** - * Acquire N bytes of memory for execution. + * Total available memory for execution, in bytes. + */ + def maxExecutionMemory: Long + + /** + * Total available memory for storage, in bytes. + */ + def maxStorageMemory: Long + + // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985) + + /** + * Acquire N bytes of memory for execution, evicting cached blocks if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. * @return number of bytes successfully granted (<= N). */ - def acquireExecutionMemory(numBytes: Long): Long + def acquireExecutionMemory( + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long /** * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. 
@@ -66,52 +86,73 @@ private[spark] abstract class MemoryManager { /** * Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary. + * + * This extra method allows subclasses to differentiate behavior between acquiring storage + * memory and acquiring unroll memory. For instance, the memory management model in Spark + * 1.5 and before places a limit on the amount of space that can be freed from unrolling. * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * * @return whether all N bytes were successfully granted. */ def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + acquireStorageMemory(blockId, numBytes, evictedBlocks) + } /** * Release N bytes of execution memory. */ - def releaseExecutionMemory(numBytes: Long): Unit + def releaseExecutionMemory(numBytes: Long): Unit = synchronized { + if (numBytes > _executionMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of execution " + + s"memory when we only have ${_executionMemoryUsed} bytes") + _executionMemoryUsed = 0 + } else { + _executionMemoryUsed -= numBytes + } + } /** * Release N bytes of storage memory. */ - def releaseStorageMemory(numBytes: Long): Unit + def releaseStorageMemory(numBytes: Long): Unit = synchronized { + if (numBytes > _storageMemoryUsed) { + logWarning(s"Attempted to release $numBytes bytes of storage " + + s"memory when we only have ${_storageMemoryUsed} bytes") + _storageMemoryUsed = 0 + } else { + _storageMemoryUsed -= numBytes + } + } /** * Release all storage memory acquired. */ - def releaseStorageMemory(): Unit + def releaseAllStorageMemory(): Unit = synchronized { + _storageMemoryUsed = 0 + } /** * Release N bytes of unroll memory. */ - def releaseUnrollMemory(numBytes: Long): Unit - - /** - * Total available memory for execution, in bytes. 
- */ - def maxExecutionMemory: Long - - /** - * Total available memory for storage, in bytes. - */ - def maxStorageMemory: Long + def releaseUnrollMemory(numBytes: Long): Unit = synchronized { + releaseStorageMemory(numBytes) + } /** * Execution memory currently in use, in bytes. */ - def executionMemoryUsed: Long + final def executionMemoryUsed: Long = synchronized { + _executionMemoryUsed + } /** * Storage memory currently in use, in bytes. */ - def storageMemoryUsed: Long + final def storageMemoryUsed: Long = synchronized { + _storageMemoryUsed + } } diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala index 150445edb9..fa44f37234 100644 --- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala @@ -19,7 +19,7 @@ package org.apache.spark.memory import scala.collection.mutable -import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus} @@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager( conf: SparkConf, override val maxExecutionMemory: Long, override val maxStorageMemory: Long) - extends MemoryManager with Logging { - - // Max number of bytes worth of blocks to evict when unrolling - private val maxMemoryToEvictForUnroll: Long = { - (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong - } - - // Amount of execution / storage memory in use - // Accesses must be synchronized on `this` - private var _executionMemoryUsed: Long = 0 - private var _storageMemoryUsed: Long = 0 + extends MemoryManager { def this(conf: SparkConf) { this( @@ -53,11 +43,19 @@ private[spark] class StaticMemoryManager( StaticMemoryManager.getMaxStorageMemory(conf)) } + // Max number of bytes worth of blocks to evict when unrolling + private val maxMemoryToEvictForUnroll: Long = { + 
(maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong + } + /** * Acquire N bytes of memory for execution. * @return number of bytes successfully granted (<= N). */ - override def acquireExecutionMemory(numBytes: Long): Long = synchronized { + override def acquireExecutionMemory( + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { + assert(numBytes >= 0) assert(_executionMemoryUsed <= maxExecutionMemory) val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed) _executionMemoryUsed += bytesToGrant @@ -72,7 +70,7 @@ private[spark] class StaticMemoryManager( override def acquireStorageMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks) } @@ -88,7 +86,7 @@ private[spark] class StaticMemoryManager( override def acquireUnrollMemory( blockId: BlockId, numBytes: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { val currentUnrollMemory = memoryStore.currentUnrollMemory val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory) val numBytesToFree = math.min(numBytes, maxNumBytesToFree) @@ -108,71 +106,16 @@ private[spark] class StaticMemoryManager( blockId: BlockId, numBytesToAcquire: Long, numBytesToFree: Long, - evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - // Note: Keep this outside synchronized block to avoid potential deadlocks! 
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + assert(numBytesToAcquire >= 0) + assert(numBytesToFree >= 0) memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks) - synchronized { - assert(_storageMemoryUsed <= maxStorageMemory) - val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory - if (enoughMemory) { - _storageMemoryUsed += numBytesToAcquire - } - enoughMemory - } - } - - /** - * Release N bytes of execution memory. - */ - override def releaseExecutionMemory(numBytes: Long): Unit = synchronized { - if (numBytes > _executionMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of execution " + - s"memory when we only have ${_executionMemoryUsed} bytes") - _executionMemoryUsed = 0 - } else { - _executionMemoryUsed -= numBytes - } - } - - /** - * Release N bytes of storage memory. - */ - override def releaseStorageMemory(numBytes: Long): Unit = synchronized { - if (numBytes > _storageMemoryUsed) { - logWarning(s"Attempted to release $numBytes bytes of storage " + - s"memory when we only have ${_storageMemoryUsed} bytes") - _storageMemoryUsed = 0 - } else { - _storageMemoryUsed -= numBytes + assert(_storageMemoryUsed <= maxStorageMemory) + val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory + if (enoughMemory) { + _storageMemoryUsed += numBytesToAcquire } - } - - /** - * Release all storage memory acquired. - */ - override def releaseStorageMemory(): Unit = synchronized { - _storageMemoryUsed = 0 - } - - /** - * Release N bytes of unroll memory. - */ - override def releaseUnrollMemory(numBytes: Long): Unit = { - releaseStorageMemory(numBytes) - } - - /** - * Amount of execution memory currently in use, in bytes. - */ - override def executionMemoryUsed: Long = synchronized { - _executionMemoryUsed - } - - /** - * Amount of storage memory currently in use, in bytes. 
- */ - override def storageMemoryUsed: Long = synchronized { - _storageMemoryUsed + enoughMemory } } @@ -184,9 +127,10 @@ private[spark] object StaticMemoryManager { * Return the total amount of memory available for the storage region, in bytes. */ private def getMaxStorageMemory(conf: SparkConf): Long = { + val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6) val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9) - (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + (systemMaxMemory * memoryFraction * safetyFraction).toLong } @@ -194,9 +138,10 @@ private[spark] object StaticMemoryManager { * Return the total amount of memory available for the execution region, in bytes. */ private def getMaxExecutionMemory(conf: SparkConf): Long = { + val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2) val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8) - (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong + (systemMaxMemory * memoryFraction * safetyFraction).toLong } } diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala new file mode 100644 index 0000000000..5bf78d5b67 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import scala.collection.mutable + +import org.apache.spark.SparkConf +import org.apache.spark.storage.{BlockStatus, BlockId} + + +/** + * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that + * either side can borrow memory from the other. + * + * The region shared between execution and storage is a fraction of the total heap space + * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary + * within this space is further determined by `spark.memory.storageFraction` (default 0.5). + * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default. + * + * Storage can borrow as much execution memory as is free until execution reclaims its space. + * When this happens, cached blocks will be evicted from memory until sufficient borrowed + * memory is released to satisfy the execution memory request. + * + * Similarly, execution can borrow as much storage memory as is free. However, execution + * memory is *never* evicted by storage due to the complexities involved in implementing this. + * The implication is that attempts to cache blocks may fail if execution has already eaten + * up most of the storage space, in which case the new blocks will be evicted immediately + * according to their respective storage levels. 
+ */ +private[spark] class UnifiedMemoryManager(conf: SparkConf, maxMemory: Long) extends MemoryManager { + + def this(conf: SparkConf) { + this(conf, UnifiedMemoryManager.getMaxMemory(conf)) + } + + /** + * Size of the storage region, in bytes. + * + * This region is not statically reserved; execution can borrow from it if necessary. + * Cached blocks can be evicted only if actual storage memory usage exceeds this region. + */ + private val storageRegionSize: Long = { + (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong + } + + /** + * Total amount of memory, in bytes, not currently occupied by either execution or storage. + */ + private def totalFreeMemory: Long = synchronized { + assert(_executionMemoryUsed <= maxMemory) + assert(_storageMemoryUsed <= maxMemory) + assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory) + maxMemory - _executionMemoryUsed - _storageMemoryUsed + } + + /** + * Total available memory for execution, in bytes. + * In this model, this is equivalent to the amount of memory not occupied by storage. + */ + override def maxExecutionMemory: Long = synchronized { + maxMemory - _storageMemoryUsed + } + + /** + * Total available memory for storage, in bytes. + * In this model, this is equivalent to the amount of memory not occupied by execution. + */ + override def maxStorageMemory: Long = synchronized { + maxMemory - _executionMemoryUsed + } + + /** + * Acquire N bytes of memory for execution, evicting cached blocks if necessary. + * + * This method evicts blocks only up to the amount of memory borrowed by storage. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return number of bytes successfully granted (<= N). 
+ */ + override def acquireExecutionMemory( + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized { + assert(numBytes >= 0) + val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize) + // If there is not enough free memory AND storage has borrowed some execution memory, + // then evict as much memory borrowed by storage as needed to grant this request + val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0 + if (shouldEvictStorage) { + val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage) + memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks) + } + val bytesToGrant = math.min(numBytes, totalFreeMemory) + _executionMemoryUsed += bytesToGrant + bytesToGrant + } + + /** + * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary. + * Blocks evicted in the process, if any, are added to `evictedBlocks`. + * @return whether all N bytes were successfully granted. + */ + override def acquireStorageMemory( + blockId: BlockId, + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized { + assert(numBytes >= 0) + memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks) + val enoughMemory = totalFreeMemory >= numBytes + if (enoughMemory) { + _storageMemoryUsed += numBytes + } + enoughMemory + } + +} + +private object UnifiedMemoryManager { + + /** + * Return the total amount of memory shared between execution and storage, in bytes. 
+ */ + private def getMaxMemory(conf: SparkConf): Long = { + val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory) + val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75) + (systemMaxMemory * memoryFraction).toLong + } +} diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala index bb64bb3f35..aaf543ce92 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala @@ -18,11 +18,13 @@ package org.apache.spark.shuffle import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer import com.google.common.annotations.VisibleForTesting import org.apache.spark._ import org.apache.spark.memory.{StaticMemoryManager, MemoryManager} +import org.apache.spark.storage.{BlockId, BlockStatus} import org.apache.spark.unsafe.array.ByteArrayMethods /** @@ -36,8 +38,8 @@ import org.apache.spark.unsafe.array.ByteArrayMethods * If there are N tasks, it ensures that each tasks can acquire at least 1 / 2N of the memory * before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the * set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever - * this set changes. This is all done by synchronizing access on "this" to mutate state and using - * wait() and notifyAll() to signal changes. + * this set changes. This is all done by synchronizing access to `memoryManager` to mutate state + * and using wait() and notifyAll() to signal changes. * * Use `ShuffleMemoryManager.create()` factory method to create a new instance. 
* @@ -51,7 +53,6 @@ class ShuffleMemoryManager protected ( extends Logging { private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes - private val maxMemory = memoryManager.maxExecutionMemory private def currentTaskAttemptId(): Long = { // In case this is called on the driver, return an invalid task attempt id. @@ -65,7 +66,7 @@ class ShuffleMemoryManager protected ( * total memory pool (where N is the # of active tasks) before it is forced to spill. This can * happen if the number of tasks increases but an older task had a lot of memory already. */ - def tryToAcquire(numBytes: Long): Long = synchronized { + def tryToAcquire(numBytes: Long): Long = memoryManager.synchronized { val taskAttemptId = currentTaskAttemptId() assert(numBytes > 0, "invalid number of bytes requested: " + numBytes) @@ -73,15 +74,18 @@ class ShuffleMemoryManager protected ( // of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire if (!taskMemory.contains(taskAttemptId)) { taskMemory(taskAttemptId) = 0L - notifyAll() // Will later cause waiting tasks to wake up and check numTasks again + // This will later cause waiting tasks to wake up and check numTasks again + memoryManager.notifyAll() } // Keep looping until we're either sure that we don't want to grant this request (because this // task would have more than 1 / numActiveTasks of the memory) or we have enough free // memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)). 
+ // TODO: simplify this to limit each task to its own slot while (true) { val numActiveTasks = taskMemory.keys.size val curMem = taskMemory(taskAttemptId) + val maxMemory = memoryManager.maxExecutionMemory val freeMemory = maxMemory - taskMemory.values.sum // How much we can grant this task; don't let it grow to more than 1 / numActiveTasks; @@ -99,7 +103,7 @@ class ShuffleMemoryManager protected ( } else { logInfo( s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free") - wait() + memoryManager.wait() } } else { return acquire(toGrant) @@ -112,15 +116,23 @@ class ShuffleMemoryManager protected ( * Acquire N bytes of execution memory from the memory manager for the current task. * @return number of bytes actually acquired (<= N). */ - private def acquire(numBytes: Long): Long = synchronized { + private def acquire(numBytes: Long): Long = memoryManager.synchronized { val taskAttemptId = currentTaskAttemptId() - val acquired = memoryManager.acquireExecutionMemory(numBytes) + val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + val acquired = memoryManager.acquireExecutionMemory(numBytes, evictedBlocks) + // Register evicted blocks, if any, with the active task metrics + // TODO: just do this in `acquireExecutionMemory` (SPARK-10985) + Option(TaskContext.get()).foreach { tc => + val metrics = tc.taskMetrics() + val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]()) + metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq) + } taskMemory(taskAttemptId) += acquired acquired } /** Release numBytes bytes for the current task. 
*/ - def release(numBytes: Long): Unit = synchronized { + def release(numBytes: Long): Unit = memoryManager.synchronized { val taskAttemptId = currentTaskAttemptId() val curMem = taskMemory.getOrElse(taskAttemptId, 0L) if (curMem < numBytes) { @@ -129,20 +141,20 @@ class ShuffleMemoryManager protected ( } taskMemory(taskAttemptId) -= numBytes memoryManager.releaseExecutionMemory(numBytes) - notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed + memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed } /** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */ - def releaseMemoryForThisTask(): Unit = synchronized { + def releaseMemoryForThisTask(): Unit = memoryManager.synchronized { val taskAttemptId = currentTaskAttemptId() taskMemory.remove(taskAttemptId).foreach { numBytes => memoryManager.releaseExecutionMemory(numBytes) } - notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed + memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed } /** Returns the memory consumption, in bytes, for the current task */ - def getMemoryConsumptionForThisTask(): Long = synchronized { + def getMemoryConsumptionForThisTask(): Long = memoryManager.synchronized { val taskAttemptId = currentTaskAttemptId() taskMemory.getOrElse(taskAttemptId, 0L) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 9f5bd2abbd..c374b93766 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -91,6 +91,10 @@ private[spark] class BlockManager( } memoryManager.setMemoryStore(memoryStore) + // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time. 
+ // However, since we use this only for reporting and logging, what we actually want here is + // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need + // to revisit whether reporting this value as the "max" is intuitive to the user. private val maxMemory = memoryManager.maxStorageMemory private[spark] diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala index 35c57b923c..4dbac388e0 100644 --- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala +++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala @@ -37,15 +37,14 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean) private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager) extends BlockStore(blockManager) { + // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and + // acquiring or releasing unroll memory, must be synchronized on `memoryManager`! + private val conf = blockManager.conf private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true) - private val maxMemory = memoryManager.maxStorageMemory - - // Ensure only one thread is putting, and if necessary, dropping blocks at any given time - private val accountingLock = new Object // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes) - // All accesses of this map are assumed to have manually synchronized on `accountingLock` + // All accesses of this map are assumed to have manually synchronized on `memoryManager` private val unrollMemoryMap = mutable.HashMap[Long, Long]() // Same as `unrollMemoryMap`, but for pending unroll memory as defined below. 
// Pending unroll memory refers to the intermediate memory occupied by a task @@ -60,6 +59,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo private val unrollMemoryThreshold: Long = conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024) + /** Total amount of memory available for storage, in bytes. */ + private def maxMemory: Long = memoryManager.maxStorageMemory + if (maxMemory < unrollMemoryThreshold) { logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " + s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " + @@ -75,7 +77,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * Amount of storage memory, in bytes, used for caching blocks. * This does not include memory used for unrolling. */ - private def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory + private def blocksMemoryUsed: Long = memoryManager.synchronized { + memoryUsed - currentUnrollMemory + } override def getSize(blockId: BlockId): Long = { entries.synchronized { @@ -208,7 +212,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - override def remove(blockId: BlockId): Boolean = { + override def remove(blockId: BlockId): Boolean = memoryManager.synchronized { val entry = entries.synchronized { entries.remove(blockId) } if (entry != null) { memoryManager.releaseStorageMemory(entry.size) @@ -220,11 +224,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } - override def clear() { + override def clear(): Unit = memoryManager.synchronized { entries.synchronized { entries.clear() } - memoryManager.releaseStorageMemory() + unrollMemoryMap.clear() + pendingUnrollMemoryMap.clear() + memoryManager.releaseAllStorageMemory() logInfo("MemoryStore cleared") } @@ -299,22 +305,23 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } } finally { - // 
If we return an array, the values returned will later be cached in `tryToPut`. - // In this case, we should release the memory after we cache the block there. - // Otherwise, if we return an iterator, we release the memory reserved here - // later when the task finishes. + // If we return an array, the values returned here will be cached in `tryToPut` later. + // In this case, we should release the memory only after we cache the block there. if (keepUnrolling) { val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { - // Here, we transfer memory from unroll to pending unroll because we expect to cache this - // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in - // order to avoid race conditions where another component steals the memory that we're - // trying to transfer. + memoryManager.synchronized { + // Since we continue to hold onto the array until we actually cache it, we cannot + // release the unroll memory yet. Instead, we transfer it to pending unroll memory + // so `tryToPut` can further transfer it to normal storage memory later. + // TODO: we can probably express this without pending unroll memory (SPARK-10907) val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved unrollMemoryMap(taskAttemptId) -= amountToTransferToPending pendingUnrollMemoryMap(taskAttemptId) = pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending } + } else { + // Otherwise, if we return an iterator, we can only release the unroll memory when + // the task finishes since we don't know when the iterator will be consumed. } } } @@ -343,7 +350,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be * created to avoid OOM since it may be a big ByteBuffer. 
* - * Synchronize on `accountingLock` to ensure that all the put requests and its associated block + * Synchronize on `memoryManager` to ensure that all the put requests and its associated block * dropping is done by only one thread at a time. Otherwise while one thread is dropping * blocks to free memory for one block, another thread may use up the freed space for * another block. @@ -365,16 +372,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo * for freeing up more space for another block that needs to be put. Only then the actual * dropping of blocks (and writing to disk if necessary) can proceed in parallel. */ - accountingLock.synchronized { + memoryManager.synchronized { // Note: if we have previously unrolled this block successfully, then pending unroll // memory should be non-zero. This is the amount that we already reserved during the // unrolling process. In this case, we can just reuse this space to cache our block. - // - // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the - // synchronization on `accountingLock` guarantees that the release of unroll memory and - // acquisition of storage memory happens atomically. However, if storage memory is acquired - // outside of MemoryStore or if unroll memory is counted as execution memory, then we will - // have to revisit this assumption. See SPARK-10983 for more context. + // The synchronization on `memoryManager` here guarantees that the release and acquire + // happen atomically. This relies on the assumption that all memory acquisitions are + // synchronized on the same lock. releasePendingUnrollMemoryForThisTask() val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks) if (enoughMemory) { @@ -402,33 +406,61 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } /** + * Try to free up a given amount of space by evicting existing blocks. 
+ * + * @param space the amount of memory to free, in bytes + * @param droppedBlocks a holder for blocks evicted in the process + * @return whether the requested free space is freed. + */ + private[spark] def ensureFreeSpace( + space: Long, + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + ensureFreeSpace(None, space, droppedBlocks) + } + + /** + * Try to free up a given amount of space to store a block by evicting existing ones. + * + * @param space the amount of memory to free, in bytes + * @param droppedBlocks a holder for blocks evicted in the process + * @return whether the requested free space is freed. + */ + private[spark] def ensureFreeSpace( + blockId: BlockId, + space: Long, + droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { + ensureFreeSpace(Some(blockId), space, droppedBlocks) + } + + /** * Try to free up a given amount of space to store a particular block, but can fail if * either the block is bigger than our memory or it would require replacing another block * from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that * don't fit into memory that we want to avoid). * - * @param blockId the ID of the block we are freeing space for + * @param blockId the ID of the block we are freeing space for, if any * @param space the size of this block * @param droppedBlocks a holder for blocks evicted in the process - * @return whether there is enough free space. + * @return whether the requested free space is freed. 
*/ - private[spark] def ensureFreeSpace( - blockId: BlockId, + private def ensureFreeSpace( + blockId: Option[BlockId], space: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - accountingLock.synchronized { + memoryManager.synchronized { val freeMemory = maxMemory - memoryUsed - val rddToAdd = getRddId(blockId) + val rddToAdd = blockId.flatMap(getRddId) val selectedBlocks = new ArrayBuffer[BlockId] var selectedMemory = 0L - logInfo(s"Ensuring $space bytes of free space for block $blockId " + + logInfo(s"Ensuring $space bytes of free space " + + blockId.map { id => s"for block $id" }.getOrElse("") + s"(free: $freeMemory, max: $maxMemory)") // Fail fast if the block simply won't fit if (space > maxMemory) { - logInfo(s"Will not store $blockId as the required space " + - s"($space bytes) than our memory limit ($maxMemory bytes)") + logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") + + s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)") return false } @@ -471,8 +503,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo } true } else { - logInfo(s"Will not store $blockId as it would require dropping another block " + - "from the same RDD") + blockId.foreach { id => + logInfo(s"Will not store $id as it would require dropping another block " + + "from the same RDD") + } false } } @@ -495,8 +529,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo blockId: BlockId, memory: Long, droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = { - accountingLock.synchronized { - // Note: all acquisitions of unroll memory must be synchronized on `accountingLock` + memoryManager.synchronized { val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks) if (success) { val taskAttemptId = currentTaskAttemptId() @@ -512,7 +545,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, 
memoryManager: Memo */ def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { + memoryManager.synchronized { if (unrollMemoryMap.contains(taskAttemptId)) { val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { @@ -531,7 +564,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo */ def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = { val taskAttemptId = currentTaskAttemptId() - accountingLock.synchronized { + memoryManager.synchronized { if (pendingUnrollMemoryMap.contains(taskAttemptId)) { val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId)) if (memoryToRelease > 0) { @@ -548,21 +581,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo /** * Return the amount of memory currently occupied for unrolling blocks across all tasks. */ - def currentUnrollMemory: Long = accountingLock.synchronized { + def currentUnrollMemory: Long = memoryManager.synchronized { unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum } /** * Return the amount of memory currently occupied for unrolling blocks by this task. */ - def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized { + def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized { unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) } /** * Return the number of tasks currently unrolling blocks. */ - private def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size } + private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size } /** * Log information about current memory usage. 
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index 29c5732f5a..6a96b5dc12 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -48,16 +48,6 @@ import org.apache.spark.executor.ShuffleWriteMetrics * However, if the spill threshold is too low, we spill frequently and incur unnecessary disk * writes. This may lead to a performance regression compared to the normal case of using the * non-spilling AppendOnlyMap. - * - * Two parameters control the memory threshold: - * - * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing - * these maps as a fraction of the executor's total memory. Since each concurrently running - * task maintains one map, the actual threshold for each map is this quantity divided by the - * number of running tasks. - * - * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of - * this threshold, in case map size estimation is not sufficiently accurate. 
*/ @DeveloperApi class ExternalAppendOnlyMap[K, V, C]( diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala index 600c1403b0..34a4bb968e 100644 --- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala +++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala @@ -213,11 +213,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex } test("compute when only some partitions fit in memory") { - val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01") - sc = new SparkContext(clusterUrl, "test", conf) - // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache - // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions - // to make sure that *some* of them do fit though + sc = new SparkContext(clusterUrl, "test", new SparkConf) + // TODO: verify that only a subset of partitions fit in memory (SPARK-11078) val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER) assert(data.count() === 4000000) assert(data.count() === 4000000) diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala index d91b799ecf..4a0877d86f 100644 --- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala @@ -247,11 +247,13 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC .setMaster("local") .set("spark.shuffle.spill.compress", shuffleSpillCompress.toString) .set("spark.shuffle.compress", shuffleCompress.toString) - .set("spark.shuffle.memoryFraction", "0.001") resetSparkContext() sc = new SparkContext(myConf) + val diskBlockManager = sc.env.blockManager.diskBlockManager try { - sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect() + assert(diskBlockManager.getAllFiles().isEmpty) + 
sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect() + assert(diskBlockManager.getAllFiles().nonEmpty) } catch { case e: Exception => val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," + diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala new file mode 100644 index 0000000000..36e4566310 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import java.util.concurrent.atomic.AtomicLong + +import org.mockito.Matchers.{any, anyLong} +import org.mockito.Mockito.{mock, when} +import org.mockito.invocation.InvocationOnMock +import org.mockito.stubbing.Answer + +import org.apache.spark.SparkFunSuite +import org.apache.spark.storage.MemoryStore + + +/** + * Helper trait for sharing code among [[MemoryManager]] tests. 
+ */ +private[memory] trait MemoryManagerSuite extends SparkFunSuite { + + import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED + + // Note: Mockito's verify mechanism does not provide a way to reset method call counts + // without also resetting stubbed methods. Since our test code relies on the latter, + // we need to use our own variable to track invocations of `ensureFreeSpace`. + + /** + * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]] + * + * This is set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test + * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]]. + */ + private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED) + + /** + * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed. + * + * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]] + * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its + * dependencies. + */ + protected def makeMemoryStore(mm: MemoryManager): MemoryStore = { + val ms = mock(classOf[MemoryStore]) + when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0)) + when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1)) + mm.setMemoryStore(ms) + ms + } + + /** + * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments. 
+ */ + private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = { + new Answer[Boolean] { + override def answer(invocation: InvocationOnMock): Boolean = { + val args = invocation.getArguments + require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " + + s"in ensureFreeSpace, found ${args.size}") + require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " + + s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}") + val numBytes = args(numBytesPos).asInstanceOf[Long] + mockEnsureFreeSpace(mm, numBytes) + } + } + } + + /** + * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory. + * + * This is a significant simplification of the real method, which actually drops existing + * blocks based on the size of each block. Instead, here we simply release as many bytes + * as needed to ensure the requested amount of free space. This allows us to set up the + * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in + * many other dependencies. + * + * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that + * records the number of bytes this is called with. This variable is expected to be cleared + * by the test code later through [[assertEnsureFreeSpaceCalled]]. 
+ */ + private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized { + require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, + "bad test: ensure free space variable was not reset") + // Record the number of bytes we freed this call + ensureFreeSpaceCalled.set(numBytes) + if (numBytes <= mm.maxStorageMemory) { + def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed + val spaceToRelease = numBytes - freeMemory + if (spaceToRelease > 0) { + mm.releaseStorageMemory(spaceToRelease) + } + freeMemory >= numBytes + } else { + // We attempted to free more bytes than our max allowable memory + false + } + } + + /** + * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters. + */ + protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = { + assert(ensureFreeSpaceCalled.get() === numBytes, + s"expected ensure free space to be called with $numBytes") + ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED) + } + + /** + * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called. 
+ */ + protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = { + assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED, + "ensure free space should not have been called!") + } +} + +private object MemoryManagerSuite { + private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L +} diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala index c436a8b5c9..6cae1f871e 100644 --- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala @@ -19,32 +19,44 @@ package org.apache.spark.memory import scala.collection.mutable.ArrayBuffer -import org.mockito.Mockito.{mock, reset, verify, when} -import org.mockito.Matchers.{any, eq => meq} +import org.mockito.Mockito.when +import org.apache.spark.SparkConf import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} -import org.apache.spark.{SparkConf, SparkFunSuite} -class StaticMemoryManagerSuite extends SparkFunSuite { +class StaticMemoryManagerSuite extends MemoryManagerSuite { private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4") + private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + + /** + * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies. 
+ */ + private def makeThings( + maxExecutionMem: Long, + maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { + val mm = new StaticMemoryManager( + conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem) + val ms = makeMemoryStore(mm) + (mm, ms) + } test("basic execution memory") { val maxExecutionMem = 1000L val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue) assert(mm.executionMemoryUsed === 0L) - assert(mm.acquireExecutionMemory(10L) === 10L) + assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L) assert(mm.executionMemoryUsed === 10L) - assert(mm.acquireExecutionMemory(100L) === 100L) + assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L) // Acquire up to the max - assert(mm.acquireExecutionMemory(1000L) === 890L) + assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L) assert(mm.executionMemoryUsed === maxExecutionMem) - assert(mm.acquireExecutionMemory(1L) === 0L) + assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L) assert(mm.executionMemoryUsed === maxExecutionMem) mm.releaseExecutionMemory(800L) assert(mm.executionMemoryUsed === 200L) // Acquire after release - assert(mm.acquireExecutionMemory(1L) === 1L) + assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L) assert(mm.executionMemoryUsed === 201L) // Release beyond what was acquired mm.releaseExecutionMemory(maxExecutionMem) @@ -54,37 +66,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite { test("basic storage memory") { val maxStorageMem = 1000L val dummyBlock = TestBlockId("you can see the world you brought to live") - val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) // `ensureFreeSpace` should be called with the number of bytes requested - assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L) + assertEnsureFreeSpaceCalled(ms, 10L) 
assert(mm.storageMemoryUsed === 10L) - assert(evictedBlocks.isEmpty) assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) + assertEnsureFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 110L) - // Acquire up to the max, not granted - assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L) + // Acquire more than the max, not granted + assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L) assert(mm.storageMemoryUsed === 110L) - assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L) + // Acquire up to the max, requests after this are still granted due to LRU eviction + assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1000L) assert(mm.storageMemoryUsed === 1000L) - assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 1000L) mm.releaseStorageMemory(800L) assert(mm.storageMemoryUsed === 200L) // Acquire after release assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assertEnsureFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 201L) - mm.releaseStorageMemory() + mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L) + assertEnsureFreeSpaceCalled(ms, 1L) assert(mm.storageMemoryUsed === 1L) // Release beyond what was acquired mm.releaseStorageMemory(100L) @@ -95,18 +106,17 @@ class StaticMemoryManagerSuite extends SparkFunSuite { val maxExecutionMem = 200L val 
maxStorageMem = 1000L val dummyBlock = TestBlockId("ain't nobody love like you do") - val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem) // Only execution memory should increase - assert(mm.acquireExecutionMemory(100L) === 100L) + assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 100L) - assert(mm.acquireExecutionMemory(1000L) === 100L) + assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L) assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 200L) // Only storage memory should increase - assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L) + assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 50L) assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 200L) // Only execution memory should be released @@ -114,7 +124,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite { assert(mm.storageMemoryUsed === 50L) assert(mm.executionMemoryUsed === 67L) // Only storage memory should be released - mm.releaseStorageMemory() + mm.releaseAllStorageMemory() assert(mm.storageMemoryUsed === 0L) assert(mm.executionMemoryUsed === 67L) } @@ -122,51 +132,26 @@ class StaticMemoryManagerSuite extends SparkFunSuite { test("unroll memory") { val maxStorageMem = 1000L val dummyBlock = TestBlockId("lonely water") - val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)] val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem) - assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks)) - assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L) + assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 100L) assert(mm.storageMemoryUsed === 100L) mm.releaseUnrollMemory(40L) assert(mm.storageMemoryUsed === 60L) when(ms.currentUnrollMemory).thenReturn(60L) - 
assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks)) + assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks)) // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes. // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes. - assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L) + assertEnsureFreeSpaceCalled(ms, 340L) assert(mm.storageMemoryUsed === 560L) when(ms.currentUnrollMemory).thenReturn(560L) - assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks)) + assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks)) assert(mm.storageMemoryUsed === 560L) // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed - assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L) + assertEnsureFreeSpaceCalled(ms, 0L) // Release beyond what was acquired mm.releaseUnrollMemory(maxStorageMem) assert(mm.storageMemoryUsed === 0L) } - /** - * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies. - */ - private def makeThings( - maxExecutionMem: Long, - maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = { - val mm = new StaticMemoryManager( - conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem) - val ms = mock(classOf[MemoryStore]) - mm.setMemoryStore(ms) - (mm, ms) - } - - /** - * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters. 
- */ - private def assertEnsureFreeSpaceCalled( - ms: MemoryStore, - blockId: BlockId, - numBytes: Long): Unit = { - verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any()) - reset(ms) - } - } diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala new file mode 100644 index 0000000000..e7baa50dc2 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.memory + +import scala.collection.mutable.ArrayBuffer + +import org.scalatest.PrivateMethodTester + +import org.apache.spark.SparkConf +import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId} + + +class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester { + private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5") + private val dummyBlock = TestBlockId("--") + private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)] + + /** + * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies. 
+ */ + private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = { + val mm = new UnifiedMemoryManager(conf, maxMemory) + val ms = makeMemoryStore(mm) + (mm, ms) + } + + private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = { + mm invokePrivate PrivateMethod[Long]('storageRegionSize)() + } + + test("storage region size") { + val maxMemory = 1000L + val (mm, _) = makeThings(maxMemory) + val storageFraction = conf.get("spark.memory.storageFraction").toDouble + val expectedStorageRegionSize = maxMemory * storageFraction + val actualStorageRegionSize = getStorageRegionSize(mm) + assert(expectedStorageRegionSize === actualStorageRegionSize) + } + + test("basic execution memory") { + val maxMemory = 1000L + val (mm, _) = makeThings(maxMemory) + assert(mm.executionMemoryUsed === 0L) + assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L) + assert(mm.executionMemoryUsed === 10L) + assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L) + // Acquire up to the max + assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L) + assert(mm.executionMemoryUsed === maxMemory) + assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L) + assert(mm.executionMemoryUsed === maxMemory) + mm.releaseExecutionMemory(800L) + assert(mm.executionMemoryUsed === 200L) + // Acquire after release + assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L) + assert(mm.executionMemoryUsed === 201L) + // Release beyond what was acquired + mm.releaseExecutionMemory(maxMemory) + assert(mm.executionMemoryUsed === 0L) + } + + test("basic storage memory") { + val maxMemory = 1000L + val (mm, ms) = makeThings(maxMemory) + assert(mm.storageMemoryUsed === 0L) + assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks)) + // `ensureFreeSpace` should be called with the number of bytes requested + assertEnsureFreeSpaceCalled(ms, 10L) + assert(mm.storageMemoryUsed === 10L) + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + 
assertEnsureFreeSpaceCalled(ms, 100L) + assert(mm.storageMemoryUsed === 110L) + // Acquire more than the max, not granted + assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, maxMemory + 1L) + assert(mm.storageMemoryUsed === 110L) + // Acquire up to the max, requests after this are still granted due to LRU eviction + assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1000L) + assert(mm.storageMemoryUsed === 1000L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1L) + assert(mm.storageMemoryUsed === 1000L) + mm.releaseStorageMemory(800L) + assert(mm.storageMemoryUsed === 200L) + // Acquire after release + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1L) + assert(mm.storageMemoryUsed === 201L) + mm.releaseAllStorageMemory() + assert(mm.storageMemoryUsed === 0L) + assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 1L) + assert(mm.storageMemoryUsed === 1L) + // Release beyond what was acquired + mm.releaseStorageMemory(100L) + assert(mm.storageMemoryUsed === 0L) + } + + test("execution evicts storage") { + val maxMemory = 1000L + val (mm, ms) = makeThings(maxMemory) + // First, ensure the test classes are set up as expected + val expectedStorageRegionSize = 500L + val expectedExecutionRegionSize = 500L + val storageRegionSize = getStorageRegionSize(mm) + val executionRegionSize = maxMemory - expectedStorageRegionSize + require(storageRegionSize === expectedStorageRegionSize, + "bad test: storage region size is unexpected") + require(executionRegionSize === expectedExecutionRegionSize, + "bad test: execution region size is unexpected") + // Acquire enough storage memory to exceed the storage region + assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 750L) +
assert(mm.executionMemoryUsed === 0L) + assert(mm.storageMemoryUsed === 750L) + require(mm.storageMemoryUsed > storageRegionSize, + s"bad test: storage memory used should exceed the storage region") + // Execution needs to request more than the 250 free bytes to evict storage memory + assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L) + assert(mm.executionMemoryUsed === 100L) + assert(mm.storageMemoryUsed === 750L) + assertEnsureFreeSpaceNotCalled(ms) + // Execution wants 200 bytes but only 150 are free, so storage is evicted + assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L) + assertEnsureFreeSpaceCalled(ms, 200L) + assert(mm.executionMemoryUsed === 300L) + mm.releaseAllStorageMemory() + require(mm.executionMemoryUsed < executionRegionSize, + s"bad test: execution memory used should be within the execution region") + require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released") + // Acquire some storage memory again, but this time keep it within the storage region + assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks)) + assertEnsureFreeSpaceCalled(ms, 400L) + require(mm.storageMemoryUsed < storageRegionSize, + s"bad test: storage memory used should be within the storage region") + // Execution cannot evict storage because the latter is within the storage fraction, + // so grant only what's remaining without evicting anything, i.e.
1000 - 300 - 400 = 300 + assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L) + assert(mm.executionMemoryUsed === 600L) + assert(mm.storageMemoryUsed === 400L) + assertEnsureFreeSpaceNotCalled(ms) + } + + test("storage does not evict execution") { + val maxMemory = 1000L + val (mm, ms) = makeThings(maxMemory) + // First, ensure the test classes are set up as expected + val expectedStorageRegionSize = 500L + val expectedExecutionRegionSize = 500L + val storageRegionSize = getStorageRegionSize(mm) + val executionRegionSize = maxMemory - expectedStorageRegionSize + require(storageRegionSize === expectedStorageRegionSize, + "bad test: storage region size is unexpected") + require(executionRegionSize === expectedExecutionRegionSize, + "bad test: execution region size is unexpected") + // Acquire enough execution memory to exceed the execution region + assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L) + assert(mm.executionMemoryUsed === 800L) + assert(mm.storageMemoryUsed === 0L) + assertEnsureFreeSpaceNotCalled(ms) + require(mm.executionMemoryUsed > executionRegionSize, + s"bad test: execution memory used should exceed the execution region") + // Storage should not be able to evict execution + assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks)) + assert(mm.executionMemoryUsed === 800L) + assert(mm.storageMemoryUsed === 100L) + assertEnsureFreeSpaceCalled(ms, 100L) + assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks)) + assert(mm.executionMemoryUsed === 800L) + assert(mm.storageMemoryUsed === 100L) + assertEnsureFreeSpaceCalled(ms, 250L) + mm.releaseExecutionMemory(maxMemory) + mm.releaseStorageMemory(maxMemory) + // Acquire some execution memory again, but this time keep it within the execution region + assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L) + assert(mm.executionMemoryUsed === 200L) + assert(mm.storageMemoryUsed === 0L) + assertEnsureFreeSpaceNotCalled(ms) + require(mm.executionMemoryUsed <
executionRegionSize, + s"bad test: execution memory used should be within the execution region") + // Storage should still not be able to evict execution + assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks)) + assert(mm.executionMemoryUsed === 200L) + assert(mm.storageMemoryUsed === 750L) + assertEnsureFreeSpaceCalled(ms, 750L) + assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks)) + assert(mm.executionMemoryUsed === 200L) + assert(mm.storageMemoryUsed === 750L) + assertEnsureFreeSpaceCalled(ms, 850L) + } + +} diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala index 6d45b1a101..5877aa042d 100644 --- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala @@ -24,7 +24,8 @@ import org.mockito.Mockito._ import org.scalatest.concurrent.Timeouts import org.scalatest.time.SpanSugar._ -import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext} +import org.apache.spark.{SparkFunSuite, TaskContext} +import org.apache.spark.executor.TaskMetrics class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { @@ -37,7 +38,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts { try { val taskAttemptId = nextTaskAttemptId.getAndIncrement val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS) + val taskMetrics = new TaskMetrics when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId) + when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics) TaskContext.setTaskContext(mockTaskContext) body } finally { diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala index 6351539e91..259020a2dd 100644 --- 
a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala +++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala @@ -36,9 +36,6 @@ class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll { override def beforeAll() { conf.set("spark.shuffle.manager", "tungsten-sort") - // UnsafeShuffleManager requires at least 128 MB of memory per task in order to be able to sort - // shuffle records. - conf.set("spark.shuffle.memoryFraction", "0.5") } test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index 12e9bafcc9..0a03c32c64 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark._ import org.apache.spark.io.CompressionCodec +// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078) + class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS private def createCombiner[T](i: T) = ArrayBuffer[T](i) @@ -243,7 +245,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { */ private def testSimpleSpilling(codec: Option[String] = None): Unit = { val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) // reduceByKey - should spill ~8 times @@ -291,7 +292,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions") { val 
conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[String] @@ -340,7 +340,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { test("spilling with many hash collisions") { val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.memoryFraction", "0.0001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) @@ -365,7 +364,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions using the Int.MaxValue key") { val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] @@ -382,7 +380,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { test("spilling with null keys and values") { val conf = createSparkConf(loadDefaults = true) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val map = createExternalMap[Int] @@ -401,8 +398,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext { test("external aggregation updates peak execution memory") { val conf = createSparkConf(loadDefaults = false) - .set("spark.shuffle.memoryFraction", "0.001") .set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter + .set("spark.testing.memory", (10 * 1024 * 1024).toString) sc = new SparkContext("local", "test", conf) // No spilling AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") { diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index bdb0f4d507..651c7eaa65 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -24,6 +24,8 @@ import scala.util.Random import org.apache.spark._ import org.apache.spark.serializer.{JavaSerializer, KryoSerializer} +// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078) + class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = { val conf = new SparkConf(loadDefaults) @@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { conf.set("spark.shuffle.sort.bypassMergeThreshold", "0") // Ensure that we actually have multiple batches per spill file conf.set("spark.shuffle.spill.batchSize", "10") + conf.set("spark.testing.memory", "2000000") conf } @@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def emptyDataStream(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def fewElementsPerPartition(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -140,7 +141,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def emptyPartitionsWithSpilling(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.spill.initialMemoryThreshold", "512") conf.set("spark.shuffle.manager", 
"org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def testSpillingInLocalCluster(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) @@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local-cluster[2,1,1024]", "test", conf) @@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in sorter") { val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in sorter if there are errors") { val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in shuffle") { val conf = createSparkConf(false, false) - 
conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("cleanup of intermediate files in shuffle with errors") { val conf = createSparkConf(false, false) - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager @@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def noPartialAggregationOrSorting(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def partialAggregationWithoutSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def partialAggregationWIthSpillNoOrdering(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def partialAggregationWithSpillWithOrdering(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new 
SparkContext("local", "test", conf) @@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def sortingWithoutAggregationNoSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } def sortingWithoutAggregationWithSpill(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.001") conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) @@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) @@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with many hash collisions") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.0001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _) @@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with hash collisions using the Int.MaxValue key") { val conf = createSparkConf(true, false) - conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i) @@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { test("spilling with null keys and values") { val conf = createSparkConf(true, false) - 
conf.set("spark.shuffle.memoryFraction", "0.001") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i) @@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { } private def sortWithoutBreakingSortingContracts(conf: SparkConf) { - conf.set("spark.shuffle.memoryFraction", "0.01") conf.set("spark.shuffle.manager", "sort") sc = new SparkContext("local-cluster[1,1,1024]", "test", conf) diff --git a/docs/configuration.md b/docs/configuration.md index 154a3aee68..771d93be04 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -446,17 +446,6 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.shuffle.memoryFraction</code></td> - <td>0.2</td> - <td> - Fraction of Java heap to use for aggregation and cogroups during shuffles. - At any given time, the collective size of - all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will - begin to spill to disk. If spills are often, consider increasing this value at the expense of - <code>spark.storage.memoryFraction</code>. - </td> -</tr> -<tr> <td><code>spark.shuffle.service.enabled</code></td> <td>false</td> <td> @@ -712,6 +701,76 @@ Apart from these, the following properties are also available, and may be useful </tr> </table> +#### Memory Management +<table class="table"> +<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> +<tr> + <td><code>spark.memory.fraction</code></td> + <td>0.75</td> + <td> + Fraction of the heap space used for execution and storage. The lower this is, the more + frequently spills and cached data eviction occur. The purpose of this config is to set + aside memory for internal metadata, user data structures, and imprecise size estimation + in the case of sparse, unusually large records. 
+ </td> +</tr> +<tr> + <td><code>spark.memory.storageFraction</code></td> + <td>0.5</td> + <td> + The size of the storage region within the space set aside by + <code>spark.memory.fraction</code>. This region is not statically reserved, but dynamically + allocated as cache requests come in. Cached data may be evicted only if total storage exceeds + this region. + </td> +</tr> +<tr> + <td><code>spark.memory.useLegacyMode</code></td> + <td>false</td> + <td> + Whether to enable the legacy memory management mode used in Spark 1.5 and before. + The legacy mode rigidly partitions the heap space into fixed-size regions, + potentially leading to excessive spilling if the application was not tuned. + The following deprecated memory fraction configurations are not read unless this is enabled: + <code>spark.shuffle.memoryFraction</code><br> + <code>spark.storage.memoryFraction</code><br> + <code>spark.storage.unrollFraction</code> + </td> +</tr> +<tr> + <td><code>spark.shuffle.memoryFraction</code></td> + <td>0.2</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of Java heap to use for aggregation and cogroups during shuffles. + At any given time, the collective size of + all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will + begin to spill to disk. If spills are often, consider increasing this value at the expense of + <code>spark.storage.memoryFraction</code>. + </td> +</tr> +<tr> + <td><code>spark.storage.memoryFraction</code></td> + <td>0.6</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" + generation of objects in the JVM, which by default is given 0.6 of the heap, but you can + increase it if you configure your own old generation size. 
+ </td> +</tr> +<tr> + <td><code>spark.storage.unrollFraction</code></td> + <td>0.2</td> + <td> + (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled. + Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory. + This is dynamically allocated by dropping existing blocks when there is not enough free + storage space to unroll the new block in its entirety. + </td> +</tr> +</table> + #### Execution Behavior <table class="table"> <tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr> @@ -825,15 +884,6 @@ Apart from these, the following properties are also available, and may be useful data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td> </tr> <tr> - <td><code>spark.storage.memoryFraction</code></td> - <td>0.6</td> - <td> - Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old" - generation of objects in the JVM, which by default is given 0.6 of the heap, but you can - increase it if you configure your own old generation size. - </td> -</tr> -<tr> <td><code>spark.storage.memoryMapThreshold</code></td> <td>2m</td> <td> @@ -843,15 +893,6 @@ Apart from these, the following properties are also available, and may be useful </td> </tr> <tr> - <td><code>spark.storage.unrollFraction</code></td> - <td>0.2</td> - <td> - Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory. - This is dynamically allocated by dropping existing blocks when there is not enough free - storage space to unroll the new block in its entirety. 
- </td> -</tr> -<tr> <td><code>spark.externalBlockStore.blockManager</code></td> <td>org.apache.spark.storage.TachyonBlockManager</td> <td> diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala index ff65d7bdf8..835f52fa56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala @@ -57,7 +57,9 @@ class TestShuffleMemoryManager } private class GrantEverythingMemoryManager extends MemoryManager { - override def acquireExecutionMemory(numBytes: Long): Long = numBytes + override def acquireExecutionMemory( + numBytes: Long, + evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes override def acquireStorageMemory( blockId: BlockId, numBytes: Long, @@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends MemoryManager { blockId: BlockId, numBytes: Long, evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true - override def releaseExecutionMemory(numBytes: Long): Unit = { } - override def releaseStorageMemory(numBytes: Long): Unit = { } - override def releaseStorageMemory(): Unit = { } - override def releaseUnrollMemory(numBytes: Long): Unit = { } override def maxExecutionMemory: Long = Long.MaxValue override def maxStorageMemory: Long = Long.MaxValue - override def executionMemoryUsed: Long = Long.MaxValue - override def storageMemoryUsed: Long = Long.MaxValue } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala index f7d48bc53e..75d1fced59 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala @@ 
-103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext { val conf = new SparkConf() .set("spark.shuffle.spill.initialMemoryThreshold", "1024") .set("spark.shuffle.sort.bypassMergeThreshold", "0") - .set("spark.shuffle.memoryFraction", "0.0001") + .set("spark.testing.memory", "80000") sc = new SparkContext("local", "test", conf) outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "") |