 core/src/main/scala/org/apache/spark/SparkConf.scala                                  |  23
 core/src/main/scala/org/apache/spark/SparkEnv.scala                                   |  11
 core/src/main/scala/org/apache/spark/memory/MemoryManager.scala                       |  83
 core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala                 | 105
 core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala                | 141
 core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala               |  38
 core/src/main/scala/org/apache/spark/storage/BlockManager.scala                       |   4
 core/src/main/scala/org/apache/spark/storage/MemoryStore.scala                        | 121
 core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala      |  10
 core/src/test/scala/org/apache/spark/DistributedSuite.scala                           |   7
 core/src/test/scala/org/apache/spark/ShuffleSuite.scala                               |   6
 core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala                  | 133
 core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala            | 105
 core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala           | 208
 core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala          |   5
 core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala          |   3
 core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala |   9
 core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala        |  23
 docs/configuration.md                                                                 |  99
 sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala |  10
 sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala |   2
 21 files changed, 840 insertions(+), 306 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b344b5e173..1a0ac3d017 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -418,16 +418,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
// Validate memory fractions
- val memoryKeys = Seq(
+ val deprecatedMemoryKeys = Seq(
"spark.storage.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
+ val memoryKeys = Seq(
+ "spark.memory.fraction",
+ "spark.memory.storageFraction") ++
+ deprecatedMemoryKeys
for (key <- memoryKeys) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
- throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').")
+ throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
+ }
+ }
+
+ // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
+ val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
+ val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
+ if (!legacyMemoryManagement) {
+ val keyset = deprecatedMemoryKeys.toSet
+ val detected = settings.keys().asScala.filter(keyset.contains)
+ if (detected.nonEmpty) {
+ logWarning("Detected deprecated memory fraction settings: " +
+ detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
+ "memory management are unified. All memory fractions used in the old model are " +
+ "now deprecated and no longer read. If you wish to use the old memory management, " +
+ s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
}
}
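
A hypothetical usage sketch, not part of the patch, illustrating the keys validated above: the two new unified-memory settings plus one deprecated key that the new check only warns about (the app name and values are illustrative assumptions).

import org.apache.spark.SparkConf

object UnifiedMemoryConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)
      .setAppName("unified-memory-sketch")                 // illustrative app name
      .set("spark.memory.fraction", "0.75")                // new: heap share for execution + storage
      .set("spark.memory.storageFraction", "0.5")          // new: portion of that share favored for storage
      .set("spark.storage.memoryFraction", "0.6")          // deprecated: now ignored by the new model
    // The deprecation warning above is logged when the conf is validated (e.g. at SparkContext
    // creation), unless spark.memory.useLegacyMode is explicitly set to "true".
    println(conf.toDebugString)
  }
}
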
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index df3d84a1f0..c329983451 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
@@ -335,7 +335,14 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
- val memoryManager = new StaticMemoryManager(conf)
+ val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
+ val memoryManager: MemoryManager =
+ if (useLegacyMemoryManager) {
+ new StaticMemoryManager(conf)
+ } else {
+ new UnifiedMemoryManager(conf)
+ }
+
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 4bf73b6969..7168ac5491 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import scala.collection.mutable
+import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
@@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one of these per JVM.
*/
-private[spark] abstract class MemoryManager {
+private[spark] abstract class MemoryManager extends Logging {
// The memory store used to evict cached blocks
private var _memoryStore: MemoryStore = _
@@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager {
_memoryStore
}
+ // Amount of execution/storage memory in use, accesses must be synchronized on `this`
+ protected var _executionMemoryUsed: Long = 0
+ protected var _storageMemoryUsed: Long = 0
+
/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
- def setMemoryStore(store: MemoryStore): Unit = {
+ final def setMemoryStore(store: MemoryStore): Unit = {
_memoryStore = store
}
/**
- * Acquire N bytes of memory for execution.
+ * Total available memory for execution, in bytes.
+ */
+ def maxExecutionMemory: Long
+
+ /**
+ * Total available memory for storage, in bytes.
+ */
+ def maxStorageMemory: Long
+
+ // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
+
+ /**
+ * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (<= N).
*/
- def acquireExecutionMemory(numBytes: Long): Long
+ def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
@@ -66,52 +86,73 @@ private[spark] abstract class MemoryManager {
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
+ *
+ * This extra method allows subclasses to differentiate behavior between acquiring storage
+ * memory and acquiring unroll memory. For instance, the memory management model in Spark
+ * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ *
* @return whether all N bytes were successfully granted.
*/
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, evictedBlocks)
+ }
/**
* Release N bytes of execution memory.
*/
- def releaseExecutionMemory(numBytes: Long): Unit
+ def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
+ if (numBytes > _executionMemoryUsed) {
+ logWarning(s"Attempted to release $numBytes bytes of execution " +
+ s"memory when we only have ${_executionMemoryUsed} bytes")
+ _executionMemoryUsed = 0
+ } else {
+ _executionMemoryUsed -= numBytes
+ }
+ }
/**
* Release N bytes of storage memory.
*/
- def releaseStorageMemory(numBytes: Long): Unit
+ def releaseStorageMemory(numBytes: Long): Unit = synchronized {
+ if (numBytes > _storageMemoryUsed) {
+ logWarning(s"Attempted to release $numBytes bytes of storage " +
+ s"memory when we only have ${_storageMemoryUsed} bytes")
+ _storageMemoryUsed = 0
+ } else {
+ _storageMemoryUsed -= numBytes
+ }
+ }
/**
* Release all storage memory acquired.
*/
- def releaseStorageMemory(): Unit
+ def releaseAllStorageMemory(): Unit = synchronized {
+ _storageMemoryUsed = 0
+ }
/**
* Release N bytes of unroll memory.
*/
- def releaseUnrollMemory(numBytes: Long): Unit
-
- /**
- * Total available memory for execution, in bytes.
- */
- def maxExecutionMemory: Long
-
- /**
- * Total available memory for storage, in bytes.
- */
- def maxStorageMemory: Long
+ def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+ releaseStorageMemory(numBytes)
+ }
/**
* Execution memory currently in use, in bytes.
*/
- def executionMemoryUsed: Long
+ final def executionMemoryUsed: Long = synchronized {
+ _executionMemoryUsed
+ }
/**
* Storage memory currently in use, in bytes.
*/
- def storageMemoryUsed: Long
+ final def storageMemoryUsed: Long = synchronized {
+ _storageMemoryUsed
+ }
}
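
For illustration only, a minimal hypothetical subclass, not part of the patch, showing what the refactored contract now requires of implementations: the usage counters and release methods live in the base class, so a subclass only decides how much to grant (the class name and fixed-split policy are invented).

package org.apache.spark.memory

import scala.collection.mutable

import org.apache.spark.storage.{BlockId, BlockStatus}

private[spark] class FixedSplitMemoryManager(
    override val maxExecutionMemory: Long,
    override val maxStorageMemory: Long)
  extends MemoryManager {

  // Grant whatever still fits in the execution region; never evict cached blocks.
  override def acquireExecutionMemory(
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
    val granted = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
    _executionMemoryUsed += granted
    granted
  }

  // All-or-nothing grant within the storage region; `evictedBlocks` stays empty because
  // this sketch never asks the MemoryStore to free space.
  override def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
    val fits = _storageMemoryUsed + numBytes <= maxStorageMemory
    if (fits) {
      _storageMemoryUsed += numBytes
    }
    fits
  }
}
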
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 150445edb9..fa44f37234 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.memory
import scala.collection.mutable
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
@@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager(
conf: SparkConf,
override val maxExecutionMemory: Long,
override val maxStorageMemory: Long)
- extends MemoryManager with Logging {
-
- // Max number of bytes worth of blocks to evict when unrolling
- private val maxMemoryToEvictForUnroll: Long = {
- (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
- }
-
- // Amount of execution / storage memory in use
- // Accesses must be synchronized on `this`
- private var _executionMemoryUsed: Long = 0
- private var _storageMemoryUsed: Long = 0
+ extends MemoryManager {
def this(conf: SparkConf) {
this(
@@ -53,11 +43,19 @@ private[spark] class StaticMemoryManager(
StaticMemoryManager.getMaxStorageMemory(conf))
}
+ // Max number of bytes worth of blocks to evict when unrolling
+ private val maxMemoryToEvictForUnroll: Long = {
+ (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
+ }
+
/**
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
- override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ assert(numBytes >= 0)
assert(_executionMemoryUsed <= maxExecutionMemory)
val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
_executionMemoryUsed += bytesToGrant
@@ -72,7 +70,7 @@ private[spark] class StaticMemoryManager(
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
}
@@ -88,7 +86,7 @@ private[spark] class StaticMemoryManager(
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
val currentUnrollMemory = memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
@@ -108,71 +106,16 @@ private[spark] class StaticMemoryManager(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- // Note: Keep this outside synchronized block to avoid potential deadlocks!
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(numBytesToAcquire >= 0)
+ assert(numBytesToFree >= 0)
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- synchronized {
- assert(_storageMemoryUsed <= maxStorageMemory)
- val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
- if (enoughMemory) {
- _storageMemoryUsed += numBytesToAcquire
- }
- enoughMemory
- }
- }
-
- /**
- * Release N bytes of execution memory.
- */
- override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _executionMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of execution " +
- s"memory when we only have ${_executionMemoryUsed} bytes")
- _executionMemoryUsed = 0
- } else {
- _executionMemoryUsed -= numBytes
- }
- }
-
- /**
- * Release N bytes of storage memory.
- */
- override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _storageMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of storage " +
- s"memory when we only have ${_storageMemoryUsed} bytes")
- _storageMemoryUsed = 0
- } else {
- _storageMemoryUsed -= numBytes
+ assert(_storageMemoryUsed <= maxStorageMemory)
+ val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
+ if (enoughMemory) {
+ _storageMemoryUsed += numBytesToAcquire
}
- }
-
- /**
- * Release all storage memory acquired.
- */
- override def releaseStorageMemory(): Unit = synchronized {
- _storageMemoryUsed = 0
- }
-
- /**
- * Release N bytes of unroll memory.
- */
- override def releaseUnrollMemory(numBytes: Long): Unit = {
- releaseStorageMemory(numBytes)
- }
-
- /**
- * Amount of execution memory currently in use, in bytes.
- */
- override def executionMemoryUsed: Long = synchronized {
- _executionMemoryUsed
- }
-
- /**
- * Amount of storage memory currently in use, in bytes.
- */
- override def storageMemoryUsed: Long = synchronized {
- _storageMemoryUsed
+ enoughMemory
}
}
@@ -184,9 +127,10 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
- (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
@@ -194,9 +138,10 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
- (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
}
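
A worked example, not part of the patch, of the legacy sizing above, assuming a 1 GB executor heap and the default fractions:

object LegacyMemorySizesSketch {
  def main(args: Array[String]): Unit = {
    val systemMaxMemory = 1024L * 1024 * 1024                      // stand-in for Runtime.getRuntime.maxMemory
    val maxStorageMemory = (systemMaxMemory * 0.6 * 0.9).toLong    // 0.6 * 0.9 = 54% of the heap
    val maxExecutionMemory = (systemMaxMemory * 0.2 * 0.8).toLong  // 0.2 * 0.8 = 16% of the heap
    // The remaining ~30% of the heap is not managed by either region in the legacy model.
    println(s"legacy storage region: $maxStorageMemory bytes, execution region: $maxExecutionMemory bytes")
  }
}
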
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
new file mode 100644
index 0000000000..5bf78d5b67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockStatus, BlockId}
+
+
+/**
+ * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
+ * either side can borrow memory from the other.
+ *
+ * The region shared between execution and storage is a fraction of the total heap space
+ * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
+ * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
+ * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
+ *
+ * Storage can borrow as much execution memory as is free until execution reclaims its space.
+ * When this happens, cached blocks will be evicted from memory until sufficient borrowed
+ * memory is released to satisfy the execution memory request.
+ *
+ * Similarly, execution can borrow as much storage memory as is free. However, execution
+ * memory is *never* evicted by storage due to the complexities involved in implementing this.
+ * The implication is that attempts to cache blocks may fail if execution has already eaten
+ * up most of the storage space, in which case the new blocks will be evicted immediately
+ * according to their respective storage levels.
+ */
+private[spark] class UnifiedMemoryManager(conf: SparkConf, maxMemory: Long) extends MemoryManager {
+
+ def this(conf: SparkConf) {
+ this(conf, UnifiedMemoryManager.getMaxMemory(conf))
+ }
+
+ /**
+ * Size of the storage region, in bytes.
+ *
+ * This region is not statically reserved; execution can borrow from it if necessary.
+ * Cached blocks can be evicted only if actual storage memory usage exceeds this region.
+ */
+ private val storageRegionSize: Long = {
+ (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
+ }
+
+ /**
+ * Total amount of memory, in bytes, not currently occupied by either execution or storage.
+ */
+ private def totalFreeMemory: Long = synchronized {
+ assert(_executionMemoryUsed <= maxMemory)
+ assert(_storageMemoryUsed <= maxMemory)
+ assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
+ maxMemory - _executionMemoryUsed - _storageMemoryUsed
+ }
+
+ /**
+ * Total available memory for execution, in bytes.
+ * In this model, this is equivalent to the amount of memory not occupied by storage.
+ */
+ override def maxExecutionMemory: Long = synchronized {
+ maxMemory - _storageMemoryUsed
+ }
+
+ /**
+ * Total available memory for storage, in bytes.
+ * In this model, this is equivalent to the amount of memory not occupied by execution.
+ */
+ override def maxStorageMemory: Long = synchronized {
+ maxMemory - _executionMemoryUsed
+ }
+
+ /**
+ * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ *
+ * This method evicts blocks only up to the amount of memory borrowed by storage.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return number of bytes successfully granted (<= N).
+ */
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ assert(numBytes >= 0)
+ val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize)
+ // If there is not enough free memory AND storage has borrowed some execution memory,
+ // then evict as much memory borrowed by storage as needed to grant this request
+ val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0
+ if (shouldEvictStorage) {
+ val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
+ memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+ }
+ val bytesToGrant = math.min(numBytes, totalFreeMemory)
+ _executionMemoryUsed += bytesToGrant
+ bytesToGrant
+ }
+
+ /**
+ * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return whether all N bytes were successfully granted.
+ */
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(numBytes >= 0)
+ memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
+ val enoughMemory = totalFreeMemory >= numBytes
+ if (enoughMemory) {
+ _storageMemoryUsed += numBytes
+ }
+ enoughMemory
+ }
+
+}
+
+private object UnifiedMemoryManager {
+
+ /**
+ * Return the total amount of memory shared between execution and storage, in bytes.
+ */
+ private def getMaxMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
+ (systemMaxMemory * memoryFraction).toLong
+ }
+}
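
A worked example, not part of the patch, of the sizing performed by `getMaxMemory` and `storageRegionSize` above, assuming a 4 GB heap and the default fractions (so the storage region is 0.75 * 0.5 = 0.375 of the heap, as the class comment notes):

object UnifiedMemorySizesSketch {
  def main(args: Array[String]): Unit = {
    val systemMaxMemory = 4L * 1024 * 1024 * 1024        // stand-in for Runtime.getRuntime.maxMemory
    val maxMemory = (systemMaxMemory * 0.75).toLong      // spark.memory.fraction: 3 GB shared region
    val storageRegionSize = (maxMemory * 0.5).toLong     // spark.memory.storageFraction: 1.5 GB soft boundary
    println(s"shared region: $maxMemory bytes, storage region: $storageRegionSize bytes")
  }
}
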
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index bb64bb3f35..aaf543ce92 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -18,11 +18,13 @@
package org.apache.spark.shuffle
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.spark._
import org.apache.spark.memory.{StaticMemoryManager, MemoryManager}
+import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.unsafe.array.ByteArrayMethods
/**
@@ -36,8 +38,8 @@ import org.apache.spark.unsafe.array.ByteArrayMethods
* If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
* before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
- * this set changes. This is all done by synchronizing access on "this" to mutate state and using
- * wait() and notifyAll() to signal changes.
+ * this set changes. This is all done by synchronizing access to `memoryManager` to mutate state
+ * and using wait() and notifyAll() to signal changes.
*
* Use `ShuffleMemoryManager.create()` factory method to create a new instance.
*
@@ -51,7 +53,6 @@ class ShuffleMemoryManager protected (
extends Logging {
private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
- private val maxMemory = memoryManager.maxExecutionMemory
private def currentTaskAttemptId(): Long = {
// In case this is called on the driver, return an invalid task attempt id.
@@ -65,7 +66,7 @@ class ShuffleMemoryManager protected (
* total memory pool (where N is the # of active tasks) before it is forced to spill. This can
* happen if the number of tasks increases but an older task had a lot of memory already.
*/
- def tryToAcquire(numBytes: Long): Long = synchronized {
+ def tryToAcquire(numBytes: Long): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
@@ -73,15 +74,18 @@ class ShuffleMemoryManager protected (
// of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
if (!taskMemory.contains(taskAttemptId)) {
taskMemory(taskAttemptId) = 0L
- notifyAll() // Will later cause waiting tasks to wake up and check numTasks again
+ // This will later cause waiting tasks to wake up and check numTasks again
+ memoryManager.notifyAll()
}
// Keep looping until we're either sure that we don't want to grant this request (because this
// task would have more than 1 / numActiveTasks of the memory) or we have enough free
// memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
+ // TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = taskMemory.keys.size
val curMem = taskMemory(taskAttemptId)
+ val maxMemory = memoryManager.maxExecutionMemory
val freeMemory = maxMemory - taskMemory.values.sum
// How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
@@ -99,7 +103,7 @@ class ShuffleMemoryManager protected (
} else {
logInfo(
s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free")
- wait()
+ memoryManager.wait()
}
} else {
return acquire(toGrant)
@@ -112,15 +116,23 @@ class ShuffleMemoryManager protected (
* Acquire N bytes of execution memory from the memory manager for the current task.
* @return number of bytes actually acquired (<= N).
*/
- private def acquire(numBytes: Long): Long = synchronized {
+ private def acquire(numBytes: Long): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
- val acquired = memoryManager.acquireExecutionMemory(numBytes)
+ val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val acquired = memoryManager.acquireExecutionMemory(numBytes, evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ // TODO: just do this in `acquireExecutionMemory` (SPARK-10985)
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
taskMemory(taskAttemptId) += acquired
acquired
}
/** Release numBytes bytes for the current task. */
- def release(numBytes: Long): Unit = synchronized {
+ def release(numBytes: Long): Unit = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
if (curMem < numBytes) {
@@ -129,20 +141,20 @@ class ShuffleMemoryManager protected (
}
taskMemory(taskAttemptId) -= numBytes
memoryManager.releaseExecutionMemory(numBytes)
- notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed
+ memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed
}
/** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */
- def releaseMemoryForThisTask(): Unit = synchronized {
+ def releaseMemoryForThisTask(): Unit = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
taskMemory.remove(taskAttemptId).foreach { numBytes =>
memoryManager.releaseExecutionMemory(numBytes)
}
- notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed
+ memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed
}
/** Returns the memory consumption, in bytes, for the current task */
- def getMemoryConsumptionForThisTask(): Long = synchronized {
+ def getMemoryConsumptionForThisTask(): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
taskMemory.getOrElse(taskAttemptId, 0L)
}
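
An arithmetic sketch, not part of the patch and using invented numbers, of the 1 / 2N and 1 / N bounds described in the class comment above: a task is made to wait rather than spill while it holds less than 1 / 2N of the pool, and is never granted more than 1 / N.

object ShuffleMemoryPolicySketch {
  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L       // current memoryManager.maxExecutionMemory
    val numActiveTasks = 4L     // N
    val minPerTask = maxMemory / (2 * numActiveTasks)    // 1 / 2N = 125 bytes guaranteed before spilling
    val maxPerTask = maxMemory / numActiveTasks          // 1 / N  = 250 bytes cap per task
    println(s"with $numActiveTasks tasks: at least $minPerTask and at most $maxPerTask bytes each")
  }
}
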
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9f5bd2abbd..c374b93766 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -91,6 +91,10 @@ private[spark] class BlockManager(
}
memoryManager.setMemoryStore(memoryStore)
+ // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
+ // However, since we use this only for reporting and logging, what we actually want here is
+ // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
+ // to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxStorageMemory
private[spark]
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 35c57b923c..4dbac388e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -37,15 +37,14 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
extends BlockStore(blockManager) {
+ // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
+ // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
+
private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
- private val maxMemory = memoryManager.maxStorageMemory
-
- // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
- private val accountingLock = new Object
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
- // All accesses of this map are assumed to have manually synchronized on `accountingLock`
+ // All accesses of this map are assumed to have manually synchronized on `memoryManager`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
// Pending unroll memory refers to the intermediate memory occupied by a task
@@ -60,6 +59,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+ /** Total amount of memory available for storage, in bytes. */
+ private def maxMemory: Long = memoryManager.maxStorageMemory
+
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
@@ -75,7 +77,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* Amount of storage memory, in bytes, used for caching blocks.
* This does not include memory used for unrolling.
*/
- private def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
+ private def blocksMemoryUsed: Long = memoryManager.synchronized {
+ memoryUsed - currentUnrollMemory
+ }
override def getSize(blockId: BlockId): Long = {
entries.synchronized {
@@ -208,7 +212,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def remove(blockId: BlockId): Boolean = {
+ override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val entry = entries.synchronized { entries.remove(blockId) }
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
@@ -220,11 +224,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def clear() {
+ override def clear(): Unit = memoryManager.synchronized {
entries.synchronized {
entries.clear()
}
- memoryManager.releaseStorageMemory()
+ unrollMemoryMap.clear()
+ pendingUnrollMemoryMap.clear()
+ memoryManager.releaseAllStorageMemory()
logInfo("MemoryStore cleared")
}
@@ -299,22 +305,23 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
} finally {
- // If we return an array, the values returned will later be cached in `tryToPut`.
- // In this case, we should release the memory after we cache the block there.
- // Otherwise, if we return an iterator, we release the memory reserved here
- // later when the task finishes.
+ // If we return an array, the values returned here will be cached in `tryToPut` later.
+ // In this case, we should release the memory only after we cache the block there.
if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
- // Here, we transfer memory from unroll to pending unroll because we expect to cache this
- // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in
- // order to avoid race conditions where another component steals the memory that we're
- // trying to transfer.
+ memoryManager.synchronized {
+ // Since we continue to hold onto the array until we actually cache it, we cannot
+ // release the unroll memory yet. Instead, we transfer it to pending unroll memory
+ // so `tryToPut` can further transfer it to normal storage memory later.
+ // TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
}
+ } else {
+ // Otherwise, if we return an iterator, we can only release the unroll memory when
+ // the task finishes since we don't know when the iterator will be consumed.
}
}
}
@@ -343,7 +350,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
* created to avoid OOM since it may be a big ByteBuffer.
*
- * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
+ * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
* dropping is done by only one thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
@@ -365,16 +372,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* for freeing up more space for another block that needs to be put. Only then the actual
* dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
- accountingLock.synchronized {
+ memoryManager.synchronized {
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
- //
- // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the
- // synchronization on `accountingLock` guarantees that the release of unroll memory and
- // acquisition of storage memory happens atomically. However, if storage memory is acquired
- // outside of MemoryStore or if unroll memory is counted as execution memory, then we will
- // have to revisit this assumption. See SPARK-10983 for more context.
+ // The synchronization on `memoryManager` here guarantees that the release and acquire
+ // happen atomically. This relies on the assumption that all memory acquisitions are
+ // synchronized on the same lock.
releasePendingUnrollMemoryForThisTask()
val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
if (enoughMemory) {
@@ -402,33 +406,61 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
/**
+ * Try to free up a given amount of space by evicting existing blocks.
+ *
+ * @param space the amount of memory to free, in bytes
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def ensureFreeSpace(
+ space: Long,
+ droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ ensureFreeSpace(None, space, droppedBlocks)
+ }
+
+ /**
+ * Try to free up a given amount of space to store a block by evicting existing ones.
+ *
+ * @param space the amount of memory to free, in bytes
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def ensureFreeSpace(
+ blockId: BlockId,
+ space: Long,
+ droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ ensureFreeSpace(Some(blockId), space, droppedBlocks)
+ }
+
+ /**
* Try to free up a given amount of space to store a particular block, but can fail if
* either the block is bigger than our memory or it would require replacing another block
* from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * @param blockId the ID of the block we are freeing space for
+ * @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
- * @return whether there is enough free space.
+ * @return whether the requested free space is freed.
*/
- private[spark] def ensureFreeSpace(
- blockId: BlockId,
+ private def ensureFreeSpace(
+ blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- accountingLock.synchronized {
+ memoryManager.synchronized {
val freeMemory = maxMemory - memoryUsed
- val rddToAdd = getRddId(blockId)
+ val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
- logInfo(s"Ensuring $space bytes of free space for block $blockId " +
+ logInfo(s"Ensuring $space bytes of free space " +
+ blockId.map { id => s"for block $id" }.getOrElse("") +
s"(free: $freeMemory, max: $maxMemory)")
// Fail fast if the block simply won't fit
if (space > maxMemory) {
- logInfo(s"Will not store $blockId as the required space " +
- s"($space bytes) than our memory limit ($maxMemory bytes)")
+ logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
+ s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
return false
}
@@ -471,8 +503,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
true
} else {
- logInfo(s"Will not store $blockId as it would require dropping another block " +
- "from the same RDD")
+ blockId.foreach { id =>
+ logInfo(s"Will not store $id as it would require dropping another block " +
+ "from the same RDD")
+ }
false
}
}
@@ -495,8 +529,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
memory: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- accountingLock.synchronized {
- // Note: all acquisitions of unroll memory must be synchronized on `accountingLock`
+ memoryManager.synchronized {
val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
if (success) {
val taskAttemptId = currentTaskAttemptId()
@@ -512,7 +545,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*/
def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
+ memoryManager.synchronized {
if (unrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -531,7 +564,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*/
def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
+ memoryManager.synchronized {
if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -548,21 +581,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
/**
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
- def currentUnrollMemory: Long = accountingLock.synchronized {
+ def currentUnrollMemory: Long = memoryManager.synchronized {
unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}
/**
* Return the amount of memory currently occupied for unrolling blocks by this task.
*/
- def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized {
+ def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
}
/**
* Return the number of tasks currently unrolling blocks.
*/
- private def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+ private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
/**
* Log information about current memory usage.
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 29c5732f5a..6a96b5dc12 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -48,16 +48,6 @@ import org.apache.spark.executor.ShuffleWriteMetrics
* However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
* writes. This may lead to a performance regression compared to the normal case of using the
* non-spilling AppendOnlyMap.
- *
- * Two parameters control the memory threshold:
- *
- * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
- * these maps as a fraction of the executor's total memory. Since each concurrently running
- * task maintains one map, the actual threshold for each map is this quantity divided by the
- * number of running tasks.
- *
- * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
- * this threshold, in case map size estimation is not sufficiently accurate.
*/
@DeveloperApi
class ExternalAppendOnlyMap[K, V, C](
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 600c1403b0..34a4bb968e 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -213,11 +213,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("compute when only some partitions fit in memory") {
- val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
- sc = new SparkContext(clusterUrl, "test", conf)
- // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
- // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
- // to make sure that *some* of them do fit though
+ sc = new SparkContext(clusterUrl, "test", new SparkConf)
+ // TODO: verify that only a subset of partitions fit in memory (SPARK-11078)
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index d91b799ecf..4a0877d86f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -247,11 +247,13 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
.setMaster("local")
.set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
.set("spark.shuffle.compress", shuffleCompress.toString)
- .set("spark.shuffle.memoryFraction", "0.001")
resetSparkContext()
sc = new SparkContext(myConf)
+ val diskBlockManager = sc.env.blockManager.diskBlockManager
try {
- sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().isEmpty)
+ sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().nonEmpty)
} catch {
case e: Exception =>
val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
new file mode 100644
index 0000000000..36e4566310
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.MemoryStore
+
+
+/**
+ * Helper trait for sharing code among [[MemoryManager]] tests.
+ */
+private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+
+ import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+
+ // Note: Mockito's verify mechanism does not provide a way to reset method call counts
+ // without also resetting stubbed methods. Since our test code relies on the latter,
+ // we need to use our own variable to track invocations of `ensureFreeSpace`.
+
+ /**
+ * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
+ *
+ * This is set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
+ * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
+ */
+ private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+
+ /**
+ * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
+ *
+ * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
+ * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
+ * dependencies.
+ */
+ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
+ val ms = mock(classOf[MemoryStore])
+ when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
+ when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+ mm.setMemoryStore(ms)
+ ms
+ }
+
+ /**
+ * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
+ */
+ private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
+ new Answer[Boolean] {
+ override def answer(invocation: InvocationOnMock): Boolean = {
+ val args = invocation.getArguments
+ require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
+ s"in ensureFreeSpace, found ${args.size}")
+ require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
+ s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
+ val numBytes = args(numBytesPos).asInstanceOf[Long]
+ mockEnsureFreeSpace(mm, numBytes)
+ }
+ }
+ }
+
+ /**
+ * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually drops existing
+ * blocks based on the size of each block. Instead, here we simply release as many bytes
+ * as needed to ensure the requested amount of free space. This allows us to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
+ * many other dependencies.
+ *
+ * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
+ * records the number of bytes this is called with. This variable is expected to be cleared
+ * by the test code later through [[assertEnsureFreeSpaceCalled]].
+ */
+ private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized {
+ require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "bad test: ensure free space variable was not reset")
+ // Record the number of bytes we freed this call
+ ensureFreeSpaceCalled.set(numBytes)
+ if (numBytes <= mm.maxStorageMemory) {
+ def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
+ val spaceToRelease = numBytes - freeMemory
+ if (spaceToRelease > 0) {
+ mm.releaseStorageMemory(spaceToRelease)
+ }
+ freeMemory >= numBytes
+ } else {
+ // We attempted to free more bytes than our max allowable memory
+ false
+ }
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
+ */
+ protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
+ assert(ensureFreeSpaceCalled.get() === numBytes,
+ s"expected ensure free space to be called with $numBytes")
+ ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+ */
+ protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
+ assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "ensure free space should not have been called!")
+ }
+}
+
+private object MemoryManagerSuite {
+ private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+}
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index c436a8b5c9..6cae1f871e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -19,32 +19,44 @@ package org.apache.spark.memory
import scala.collection.mutable.ArrayBuffer
-import org.mockito.Mockito.{mock, reset, verify, when}
-import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
+import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-import org.apache.spark.{SparkConf, SparkFunSuite}
-class StaticMemoryManagerSuite extends SparkFunSuite {
+class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(
+ maxExecutionMem: Long,
+ maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
+ val mm = new StaticMemoryManager(
+ conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
test("basic execution memory") {
val maxExecutionMem = 1000L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L) === 10L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L) === 0L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
mm.releaseExecutionMemory(800L)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L) === 1L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
mm.releaseExecutionMemory(maxExecutionMem)
@@ -54,37 +66,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("basic storage memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("you can see the world you brought to live")
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
// `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
+ assertEnsureFreeSpaceCalled(ms, 10L)
assert(mm.storageMemoryUsed === 10L)
- assert(evictedBlocks.isEmpty)
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 110L)
- // Acquire up to the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
assert(mm.storageMemoryUsed === 110L)
- assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
+ // Acquire up to the max, requests after this are still granted due to LRU eviction
+ assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
assert(mm.storageMemoryUsed === 1000L)
- assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 201L)
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -95,18 +106,17 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
val maxExecutionMem = 200L
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("ain't nobody love like you do")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L)
+ assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
@@ -114,7 +124,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 67L)
}
@@ -122,51 +132,26 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("unroll memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("lonely water")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks))
+ assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L)
+ assertEnsureFreeSpaceCalled(ms, 340L)
assert(mm.storageMemoryUsed === 560L)
when(ms.currentUnrollMemory).thenReturn(560L)
- assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks))
+ assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
assert(mm.storageMemoryUsed === 560L)
// We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L)
+ assertEnsureFreeSpaceCalled(ms, 0L)
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
}
- /**
- * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
- */
- private def makeThings(
- maxExecutionMem: Long,
- maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
- val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
- val ms = mock(classOf[MemoryStore])
- mm.setMemoryStore(ms)
- (mm, ms)
- }
-
- /**
- * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
- */
- private def assertEnsureFreeSpaceCalled(
- ms: MemoryStore,
- blockId: BlockId,
- numBytes: Long): Unit = {
- verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any())
- reset(ms)
- }
-
}
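The private `makeThings` and `assertEnsureFreeSpaceCalled` helpers deleted above are evidently hoisted into a shared `MemoryManagerSuite` base, which is why `UnifiedMemoryManagerSuite` below can call `makeMemoryStore`, `assertEnsureFreeSpaceCalled`, and `assertEnsureFreeSpaceNotCalled` without defining them. A minimal sketch of what such shared helpers could look like, reusing the Mockito pattern from the removed code; the real base class is not reproduced here and likely also stubs `ensureFreeSpace` so that evictions actually release storage memory, which this verify-only sketch does not do:

    import org.mockito.Matchers.{any, anyLong, eq => meq}
    import org.mockito.Mockito.{mock, never, reset, verify}

    import org.apache.spark.SparkFunSuite
    import org.apache.spark.memory.MemoryManager
    import org.apache.spark.storage.MemoryStore

    abstract class MemoryManagerSuiteSketch extends SparkFunSuite {

      /** Create a mocked MemoryStore and wire it into the given memory manager. */
      protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
        val ms = mock(classOf[MemoryStore])
        mm.setMemoryStore(ms)
        ms
      }

      /** Assert ensureFreeSpace was asked to free `numBytes`, regardless of which block asked. */
      protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
        verify(ms).ensureFreeSpace(any(), meq(numBytes: java.lang.Long), any())
        reset(ms)
      }

      /** Assert ensureFreeSpace was not invoked since the last reset. */
      protected def assertEnsureFreeSpaceNotCalled(ms: MemoryStore): Unit = {
        verify(ms, never()).ensureFreeSpace(any(), anyLong(), any())
        reset(ms)
      }
    }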
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
new file mode 100644
index 0000000000..e7baa50dc2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+
+
+class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
+ private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
+ private val dummyBlock = TestBlockId("--")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
+ val mm = new UnifiedMemoryManager(conf, maxMemory)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
+
+ private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
+ mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
+ }
+
+ test("storage region size") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ val storageFraction = conf.get("spark.memory.storageFraction").toDouble
+ val expectedStorageRegionSize = maxMemory * storageFraction
+ val actualStorageRegionSize = getStorageRegionSize(mm)
+ assert(expectedStorageRegionSize === actualStorageRegionSize)
+ }
+
+ test("basic execution memory") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.executionMemoryUsed === 10L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ // Acquire up to the max
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ mm.releaseExecutionMemory(800L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.executionMemoryUsed === 201L)
+ // Release beyond what was acquired
+ mm.releaseExecutionMemory(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ }
+
+ test("basic storage memory") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ // `ensureFreeSpace` should be called with the number of bytes requested
+ assertEnsureFreeSpaceCalled(ms, 10L)
+ assert(mm.storageMemoryUsed === 10L)
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire up to the max, requests after this are still granted due to LRU eviction
+ assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
+ assert(mm.storageMemoryUsed === 1000L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1000L)
+ mm.releaseStorageMemory(800L)
+ assert(mm.storageMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 201L)
+ mm.releaseAllStorageMemory()
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1L)
+ // Release beyond what was acquired
+ mm.releaseStorageMemory(100L)
+ assert(mm.storageMemoryUsed === 0L)
+ }
+
+ test("execution evicts storage") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+ val executionRegionSize = maxMemory - expectedStorageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+ "bad test: storage region size is unexpected")
+ // Acquire enough storage memory to exceed the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.storageMemoryUsed === 750L)
+ require(mm.storageMemoryUsed > storageRegionSize,
+ s"bad test: storage memory used should exceed the storage region")
+ // Execution needs to request 250 bytes to evict storage memory
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.executionMemoryUsed === 100L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ // Execution wants 200 bytes but only 150 are free, so storage is evicted
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assertEnsureFreeSpaceCalled(ms, 200L)
+ assert(mm.executionMemoryUsed === 300L)
+ mm.releaseAllStorageMemory()
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
+ // Acquire some storage memory again, but this time keep it within the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 400L)
+ require(mm.storageMemoryUsed < storageRegionSize,
+ s"bad test: storage memory used should be within the storage region")
+ // Execution cannot evict storage because the latter is within the storage fraction,
+ // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
+ assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.executionMemoryUsed === 600L)
+ assert(mm.storageMemoryUsed === 400L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ }
+
+ test("storage does not evict execution") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+ val executionRegionSize = maxMemory - expectedStorageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+ "bad test: storage region size is unexpected")
+ // Acquire enough execution memory to exceed the execution region
+ assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed > executionRegionSize,
+ s"bad test: execution memory used should exceed the execution region")
+ // Storage should not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 250L)
+ mm.releaseExecutionMemory(maxMemory)
+ mm.releaseStorageMemory(maxMemory)
+ // Acquire some execution memory again, but this time keep it within the execution region
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ // Storage should still not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 850L)
+ }
+
+}
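Read together, the two tests above pin down the asymmetry of the unified model: execution may evict cached blocks, but only down to the storage region floor, while storage requests never evict execution. A small arithmetic sketch of the grant sizes these tests expect (illustrative only, not the actual UnifiedMemoryManager code):

    object UnifiedGrantSketch {
      /** Bytes an execution request can expect to be granted under the policy above. */
      def executionGrant(
          requested: Long,
          maxMemory: Long,
          executionUsed: Long,
          storageUsed: Long,
          storageRegionSize: Long): Long = {
        val free = maxMemory - executionUsed - storageUsed
        // Cached data above the storage region is fair game for eviction; data within it is not.
        val evictable = math.max(storageUsed - storageRegionSize, 0L)
        math.min(requested, free + evictable)
      }

      /** Whether a storage request of `requested` bytes can be granted at all. */
      def storageGranted(requested: Long, maxMemory: Long, executionUsed: Long): Boolean = {
        // Storage may evict other cached blocks (LRU) but never execution memory.
        requested <= maxMemory - executionUsed
      }
    }

For example, with maxMemory = 1000, storageRegionSize = 500, 750 bytes cached, and 100 bytes of execution already held, a further execution request of 200 is fully granted (150 free plus 50 evicted), which is exactly what the "execution evicts storage" test asserts.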
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index 6d45b1a101..5877aa042d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,8 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.executor.TaskMetrics
class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
@@ -37,7 +38,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
try {
val taskAttemptId = nextTaskAttemptId.getAndIncrement
val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
+ val taskMetrics = new TaskMetrics
when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
+ when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics)
TaskContext.setTaskContext(mockTaskContext)
body
} finally {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
index 6351539e91..259020a2dd 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
@@ -36,9 +36,6 @@ class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def beforeAll() {
conf.set("spark.shuffle.manager", "tungsten-sort")
- // UnsafeShuffleManager requires at least 128 MB of memory per task in order to be able to sort
- // shuffle records.
- conf.set("spark.shuffle.memoryFraction", "0.5")
}
test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 12e9bafcc9..0a03c32c64 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
@@ -243,7 +245,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
*/
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
@@ -291,7 +292,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]
@@ -340,7 +340,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -365,7 +364,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -382,7 +380,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -401,8 +398,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("external aggregation updates peak execution memory") {
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.memoryFraction", "0.001")
.set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
+ .set("spark.testing.memory", (10 * 1024 * 1024).toString)
sc = new SparkContext("local", "test", conf)
// No spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
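The spilling tests above no longer shrink `spark.shuffle.memoryFraction`; instead they cap the memory visible to the memory manager outright. A minimal sketch of the replacement pattern, assuming `spark.testing.memory` (in bytes) overrides the heap size the manager would otherwise derive from the JVM:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("spill-test")
      .set("spark.testing.memory", (10 * 1024 * 1024).toString) // ~10 MB total, to force early spills
    val sc = new SparkContext(conf)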
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bdb0f4d507..651c7eaa65 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -24,6 +24,8 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
@@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
+ conf.set("spark.testing.memory", "2000000")
conf
}
@@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyDataStream(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def fewElementsPerPartition(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -140,7 +141,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyPartitionsWithSpilling(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def testSpillingInLocalCluster(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
@@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter if there are errors") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle with errors") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def noPartialAggregationOrSorting(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithoutSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationNoSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationWithSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
@@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.01")
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
diff --git a/docs/configuration.md b/docs/configuration.md
index 154a3aee68..771d93be04 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -446,17 +446,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.shuffle.memoryFraction</code></td>
- <td>0.2</td>
- <td>
- Fraction of Java heap to use for aggregation and cogroups during shuffles.
- At any given time, the collective size of
- all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
- begin to spill to disk. If spills are often, consider increasing this value at the expense of
- <code>spark.storage.memoryFraction</code>.
- </td>
-</tr>
-<tr>
<td><code>spark.shuffle.service.enabled</code></td>
<td>false</td>
<td>
@@ -712,6 +701,76 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>
+#### Memory Management
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+ <td><code>spark.memory.fraction</code></td>
+ <td>0.75</td>
+ <td>
+ Fraction of the heap space used for execution and storage. The lower this is, the more
+ frequently spills and cached data eviction occur. The purpose of this config is to set
+ aside memory for internal metadata, user data structures, and imprecise size estimation
+ in the case of sparse, unusually large records.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.memory.storageFraction</code></td>
+ <td>0.5</td>
+ <td>
+    The size of the storage region within the space set aside by
+    <code>spark.memory.fraction</code>. This region is not statically reserved, but dynamically
+    allocated as cache requests come in. Cached data may be evicted only if total storage exceeds
+ this region.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.memory.useLegacyMode</code></td>
+ <td>false</td>
+ <td>
+    Whether to enable the legacy memory management mode used in Spark 1.5 and before.
+ The legacy mode rigidly partitions the heap space into fixed-size regions,
+ potentially leading to excessive spilling if the application was not tuned.
+ The following deprecated memory fraction configurations are not read unless this is enabled:
+ <code>spark.shuffle.memoryFraction</code><br>
+ <code>spark.storage.memoryFraction</code><br>
+ <code>spark.storage.unrollFraction</code>
+ </td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.memoryFraction</code></td>
+ <td>0.2</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of Java heap to use for aggregation and cogroups during shuffles.
+ At any given time, the collective size of
+ all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+    begin to spill to disk. If spills occur often, consider increasing this value at the expense of
+ <code>spark.storage.memoryFraction</code>.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.storage.memoryFraction</code></td>
+ <td>0.6</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
+ generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
+ increase it if you configure your own old generation size.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.storage.unrollFraction</code></td>
+ <td>0.2</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+ This is dynamically allocated by dropping existing blocks when there is not enough free
+ storage space to unroll the new block in its entirety.
+ </td>
+</tr>
+</table>
+
#### Execution Behavior
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -825,15 +884,6 @@ Apart from these, the following properties are also available, and may be useful
data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td>
</tr>
<tr>
- <td><code>spark.storage.memoryFraction</code></td>
- <td>0.6</td>
- <td>
- Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
- increase it if you configure your own old generation size.
- </td>
-</tr>
-<tr>
<td><code>spark.storage.memoryMapThreshold</code></td>
<td>2m</td>
<td>
@@ -843,15 +893,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.storage.unrollFraction</code></td>
- <td>0.2</td>
- <td>
- Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
- This is dynamically allocated by dropping existing blocks when there is not enough free
- storage space to unroll the new block in its entirety.
- </td>
-</tr>
-<tr>
<td><code>spark.externalBlockStore.blockManager</code></td>
<td>org.apache.spark.storage.TachyonBlockManager</td>
<td>
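The new model documented in the Memory Management table above is driven by just two knobs. A hedged example of setting them programmatically (the values are illustrative, not recommendations):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.memory.fraction", "0.75")        // share of the heap for execution + storage
      .set("spark.memory.storageFraction", "0.5")  // portion of that share shielded from eviction
    // Reverting to the pre-1.6 behavior (not recommended):
    //   conf.set("spark.memory.useLegacyMode", "true")
    //       .set("spark.storage.memoryFraction", "0.6")
    //       .set("spark.shuffle.memoryFraction", "0.2")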
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index ff65d7bdf8..835f52fa56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -57,7 +57,9 @@ class TestShuffleMemoryManager
}
private class GrantEverythingMemoryManager extends MemoryManager {
- override def acquireExecutionMemory(numBytes: Long): Long = numBytes
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
@@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends MemoryManager {
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def releaseExecutionMemory(numBytes: Long): Unit = { }
- override def releaseStorageMemory(numBytes: Long): Unit = { }
- override def releaseStorageMemory(): Unit = { }
- override def releaseUnrollMemory(numBytes: Long): Unit = { }
override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
- override def executionMemoryUsed: Long = Long.MaxValue
- override def storageMemoryUsed: Long = Long.MaxValue
}
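The change above reflects the revised MemoryManager contract: acquireExecutionMemory now also reports any blocks it evicted, and the release/usage members fall back to the base class. A hedged sketch of a caller of the new signature, where `memoryManager` is an assumed in-scope instance:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.storage.{BlockId, BlockStatus}

    val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
    val granted = memoryManager.acquireExecutionMemory(4096L, evictedBlocks)
    if (evictedBlocks.nonEmpty) {
      // Cached blocks were dropped to satisfy this execution request; callers may log or report them.
    }
    memoryManager.releaseExecutionMemory(granted)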
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index f7d48bc53e..75d1fced59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf()
.set("spark.shuffle.spill.initialMemoryThreshold", "1024")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
- .set("spark.shuffle.memoryFraction", "0.0001")
+ .set("spark.testing.memory", "80000")
sc = new SparkContext("local", "test", conf)
outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")