author     Andrew Or <andrew@databricks.com>       2015-10-13 13:49:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>   2015-10-13 13:49:59 -0700
commit     b3ffac5178795f2d8e7908b3e77e8e89f50b5f6f (patch)
tree       058d6885c0fffa8cfb496f5c4ed675f6a5345f75
parent     2b574f52d7bf51b1fe2a73086a3735b633e9083f (diff)
[SPARK-10983] Unified memory manager
This patch unifies the memory management of the storage and execution regions such that either side can borrow memory from each other. When memory pressure arises, storage will be evicted in favor of execution. To avoid regressions in cases where storage is crucial, we dynamically allocate a fraction of space for storage that execution cannot evict.

Several configurations are introduced:

- **spark.memory.fraction (default 0.75)**: fraction of the heap space used for execution and storage. The lower this is, the more frequently spills and cached data eviction occur. The purpose of this config is to set aside memory for internal metadata, user data structures, and imprecise size estimation in the case of sparse, unusually large records.

- **spark.memory.storageFraction (default 0.5)**: size of the storage region within the space set aside by `spark.memory.fraction`. Cached data may only be evicted if total storage exceeds this region.

- **spark.memory.useLegacyMode (default false)**: whether to use the memory management that existed in Spark 1.5 and before. This is mainly for backward compatibility.

For a detailed description of the design, see [SPARK-10000](https://issues.apache.org/jira/browse/SPARK-10000). This patch builds on top of the `MemoryManager` interface introduced in #9000.

Author: Andrew Or <andrew@databricks.com>

Closes #9084 from andrewor14/unified-memory-manager.
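As an illustration only (not part of this patch), a minimal sketch of how an application might set the new configurations. The key names and defaults come from the description above; the master, application name, and chosen values are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    // Example values only; the defaults quoted above already apply if these keys are not set.
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("unified-memory-example")
      .set("spark.memory.fraction", "0.75")         // share of the heap for execution + storage
      .set("spark.memory.storageFraction", "0.5")   // storage region within that share
      .set("spark.memory.useLegacyMode", "false")   // keep the new unified model (the default)
    val sc = new SparkContext(conf)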
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkConf.scala | 23
-rw-r--r-- core/src/main/scala/org/apache/spark/SparkEnv.scala | 11
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/MemoryManager.scala | 83
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala | 105
-rw-r--r-- core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala | 141
-rw-r--r-- core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala | 38
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/BlockManager.scala | 4
-rw-r--r-- core/src/main/scala/org/apache/spark/storage/MemoryStore.scala | 121
-rw-r--r-- core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala | 10
-rw-r--r-- core/src/test/scala/org/apache/spark/DistributedSuite.scala | 7
-rw-r--r-- core/src/test/scala/org/apache/spark/ShuffleSuite.scala | 6
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala | 133
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala | 105
-rw-r--r-- core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala | 208
-rw-r--r-- core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala | 5
-rw-r--r-- core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala | 3
-rw-r--r-- core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 9
-rw-r--r-- core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala | 23
-rw-r--r-- docs/configuration.md | 99
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala | 10
-rw-r--r-- sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala | 2
21 files changed, 840 insertions, 306 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala
index b344b5e173..1a0ac3d017 100644
--- a/core/src/main/scala/org/apache/spark/SparkConf.scala
+++ b/core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -418,16 +418,35 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}
// Validate memory fractions
- val memoryKeys = Seq(
+ val deprecatedMemoryKeys = Seq(
"spark.storage.memoryFraction",
"spark.shuffle.memoryFraction",
"spark.shuffle.safetyFraction",
"spark.storage.unrollFraction",
"spark.storage.safetyFraction")
+ val memoryKeys = Seq(
+ "spark.memory.fraction",
+ "spark.memory.storageFraction") ++
+ deprecatedMemoryKeys
for (key <- memoryKeys) {
val value = getDouble(key, 0.5)
if (value > 1 || value < 0) {
- throw new IllegalArgumentException("$key should be between 0 and 1 (was '$value').")
+ throw new IllegalArgumentException(s"$key should be between 0 and 1 (was '$value').")
+ }
+ }
+
+ // Warn against deprecated memory fractions (unless legacy memory management mode is enabled)
+ val legacyMemoryManagementKey = "spark.memory.useLegacyMode"
+ val legacyMemoryManagement = getBoolean(legacyMemoryManagementKey, false)
+ if (!legacyMemoryManagement) {
+ val keyset = deprecatedMemoryKeys.toSet
+ val detected = settings.keys().asScala.filter(keyset.contains)
+ if (detected.nonEmpty) {
+ logWarning("Detected deprecated memory fraction settings: " +
+ detected.mkString("[", ", ", "]") + ". As of Spark 1.6, execution and storage " +
+ "memory management are unified. All memory fractions used in the old model are " +
+ "now deprecated and no longer read. If you wish to use the old memory management, " +
+ s"you may explicitly enable `$legacyMemoryManagementKey` (not recommended).")
}
}
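A small hypothetical sketch (not part of the patch) of the validation and deprecation behavior added in the hunk above:

    import org.apache.spark.SparkConf

    // A deprecated old-model key is still accepted, but with spark.memory.useLegacyMode
    // left at false it is no longer read, and the warning added above is logged when
    // Spark validates the configuration.
    val conf = new SparkConf()
      .set("spark.storage.memoryFraction", "0.6")   // deprecated under the unified model

    // A fraction outside [0, 1], e.g. "1.5" for spark.memory.fraction, would instead be
    // rejected with the IllegalArgumentException shown in the hunk above.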
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index df3d84a1f0..c329983451 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.memory.{MemoryManager, StaticMemoryManager}
+import org.apache.spark.memory.{MemoryManager, StaticMemoryManager, UnifiedMemoryManager}
import org.apache.spark.network.BlockTransferService
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.{RpcEndpointRef, RpcEndpoint, RpcEnv}
@@ -335,7 +335,14 @@ object SparkEnv extends Logging {
val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
- val memoryManager = new StaticMemoryManager(conf)
+ val useLegacyMemoryManager = conf.getBoolean("spark.memory.useLegacyMode", false)
+ val memoryManager: MemoryManager =
+ if (useLegacyMemoryManager) {
+ new StaticMemoryManager(conf)
+ } else {
+ new UnifiedMemoryManager(conf)
+ }
+
val shuffleMemoryManager = ShuffleMemoryManager.create(conf, memoryManager, numUsableCores)
val blockTransferService = new NettyBlockTransferService(conf, securityManager, numUsableCores)
diff --git a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
index 4bf73b6969..7168ac5491 100644
--- a/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/MemoryManager.scala
@@ -19,6 +19,7 @@ package org.apache.spark.memory
import scala.collection.mutable
+import org.apache.spark.Logging
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
@@ -29,7 +30,7 @@ import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore}
* sorts and aggregations, while storage memory refers to that used for caching and propagating
* internal data across the cluster. There exists one of these per JVM.
*/
-private[spark] abstract class MemoryManager {
+private[spark] abstract class MemoryManager extends Logging {
// The memory store used to evict cached blocks
private var _memoryStore: MemoryStore = _
@@ -40,19 +41,38 @@ private[spark] abstract class MemoryManager {
_memoryStore
}
+ // Amount of execution/storage memory in use, accesses must be synchronized on `this`
+ protected var _executionMemoryUsed: Long = 0
+ protected var _storageMemoryUsed: Long = 0
+
/**
* Set the [[MemoryStore]] used by this manager to evict cached blocks.
* This must be set after construction due to initialization ordering constraints.
*/
- def setMemoryStore(store: MemoryStore): Unit = {
+ final def setMemoryStore(store: MemoryStore): Unit = {
_memoryStore = store
}
/**
- * Acquire N bytes of memory for execution.
+ * Total available memory for execution, in bytes.
+ */
+ def maxExecutionMemory: Long
+
+ /**
+ * Total available memory for storage, in bytes.
+ */
+ def maxStorageMemory: Long
+
+ // TODO: avoid passing evicted blocks around to simplify method signatures (SPARK-10985)
+
+ /**
+ * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
* @return number of bytes successfully granted (<= N).
*/
- def acquireExecutionMemory(numBytes: Long): Long
+ def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long
/**
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
@@ -66,52 +86,73 @@ private[spark] abstract class MemoryManager {
/**
* Acquire N bytes of memory to unroll the given block, evicting existing ones if necessary.
+ *
+ * This extra method allows subclasses to differentiate behavior between acquiring storage
+ * memory and acquiring unroll memory. For instance, the memory management model in Spark
+ * 1.5 and before places a limit on the amount of space that can be freed from unrolling.
* Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ *
* @return whether all N bytes were successfully granted.
*/
def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ acquireStorageMemory(blockId, numBytes, evictedBlocks)
+ }
/**
* Release N bytes of execution memory.
*/
- def releaseExecutionMemory(numBytes: Long): Unit
+ def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
+ if (numBytes > _executionMemoryUsed) {
+ logWarning(s"Attempted to release $numBytes bytes of execution " +
+ s"memory when we only have ${_executionMemoryUsed} bytes")
+ _executionMemoryUsed = 0
+ } else {
+ _executionMemoryUsed -= numBytes
+ }
+ }
/**
* Release N bytes of storage memory.
*/
- def releaseStorageMemory(numBytes: Long): Unit
+ def releaseStorageMemory(numBytes: Long): Unit = synchronized {
+ if (numBytes > _storageMemoryUsed) {
+ logWarning(s"Attempted to release $numBytes bytes of storage " +
+ s"memory when we only have ${_storageMemoryUsed} bytes")
+ _storageMemoryUsed = 0
+ } else {
+ _storageMemoryUsed -= numBytes
+ }
+ }
/**
* Release all storage memory acquired.
*/
- def releaseStorageMemory(): Unit
+ def releaseAllStorageMemory(): Unit = synchronized {
+ _storageMemoryUsed = 0
+ }
/**
* Release N bytes of unroll memory.
*/
- def releaseUnrollMemory(numBytes: Long): Unit
-
- /**
- * Total available memory for execution, in bytes.
- */
- def maxExecutionMemory: Long
-
- /**
- * Total available memory for storage, in bytes.
- */
- def maxStorageMemory: Long
+ def releaseUnrollMemory(numBytes: Long): Unit = synchronized {
+ releaseStorageMemory(numBytes)
+ }
/**
* Execution memory currently in use, in bytes.
*/
- def executionMemoryUsed: Long
+ final def executionMemoryUsed: Long = synchronized {
+ _executionMemoryUsed
+ }
/**
* Storage memory currently in use, in bytes.
*/
- def storageMemoryUsed: Long
+ final def storageMemoryUsed: Long = synchronized {
+ _storageMemoryUsed
+ }
}
diff --git a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
index 150445edb9..fa44f37234 100644
--- a/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StaticMemoryManager.scala
@@ -19,7 +19,7 @@ package org.apache.spark.memory
import scala.collection.mutable
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus}
@@ -34,17 +34,7 @@ private[spark] class StaticMemoryManager(
conf: SparkConf,
override val maxExecutionMemory: Long,
override val maxStorageMemory: Long)
- extends MemoryManager with Logging {
-
- // Max number of bytes worth of blocks to evict when unrolling
- private val maxMemoryToEvictForUnroll: Long = {
- (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
- }
-
- // Amount of execution / storage memory in use
- // Accesses must be synchronized on `this`
- private var _executionMemoryUsed: Long = 0
- private var _storageMemoryUsed: Long = 0
+ extends MemoryManager {
def this(conf: SparkConf) {
this(
@@ -53,11 +43,19 @@ private[spark] class StaticMemoryManager(
StaticMemoryManager.getMaxStorageMemory(conf))
}
+ // Max number of bytes worth of blocks to evict when unrolling
+ private val maxMemoryToEvictForUnroll: Long = {
+ (maxStorageMemory * conf.getDouble("spark.storage.unrollFraction", 0.2)).toLong
+ }
+
/**
* Acquire N bytes of memory for execution.
* @return number of bytes successfully granted (<= N).
*/
- override def acquireExecutionMemory(numBytes: Long): Long = synchronized {
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ assert(numBytes >= 0)
assert(_executionMemoryUsed <= maxExecutionMemory)
val bytesToGrant = math.min(numBytes, maxExecutionMemory - _executionMemoryUsed)
_executionMemoryUsed += bytesToGrant
@@ -72,7 +70,7 @@ private[spark] class StaticMemoryManager(
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
acquireStorageMemory(blockId, numBytes, numBytes, evictedBlocks)
}
@@ -88,7 +86,7 @@ private[spark] class StaticMemoryManager(
override def acquireUnrollMemory(
blockId: BlockId,
numBytes: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
val currentUnrollMemory = memoryStore.currentUnrollMemory
val maxNumBytesToFree = math.max(0, maxMemoryToEvictForUnroll - currentUnrollMemory)
val numBytesToFree = math.min(numBytes, maxNumBytesToFree)
@@ -108,71 +106,16 @@ private[spark] class StaticMemoryManager(
blockId: BlockId,
numBytesToAcquire: Long,
numBytesToFree: Long,
- evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- // Note: Keep this outside synchronized block to avoid potential deadlocks!
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(numBytesToAcquire >= 0)
+ assert(numBytesToFree >= 0)
memoryStore.ensureFreeSpace(blockId, numBytesToFree, evictedBlocks)
- synchronized {
- assert(_storageMemoryUsed <= maxStorageMemory)
- val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
- if (enoughMemory) {
- _storageMemoryUsed += numBytesToAcquire
- }
- enoughMemory
- }
- }
-
- /**
- * Release N bytes of execution memory.
- */
- override def releaseExecutionMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _executionMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of execution " +
- s"memory when we only have ${_executionMemoryUsed} bytes")
- _executionMemoryUsed = 0
- } else {
- _executionMemoryUsed -= numBytes
- }
- }
-
- /**
- * Release N bytes of storage memory.
- */
- override def releaseStorageMemory(numBytes: Long): Unit = synchronized {
- if (numBytes > _storageMemoryUsed) {
- logWarning(s"Attempted to release $numBytes bytes of storage " +
- s"memory when we only have ${_storageMemoryUsed} bytes")
- _storageMemoryUsed = 0
- } else {
- _storageMemoryUsed -= numBytes
+ assert(_storageMemoryUsed <= maxStorageMemory)
+ val enoughMemory = _storageMemoryUsed + numBytesToAcquire <= maxStorageMemory
+ if (enoughMemory) {
+ _storageMemoryUsed += numBytesToAcquire
}
- }
-
- /**
- * Release all storage memory acquired.
- */
- override def releaseStorageMemory(): Unit = synchronized {
- _storageMemoryUsed = 0
- }
-
- /**
- * Release N bytes of unroll memory.
- */
- override def releaseUnrollMemory(numBytes: Long): Unit = {
- releaseStorageMemory(numBytes)
- }
-
- /**
- * Amount of execution memory currently in use, in bytes.
- */
- override def executionMemoryUsed: Long = synchronized {
- _executionMemoryUsed
- }
-
- /**
- * Amount of storage memory currently in use, in bytes.
- */
- override def storageMemoryUsed: Long = synchronized {
- _storageMemoryUsed
+ enoughMemory
}
}
@@ -184,9 +127,10 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the storage region, in bytes.
*/
private def getMaxStorageMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.storage.memoryFraction", 0.6)
val safetyFraction = conf.getDouble("spark.storage.safetyFraction", 0.9)
- (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
@@ -194,9 +138,10 @@ private[spark] object StaticMemoryManager {
* Return the total amount of memory available for the execution region, in bytes.
*/
private def getMaxExecutionMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
val memoryFraction = conf.getDouble("spark.shuffle.memoryFraction", 0.2)
val safetyFraction = conf.getDouble("spark.shuffle.safetyFraction", 0.8)
- (Runtime.getRuntime.maxMemory * memoryFraction * safetyFraction).toLong
+ (systemMaxMemory * memoryFraction * safetyFraction).toLong
}
}
diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
new file mode 100644
index 0000000000..5bf78d5b67
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockStatus, BlockId}
+
+
+/**
+ * A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
+ * either side can borrow memory from the other.
+ *
+ * The region shared between execution and storage is a fraction of the total heap space
+ * configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
+ * within this space is further determined by `spark.memory.storageFraction` (default 0.5).
+ * This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
+ *
+ * Storage can borrow as much execution memory as is free until execution reclaims its space.
+ * When this happens, cached blocks will be evicted from memory until sufficient borrowed
+ * memory is released to satisfy the execution memory request.
+ *
+ * Similarly, execution can borrow as much storage memory as is free. However, execution
+ * memory is *never* evicted by storage due to the complexities involved in implementing this.
+ * The implication is that attempts to cache blocks may fail if execution has already eaten
+ * up most of the storage space, in which case the new blocks will be evicted immediately
+ * according to their respective storage levels.
+ */
+private[spark] class UnifiedMemoryManager(conf: SparkConf, maxMemory: Long) extends MemoryManager {
+
+ def this(conf: SparkConf) {
+ this(conf, UnifiedMemoryManager.getMaxMemory(conf))
+ }
+
+ /**
+ * Size of the storage region, in bytes.
+ *
+ * This region is not statically reserved; execution can borrow from it if necessary.
+ * Cached blocks can be evicted only if actual storage memory usage exceeds this region.
+ */
+ private val storageRegionSize: Long = {
+ (maxMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
+ }
+
+ /**
+ * Total amount of memory, in bytes, not currently occupied by either execution or storage.
+ */
+ private def totalFreeMemory: Long = synchronized {
+ assert(_executionMemoryUsed <= maxMemory)
+ assert(_storageMemoryUsed <= maxMemory)
+ assert(_executionMemoryUsed + _storageMemoryUsed <= maxMemory)
+ maxMemory - _executionMemoryUsed - _storageMemoryUsed
+ }
+
+ /**
+ * Total available memory for execution, in bytes.
+ * In this model, this is equivalent to the amount of memory not occupied by storage.
+ */
+ override def maxExecutionMemory: Long = synchronized {
+ maxMemory - _storageMemoryUsed
+ }
+
+ /**
+ * Total available memory for storage, in bytes.
+ * In this model, this is equivalent to the amount of memory not occupied by execution.
+ */
+ override def maxStorageMemory: Long = synchronized {
+ maxMemory - _executionMemoryUsed
+ }
+
+ /**
+ * Acquire N bytes of memory for execution, evicting cached blocks if necessary.
+ *
+ * This method evicts blocks only up to the amount of memory borrowed by storage.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return number of bytes successfully granted (<= N).
+ */
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = synchronized {
+ assert(numBytes >= 0)
+ val memoryBorrowedByStorage = math.max(0, _storageMemoryUsed - storageRegionSize)
+ // If there is not enough free memory AND storage has borrowed some execution memory,
+ // then evict as much memory borrowed by storage as needed to grant this request
+ val shouldEvictStorage = totalFreeMemory < numBytes && memoryBorrowedByStorage > 0
+ if (shouldEvictStorage) {
+ val spaceToEnsure = math.min(numBytes, memoryBorrowedByStorage)
+ memoryStore.ensureFreeSpace(spaceToEnsure, evictedBlocks)
+ }
+ val bytesToGrant = math.min(numBytes, totalFreeMemory)
+ _executionMemoryUsed += bytesToGrant
+ bytesToGrant
+ }
+
+ /**
+ * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
+ * Blocks evicted in the process, if any, are added to `evictedBlocks`.
+ * @return whether all N bytes were successfully granted.
+ */
+ override def acquireStorageMemory(
+ blockId: BlockId,
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = synchronized {
+ assert(numBytes >= 0)
+ memoryStore.ensureFreeSpace(blockId, numBytes, evictedBlocks)
+ val enoughMemory = totalFreeMemory >= numBytes
+ if (enoughMemory) {
+ _storageMemoryUsed += numBytes
+ }
+ enoughMemory
+ }
+
+}
+
+private object UnifiedMemoryManager {
+
+ /**
+ * Return the total amount of memory shared between execution and storage, in bytes.
+ */
+ private def getMaxMemory(conf: SparkConf): Long = {
+ val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
+ val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
+ (systemMaxMemory * memoryFraction).toLong
+ }
+}
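A back-of-the-envelope sketch (not part of the patch) of how the class doc above sizes the regions under the defaults, assuming a hypothetical 4 GB JVM heap:

    // spark.memory.fraction = 0.75 and spark.memory.storageFraction = 0.5 are the defaults.
    val heapBytes     = 4L * 1024 * 1024 * 1024          // illustrative JVM heap size
    val unifiedRegion = (heapBytes * 0.75).toLong        // ~3 GB shared pool (getMaxMemory)
    val storageRegion = (unifiedRegion * 0.5).toLong     // ~1.5 GB storageRegionSize
    // Storage may grow past storageRegion while execution is idle, but acquireExecutionMemory
    // evicts only the portion of storage above storageRegion (memoryBorrowedByStorage).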
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
index bb64bb3f35..aaf543ce92 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleMemoryManager.scala
@@ -18,11 +18,13 @@
package org.apache.spark.shuffle
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import com.google.common.annotations.VisibleForTesting
import org.apache.spark._
import org.apache.spark.memory.{StaticMemoryManager, MemoryManager}
+import org.apache.spark.storage.{BlockId, BlockStatus}
import org.apache.spark.unsafe.array.ByteArrayMethods
/**
@@ -36,8 +38,8 @@ import org.apache.spark.unsafe.array.ByteArrayMethods
* If there are N tasks, it ensures that each task can acquire at least 1 / 2N of the memory
* before it has to spill, and at most 1 / N. Because N varies dynamically, we keep track of the
* set of active tasks and redo the calculations of 1 / 2N and 1 / N in waiting tasks whenever
- * this set changes. This is all done by synchronizing access on "this" to mutate state and using
- * wait() and notifyAll() to signal changes.
+ * this set changes. This is all done by synchronizing access to `memoryManager` to mutate state
+ * and using wait() and notifyAll() to signal changes.
*
* Use `ShuffleMemoryManager.create()` factory method to create a new instance.
*
@@ -51,7 +53,6 @@ class ShuffleMemoryManager protected (
extends Logging {
private val taskMemory = new mutable.HashMap[Long, Long]() // taskAttemptId -> memory bytes
- private val maxMemory = memoryManager.maxExecutionMemory
private def currentTaskAttemptId(): Long = {
// In case this is called on the driver, return an invalid task attempt id.
@@ -65,7 +66,7 @@ class ShuffleMemoryManager protected (
* total memory pool (where N is the # of active tasks) before it is forced to spill. This can
* happen if the number of tasks increases but an older task had a lot of memory already.
*/
- def tryToAcquire(numBytes: Long): Long = synchronized {
+ def tryToAcquire(numBytes: Long): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
assert(numBytes > 0, "invalid number of bytes requested: " + numBytes)
@@ -73,15 +74,18 @@ class ShuffleMemoryManager protected (
// of active tasks, to let other tasks ramp down their memory in calls to tryToAcquire
if (!taskMemory.contains(taskAttemptId)) {
taskMemory(taskAttemptId) = 0L
- notifyAll() // Will later cause waiting tasks to wake up and check numTasks again
+ // This will later cause waiting tasks to wake up and check numTasks again
+ memoryManager.notifyAll()
}
// Keep looping until we're either sure that we don't want to grant this request (because this
// task would have more than 1 / numActiveTasks of the memory) or we have enough free
// memory to give it (we always let each task get at least 1 / (2 * numActiveTasks)).
+ // TODO: simplify this to limit each task to its own slot
while (true) {
val numActiveTasks = taskMemory.keys.size
val curMem = taskMemory(taskAttemptId)
+ val maxMemory = memoryManager.maxExecutionMemory
val freeMemory = maxMemory - taskMemory.values.sum
// How much we can grant this task; don't let it grow to more than 1 / numActiveTasks;
@@ -99,7 +103,7 @@ class ShuffleMemoryManager protected (
} else {
logInfo(
s"TID $taskAttemptId waiting for at least 1/2N of shuffle memory pool to be free")
- wait()
+ memoryManager.wait()
}
} else {
return acquire(toGrant)
@@ -112,15 +116,23 @@ class ShuffleMemoryManager protected (
* Acquire N bytes of execution memory from the memory manager for the current task.
* @return number of bytes actually acquired (<= N).
*/
- private def acquire(numBytes: Long): Long = synchronized {
+ private def acquire(numBytes: Long): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
- val acquired = memoryManager.acquireExecutionMemory(numBytes)
+ val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val acquired = memoryManager.acquireExecutionMemory(numBytes, evictedBlocks)
+ // Register evicted blocks, if any, with the active task metrics
+ // TODO: just do this in `acquireExecutionMemory` (SPARK-10985)
+ Option(TaskContext.get()).foreach { tc =>
+ val metrics = tc.taskMetrics()
+ val lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())
+ metrics.updatedBlocks = Some(lastUpdatedBlocks ++ evictedBlocks.toSeq)
+ }
taskMemory(taskAttemptId) += acquired
acquired
}
/** Release numBytes bytes for the current task. */
- def release(numBytes: Long): Unit = synchronized {
+ def release(numBytes: Long): Unit = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
val curMem = taskMemory.getOrElse(taskAttemptId, 0L)
if (curMem < numBytes) {
@@ -129,20 +141,20 @@ class ShuffleMemoryManager protected (
}
taskMemory(taskAttemptId) -= numBytes
memoryManager.releaseExecutionMemory(numBytes)
- notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed
+ memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed
}
/** Release all memory for the current task and mark it as inactive (e.g. when a task ends). */
- def releaseMemoryForThisTask(): Unit = synchronized {
+ def releaseMemoryForThisTask(): Unit = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
taskMemory.remove(taskAttemptId).foreach { numBytes =>
memoryManager.releaseExecutionMemory(numBytes)
}
- notifyAll() // Notify waiters who locked "this" in tryToAcquire that memory has been freed
+ memoryManager.notifyAll() // Notify waiters in tryToAcquire that memory has been freed
}
/** Returns the memory consumption, in bytes, for the current task */
- def getMemoryConsumptionForThisTask(): Long = synchronized {
+ def getMemoryConsumptionForThisTask(): Long = memoryManager.synchronized {
val taskAttemptId = currentTaskAttemptId()
taskMemory.getOrElse(taskAttemptId, 0L)
}
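For intuition, a small arithmetic sketch (not part of the patch) of the 1/2N to 1/N per-task policy described in the ShuffleMemoryManager doc above, with illustrative numbers:

    val maxExecutionMemory = 1000L   // hypothetical execution pool, in bytes
    val numActiveTasks     = 4       // N
    val minPerTask = maxExecutionMemory / (2 * numActiveTasks)   // 125 bytes guaranteed before spilling
    val maxPerTask = maxExecutionMemory / numActiveTasks         // 250 bytes cap for any single task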
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 9f5bd2abbd..c374b93766 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -91,6 +91,10 @@ private[spark] class BlockManager(
}
memoryManager.setMemoryStore(memoryStore)
+ // Note: depending on the memory manager, `maxStorageMemory` may actually vary over time.
+ // However, since we use this only for reporting and logging, what we actually want here is
+ // the absolute maximum value that `maxStorageMemory` can ever possibly reach. We may need
+ // to revisit whether reporting this value as the "max" is intuitive to the user.
private val maxMemory = memoryManager.maxStorageMemory
private[spark]
diff --git a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
index 35c57b923c..4dbac388e0 100644
--- a/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -37,15 +37,14 @@ private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: MemoryManager)
extends BlockStore(blockManager) {
+ // Note: all changes to memory allocations, notably putting blocks, evicting blocks, and
+ // acquiring or releasing unroll memory, must be synchronized on `memoryManager`!
+
private val conf = blockManager.conf
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)
- private val maxMemory = memoryManager.maxStorageMemory
-
- // Ensure only one thread is putting, and if necessary, dropping blocks at any given time
- private val accountingLock = new Object
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
- // All accesses of this map are assumed to have manually synchronized on `accountingLock`
+ // All accesses of this map are assumed to have manually synchronized on `memoryManager`
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
// Same as `unrollMemoryMap`, but for pending unroll memory as defined below.
// Pending unroll memory refers to the intermediate memory occupied by a task
@@ -60,6 +59,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
private val unrollMemoryThreshold: Long =
conf.getLong("spark.storage.unrollMemoryThreshold", 1024 * 1024)
+ /** Total amount of memory available for storage, in bytes. */
+ private def maxMemory: Long = memoryManager.maxStorageMemory
+
if (maxMemory < unrollMemoryThreshold) {
logWarning(s"Max memory ${Utils.bytesToString(maxMemory)} is less than the initial memory " +
s"threshold ${Utils.bytesToString(unrollMemoryThreshold)} needed to store a block in " +
@@ -75,7 +77,9 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* Amount of storage memory, in bytes, used for caching blocks.
* This does not include memory used for unrolling.
*/
- private def blocksMemoryUsed: Long = memoryUsed - currentUnrollMemory
+ private def blocksMemoryUsed: Long = memoryManager.synchronized {
+ memoryUsed - currentUnrollMemory
+ }
override def getSize(blockId: BlockId): Long = {
entries.synchronized {
@@ -208,7 +212,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def remove(blockId: BlockId): Boolean = {
+ override def remove(blockId: BlockId): Boolean = memoryManager.synchronized {
val entry = entries.synchronized { entries.remove(blockId) }
if (entry != null) {
memoryManager.releaseStorageMemory(entry.size)
@@ -220,11 +224,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
}
- override def clear() {
+ override def clear(): Unit = memoryManager.synchronized {
entries.synchronized {
entries.clear()
}
- memoryManager.releaseStorageMemory()
+ unrollMemoryMap.clear()
+ pendingUnrollMemoryMap.clear()
+ memoryManager.releaseAllStorageMemory()
logInfo("MemoryStore cleared")
}
@@ -299,22 +305,23 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
} finally {
- // If we return an array, the values returned will later be cached in `tryToPut`.
- // In this case, we should release the memory after we cache the block there.
- // Otherwise, if we return an iterator, we release the memory reserved here
- // later when the task finishes.
+ // If we return an array, the values returned here will be cached in `tryToPut` later.
+ // In this case, we should release the memory only after we cache the block there.
if (keepUnrolling) {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
- // Here, we transfer memory from unroll to pending unroll because we expect to cache this
- // block in `tryToPut`. We do not release and re-acquire memory from the MemoryManager in
- // order to avoid race conditions where another component steals the memory that we're
- // trying to transfer.
+ memoryManager.synchronized {
+ // Since we continue to hold onto the array until we actually cache it, we cannot
+ // release the unroll memory yet. Instead, we transfer it to pending unroll memory
+ // so `tryToPut` can further transfer it to normal storage memory later.
+ // TODO: we can probably express this without pending unroll memory (SPARK-10907)
val amountToTransferToPending = currentUnrollMemoryForThisTask - previousMemoryReserved
unrollMemoryMap(taskAttemptId) -= amountToTransferToPending
pendingUnrollMemoryMap(taskAttemptId) =
pendingUnrollMemoryMap.getOrElse(taskAttemptId, 0L) + amountToTransferToPending
}
+ } else {
+ // Otherwise, if we return an iterator, we can only release the unroll memory when
+ // the task finishes since we don't know when the iterator will be consumed.
}
}
}
@@ -343,7 +350,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* `value` will be lazily created. If it cannot be put into MemoryStore or disk, `value` won't be
* created to avoid OOM since it may be a big ByteBuffer.
*
- * Synchronize on `accountingLock` to ensure that all the put requests and its associated block
+ * Synchronize on `memoryManager` to ensure that all the put requests and its associated block
* dropping is done by only one thread at a time. Otherwise while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
@@ -365,16 +372,13 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
* for freeing up more space for another block that needs to be put. Only then the actual
* dropping of blocks (and writing to disk if necessary) can proceed in parallel. */
- accountingLock.synchronized {
+ memoryManager.synchronized {
// Note: if we have previously unrolled this block successfully, then pending unroll
// memory should be non-zero. This is the amount that we already reserved during the
// unrolling process. In this case, we can just reuse this space to cache our block.
- //
- // Note: the StaticMemoryManager counts unroll memory as storage memory. Here, the
- // synchronization on `accountingLock` guarantees that the release of unroll memory and
- // acquisition of storage memory happens atomically. However, if storage memory is acquired
- // outside of MemoryStore or if unroll memory is counted as execution memory, then we will
- // have to revisit this assumption. See SPARK-10983 for more context.
+ // The synchronization on `memoryManager` here guarantees that the release and acquire
+ // happen atomically. This relies on the assumption that all memory acquisitions are
+ // synchronized on the same lock.
releasePendingUnrollMemoryForThisTask()
val enoughMemory = memoryManager.acquireStorageMemory(blockId, size, droppedBlocks)
if (enoughMemory) {
@@ -402,33 +406,61 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
/**
+ * Try to free up a given amount of space by evicting existing blocks.
+ *
+ * @param space the amount of memory to free, in bytes
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def ensureFreeSpace(
+ space: Long,
+ droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ ensureFreeSpace(None, space, droppedBlocks)
+ }
+
+ /**
+ * Try to free up a given amount of space to store a block by evicting existing ones.
+ *
+ * @param space the amount of memory to free, in bytes
+ * @param droppedBlocks a holder for blocks evicted in the process
+ * @return whether the requested free space is freed.
+ */
+ private[spark] def ensureFreeSpace(
+ blockId: BlockId,
+ space: Long,
+ droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
+ ensureFreeSpace(Some(blockId), space, droppedBlocks)
+ }
+
+ /**
* Try to free up a given amount of space to store a particular block, but can fail if
* either the block is bigger than our memory or it would require replacing another block
* from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
- * @param blockId the ID of the block we are freeing space for
+ * @param blockId the ID of the block we are freeing space for, if any
* @param space the size of this block
* @param droppedBlocks a holder for blocks evicted in the process
- * @return whether there is enough free space.
+ * @return whether the requested free space is freed.
*/
- private[spark] def ensureFreeSpace(
- blockId: BlockId,
+ private def ensureFreeSpace(
+ blockId: Option[BlockId],
space: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- accountingLock.synchronized {
+ memoryManager.synchronized {
val freeMemory = maxMemory - memoryUsed
- val rddToAdd = getRddId(blockId)
+ val rddToAdd = blockId.flatMap(getRddId)
val selectedBlocks = new ArrayBuffer[BlockId]
var selectedMemory = 0L
- logInfo(s"Ensuring $space bytes of free space for block $blockId " +
+ logInfo(s"Ensuring $space bytes of free space " +
+ blockId.map { id => s"for block $id" }.getOrElse("") +
s"(free: $freeMemory, max: $maxMemory)")
// Fail fast if the block simply won't fit
if (space > maxMemory) {
- logInfo(s"Will not store $blockId as the required space " +
- s"($space bytes) than our memory limit ($maxMemory bytes)")
+ logInfo("Will not " + blockId.map { id => s"store $id" }.getOrElse("free memory") +
+ s" as the required space ($space bytes) exceeds our memory limit ($maxMemory bytes)")
return false
}
@@ -471,8 +503,10 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
}
true
} else {
- logInfo(s"Will not store $blockId as it would require dropping another block " +
- "from the same RDD")
+ blockId.foreach { id =>
+ logInfo(s"Will not store $id as it would require dropping another block " +
+ "from the same RDD")
+ }
false
}
}
@@ -495,8 +529,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
blockId: BlockId,
memory: Long,
droppedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = {
- accountingLock.synchronized {
- // Note: all acquisitions of unroll memory must be synchronized on `accountingLock`
+ memoryManager.synchronized {
val success = memoryManager.acquireUnrollMemory(blockId, memory, droppedBlocks)
if (success) {
val taskAttemptId = currentTaskAttemptId()
@@ -512,7 +545,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*/
def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
+ memoryManager.synchronized {
if (unrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -531,7 +564,7 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
*/
def releasePendingUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
val taskAttemptId = currentTaskAttemptId()
- accountingLock.synchronized {
+ memoryManager.synchronized {
if (pendingUnrollMemoryMap.contains(taskAttemptId)) {
val memoryToRelease = math.min(memory, pendingUnrollMemoryMap(taskAttemptId))
if (memoryToRelease > 0) {
@@ -548,21 +581,21 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
/**
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
*/
- def currentUnrollMemory: Long = accountingLock.synchronized {
+ def currentUnrollMemory: Long = memoryManager.synchronized {
unrollMemoryMap.values.sum + pendingUnrollMemoryMap.values.sum
}
/**
* Return the amount of memory currently occupied for unrolling blocks by this task.
*/
- def currentUnrollMemoryForThisTask: Long = accountingLock.synchronized {
+ def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
}
/**
* Return the number of tasks currently unrolling blocks.
*/
- private def numTasksUnrolling: Int = accountingLock.synchronized { unrollMemoryMap.keys.size }
+ private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
/**
* Log information about current memory usage.
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
index 29c5732f5a..6a96b5dc12 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala
@@ -48,16 +48,6 @@ import org.apache.spark.executor.ShuffleWriteMetrics
* However, if the spill threshold is too low, we spill frequently and incur unnecessary disk
* writes. This may lead to a performance regression compared to the normal case of using the
* non-spilling AppendOnlyMap.
- *
- * Two parameters control the memory threshold:
- *
- * `spark.shuffle.memoryFraction` specifies the collective amount of memory used for storing
- * these maps as a fraction of the executor's total memory. Since each concurrently running
- * task maintains one map, the actual threshold for each map is this quantity divided by the
- * number of running tasks.
- *
- * `spark.shuffle.safetyFraction` specifies an additional margin of safety as a fraction of
- * this threshold, in case map size estimation is not sufficiently accurate.
*/
@DeveloperApi
class ExternalAppendOnlyMap[K, V, C](
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 600c1403b0..34a4bb968e 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -213,11 +213,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
}
test("compute when only some partitions fit in memory") {
- val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
- sc = new SparkContext(clusterUrl, "test", conf)
- // data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
- // to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
- // to make sure that *some* of them do fit though
+ sc = new SparkContext(clusterUrl, "test", new SparkConf)
+ // TODO: verify that only a subset of partitions fit in memory (SPARK-11078)
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index d91b799ecf..4a0877d86f 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -247,11 +247,13 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
.setMaster("local")
.set("spark.shuffle.spill.compress", shuffleSpillCompress.toString)
.set("spark.shuffle.compress", shuffleCompress.toString)
- .set("spark.shuffle.memoryFraction", "0.001")
resetSparkContext()
sc = new SparkContext(myConf)
+ val diskBlockManager = sc.env.blockManager.diskBlockManager
try {
- sc.parallelize(0 until 100000).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().isEmpty)
+ sc.parallelize(0 until 10).map(i => (i / 4, i)).groupByKey().collect()
+ assert(diskBlockManager.getAllFiles().nonEmpty)
} catch {
case e: Exception =>
val errMsg = s"Failed with spark.shuffle.spill.compress=$shuffleSpillCompress," +
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
new file mode 100644
index 0000000000..36e4566310
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.mockito.Matchers.{any, anyLong}
+import org.mockito.Mockito.{mock, when}
+import org.mockito.invocation.InvocationOnMock
+import org.mockito.stubbing.Answer
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.storage.MemoryStore
+
+
+/**
+ * Helper trait for sharing code among [[MemoryManager]] tests.
+ */
+private[memory] trait MemoryManagerSuite extends SparkFunSuite {
+
+ import MemoryManagerSuite.DEFAULT_ENSURE_FREE_SPACE_CALLED
+
+ // Note: Mockito's verify mechanism does not provide a way to reset method call counts
+ // without also resetting stubbed methods. Since our test code relies on the latter,
+ // we need to use our own variable to track invocations of `ensureFreeSpace`.
+
+ /**
+ * The amount of free space requested in the last call to [[MemoryStore.ensureFreeSpace]]
+ *
+ * This is set whenever [[MemoryStore.ensureFreeSpace]] is called, and cleared when the test
+ * code makes explicit assertions on this variable through [[assertEnsureFreeSpaceCalled]].
+ */
+ private val ensureFreeSpaceCalled = new AtomicLong(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+
+ /**
+ * Make a mocked [[MemoryStore]] whose [[MemoryStore.ensureFreeSpace]] method is stubbed.
+ *
+ * This allows our test code to release storage memory when [[MemoryStore.ensureFreeSpace]]
+ * is called without relying on [[org.apache.spark.storage.BlockManager]] and all of its
+ * dependencies.
+ */
+ protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
+ val ms = mock(classOf[MemoryStore])
+ when(ms.ensureFreeSpace(anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 0))
+ when(ms.ensureFreeSpace(any(), anyLong(), any())).thenAnswer(ensureFreeSpaceAnswer(mm, 1))
+ mm.setMemoryStore(ms)
+ ms
+ }
+
+ /**
+ * Make an [[Answer]] that stubs [[MemoryStore.ensureFreeSpace]] with the right arguments.
+ */
+ private def ensureFreeSpaceAnswer(mm: MemoryManager, numBytesPos: Int): Answer[Boolean] = {
+ new Answer[Boolean] {
+ override def answer(invocation: InvocationOnMock): Boolean = {
+ val args = invocation.getArguments
+ require(args.size > numBytesPos, s"bad test: expected >$numBytesPos arguments " +
+ s"in ensureFreeSpace, found ${args.size}")
+ require(args(numBytesPos).isInstanceOf[Long], s"bad test: expected ensureFreeSpace " +
+ s"argument at index $numBytesPos to be a Long: ${args.mkString(", ")}")
+ val numBytes = args(numBytesPos).asInstanceOf[Long]
+ mockEnsureFreeSpace(mm, numBytes)
+ }
+ }
+ }
+
+ /**
+ * Simulate the part of [[MemoryStore.ensureFreeSpace]] that releases storage memory.
+ *
+ * This is a significant simplification of the real method, which actually drops existing
+ * blocks based on the size of each block. Instead, here we simply release as many bytes
+ * as needed to ensure the requested amount of free space. This allows us to set up the
+ * test without relying on the [[org.apache.spark.storage.BlockManager]], which brings in
+ * many other dependencies.
+ *
+ * Every call to this method will set a global variable, [[ensureFreeSpaceCalled]], that
+ * records the number of bytes this is called with. This variable is expected to be cleared
+ * by the test code later through [[assertEnsureFreeSpaceCalled]].
+ */
+ private def mockEnsureFreeSpace(mm: MemoryManager, numBytes: Long): Boolean = mm.synchronized {
+ require(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "bad test: ensure free space variable was not reset")
+ // Record the number of bytes we freed this call
+ ensureFreeSpaceCalled.set(numBytes)
+ if (numBytes <= mm.maxStorageMemory) {
+ def freeMemory = mm.maxStorageMemory - mm.storageMemoryUsed
+ val spaceToRelease = numBytes - freeMemory
+ if (spaceToRelease > 0) {
+ mm.releaseStorageMemory(spaceToRelease)
+ }
+ freeMemory >= numBytes
+ } else {
+ // We attempted to free more bytes than our max allowable memory
+ false
+ }
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
+ */
+ protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
+ assert(ensureFreeSpaceCalled.get() === numBytes,
+ s"expected ensure free space to be called with $numBytes")
+ ensureFreeSpaceCalled.set(DEFAULT_ENSURE_FREE_SPACE_CALLED)
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is NOT called.
+ */
+ protected def assertEnsureFreeSpaceNotCalled[T](ms: MemoryStore): Unit = {
+ assert(ensureFreeSpaceCalled.get() === DEFAULT_ENSURE_FREE_SPACE_CALLED,
+ "ensure free space should not have been called!")
+ }
+}
+
+private object MemoryManagerSuite {
+ private val DEFAULT_ENSURE_FREE_SPACE_CALLED = -1L
+}
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
index c436a8b5c9..6cae1f871e 100644
--- a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -19,32 +19,44 @@ package org.apache.spark.memory
import scala.collection.mutable.ArrayBuffer
-import org.mockito.Mockito.{mock, reset, verify, when}
-import org.mockito.Matchers.{any, eq => meq}
+import org.mockito.Mockito.when
+import org.apache.spark.SparkConf
import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
-import org.apache.spark.{SparkConf, SparkFunSuite}
-class StaticMemoryManagerSuite extends SparkFunSuite {
+class StaticMemoryManagerSuite extends MemoryManagerSuite {
private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(
+ maxExecutionMem: Long,
+ maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
+ val mm = new StaticMemoryManager(
+ conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
test("basic execution memory") {
val maxExecutionMem = 1000L
val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
assert(mm.executionMemoryUsed === 0L)
- assert(mm.acquireExecutionMemory(10L) === 10L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
assert(mm.executionMemoryUsed === 10L)
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
// Acquire up to the max
- assert(mm.acquireExecutionMemory(1000L) === 890L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
assert(mm.executionMemoryUsed === maxExecutionMem)
- assert(mm.acquireExecutionMemory(1L) === 0L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
assert(mm.executionMemoryUsed === maxExecutionMem)
mm.releaseExecutionMemory(800L)
assert(mm.executionMemoryUsed === 200L)
// Acquire after release
- assert(mm.acquireExecutionMemory(1L) === 1L)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
assert(mm.executionMemoryUsed === 201L)
// Release beyond what was acquired
mm.releaseExecutionMemory(maxExecutionMem)
@@ -54,37 +66,36 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("basic storage memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("you can see the world you brought to live")
- val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
// `ensureFreeSpace` should be called with the number of bytes requested
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
+ assertEnsureFreeSpaceCalled(ms, 10L)
assert(mm.storageMemoryUsed === 10L)
- assert(evictedBlocks.isEmpty)
assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 110L)
- // Acquire up to the max, not granted
- assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxStorageMem + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxStorageMem + 1L)
assert(mm.storageMemoryUsed === 110L)
- assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
+ // Acquire up to the max, requests after this are still granted due to LRU eviction
+ assert(mm.acquireStorageMemory(dummyBlock, maxStorageMem, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
assert(mm.storageMemoryUsed === 1000L)
- assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1000L)
mm.releaseStorageMemory(800L)
assert(mm.storageMemoryUsed === 200L)
// Acquire after release
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 201L)
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assertEnsureFreeSpaceCalled(ms, 1L)
assert(mm.storageMemoryUsed === 1L)
// Release beyond what was acquired
mm.releaseStorageMemory(100L)
@@ -95,18 +106,17 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
val maxExecutionMem = 200L
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("ain't nobody love like you do")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
// Only execution memory should increase
- assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 100L)
- assert(mm.acquireExecutionMemory(1000L) === 100L)
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 100L)
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 200L)
// Only storage memory should increase
- assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L)
+ assert(mm.acquireStorageMemory(dummyBlock, 50L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 50L)
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 200L)
// Only execution memory should be released
@@ -114,7 +124,7 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
assert(mm.storageMemoryUsed === 50L)
assert(mm.executionMemoryUsed === 67L)
// Only storage memory should be released
- mm.releaseStorageMemory()
+ mm.releaseAllStorageMemory()
assert(mm.storageMemoryUsed === 0L)
assert(mm.executionMemoryUsed === 67L)
}
@@ -122,51 +132,26 @@ class StaticMemoryManagerSuite extends SparkFunSuite {
test("unroll memory") {
val maxStorageMem = 1000L
val dummyBlock = TestBlockId("lonely water")
- val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
- assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks))
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
assert(mm.storageMemoryUsed === 100L)
mm.releaseUnrollMemory(40L)
assert(mm.storageMemoryUsed === 60L)
when(ms.currentUnrollMemory).thenReturn(60L)
- assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks))
+ assert(mm.acquireUnrollMemory(dummyBlock, 500L, evictedBlocks))
// `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
// Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L)
+ assertEnsureFreeSpaceCalled(ms, 340L)
assert(mm.storageMemoryUsed === 560L)
when(ms.currentUnrollMemory).thenReturn(560L)
- assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks))
+ assert(!mm.acquireUnrollMemory(dummyBlock, 800L, evictedBlocks))
assert(mm.storageMemoryUsed === 560L)
// We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
- assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L)
+ assertEnsureFreeSpaceCalled(ms, 0L)
// Release beyond what was acquired
mm.releaseUnrollMemory(maxStorageMem)
assert(mm.storageMemoryUsed === 0L)
}
- /**
- * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
- */
- private def makeThings(
- maxExecutionMem: Long,
- maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
- val mm = new StaticMemoryManager(
- conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
- val ms = mock(classOf[MemoryStore])
- mm.setMemoryStore(ms)
- (mm, ms)
- }
-
- /**
- * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
- */
- private def assertEnsureFreeSpaceCalled(
- ms: MemoryStore,
- blockId: BlockId,
- numBytes: Long): Unit = {
- verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any())
- reset(ms)
- }
-
}
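Both suites in this patch now call `makeMemoryStore`, `assertEnsureFreeSpaceCalled` and `assertEnsureFreeSpaceNotCalled`, which moved into the shared `MemoryManagerSuite` base class instead of living in each suite. That base class is not reproduced in this hunk; the sketch below is only a plausible reconstruction assembled from the mock-based bodies deleted above, and the two-argument `ensureFreeSpace` it verifies is an assumption, not the committed signature.

```scala
package org.apache.spark.memory

import org.mockito.Matchers.{any, eq => meq}
import org.mockito.Mockito.{mock, never, reset, verify}

import org.apache.spark.SparkFunSuite
import org.apache.spark.storage.MemoryStore

// Hypothetical reconstruction of the shared base suite; the committed
// MemoryManagerSuite is not shown in this hunk, so the exact arity of
// ensureFreeSpace verified here is an assumption.
abstract class MemoryManagerSuiteSketch extends SparkFunSuite {

  /** Make a mocked [[MemoryStore]] and attach it to the given memory manager. */
  protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
    val ms = mock(classOf[MemoryStore])
    mm.setMemoryStore(ms)
    ms
  }

  /** Assert that [[MemoryStore.ensureFreeSpace]] was called with `numBytes`, then reset the mock. */
  protected def assertEnsureFreeSpaceCalled(ms: MemoryStore, numBytes: Long): Unit = {
    verify(ms).ensureFreeSpace(meq(numBytes: java.lang.Long), any()) // assumed two-argument form
    reset(ms)
  }

  /** Assert that [[MemoryStore.ensureFreeSpace]] was never called. */
  protected def assertEnsureFreeSpaceNotCalled(ms: MemoryStore): Unit = {
    verify(ms, never()).ensureFreeSpace(any(), any()) // assumed two-argument form
    reset(ms)
  }
}
```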
diff --git a/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
new file mode 100644
index 0000000000..e7baa50dc2
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/UnifiedMemoryManagerSuite.scala
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.scalatest.PrivateMethodTester
+
+import org.apache.spark.SparkConf
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+
+
+class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTester {
+ private val conf = new SparkConf().set("spark.memory.storageFraction", "0.5")
+ private val dummyBlock = TestBlockId("--")
+ private val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+
+ /**
+ * Make a [[UnifiedMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(maxMemory: Long): (UnifiedMemoryManager, MemoryStore) = {
+ val mm = new UnifiedMemoryManager(conf, maxMemory)
+ val ms = makeMemoryStore(mm)
+ (mm, ms)
+ }
+
+ private def getStorageRegionSize(mm: UnifiedMemoryManager): Long = {
+ mm invokePrivate PrivateMethod[Long]('storageRegionSize)()
+ }
+
+ test("storage region size") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ val storageFraction = conf.get("spark.memory.storageFraction").toDouble
+ val expectedStorageRegionSize = maxMemory * storageFraction
+ val actualStorageRegionSize = getStorageRegionSize(mm)
+ assert(expectedStorageRegionSize === actualStorageRegionSize)
+ }
+
+ test("basic execution memory") {
+ val maxMemory = 1000L
+ val (mm, _) = makeThings(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.acquireExecutionMemory(10L, evictedBlocks) === 10L)
+ assert(mm.executionMemoryUsed === 10L)
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ // Acquire up to the max
+ assert(mm.acquireExecutionMemory(1000L, evictedBlocks) === 890L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 0L)
+ assert(mm.executionMemoryUsed === maxMemory)
+ mm.releaseExecutionMemory(800L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireExecutionMemory(1L, evictedBlocks) === 1L)
+ assert(mm.executionMemoryUsed === 201L)
+ // Release beyond what was acquired
+ mm.releaseExecutionMemory(maxMemory)
+ assert(mm.executionMemoryUsed === 0L)
+ }
+
+ test("basic storage memory") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ // `ensureFreeSpace` should be called with the number of bytes requested
+ assertEnsureFreeSpaceCalled(ms, 10L)
+ assert(mm.storageMemoryUsed === 10L)
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire more than the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, maxMemory + 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, maxMemory + 1L)
+ assert(mm.storageMemoryUsed === 110L)
+    // Acquire up to the max; requests after this are still granted because LRU eviction frees space
+ assert(mm.acquireStorageMemory(dummyBlock, maxMemory, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1000L)
+ assert(mm.storageMemoryUsed === 1000L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1000L)
+ mm.releaseStorageMemory(800L)
+ assert(mm.storageMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 201L)
+ mm.releaseAllStorageMemory()
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 1L)
+ assert(mm.storageMemoryUsed === 1L)
+ // Release beyond what was acquired
+ mm.releaseStorageMemory(100L)
+ assert(mm.storageMemoryUsed === 0L)
+ }
+
+ test("execution evicts storage") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+    val executionRegionSize = maxMemory - storageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+      "bad test: execution region size is unexpected")
+ // Acquire enough storage memory to exceed the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.storageMemoryUsed === 750L)
+ require(mm.storageMemoryUsed > storageRegionSize,
+ s"bad test: storage memory used should exceed the storage region")
+    // 250 bytes (1000 - 750) are free, so execution must request more than that before storage is evicted
+ assert(mm.acquireExecutionMemory(100L, evictedBlocks) === 100L)
+ assert(mm.executionMemoryUsed === 100L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ // Execution wants 200 bytes but only 150 are free, so storage is evicted
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assertEnsureFreeSpaceCalled(ms, 200L)
+ assert(mm.executionMemoryUsed === 300L)
+ mm.releaseAllStorageMemory()
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ require(mm.storageMemoryUsed === 0, "bad test: all storage memory should have been released")
+ // Acquire some storage memory again, but this time keep it within the storage region
+ assert(mm.acquireStorageMemory(dummyBlock, 400L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, 400L)
+ require(mm.storageMemoryUsed < storageRegionSize,
+ s"bad test: storage memory used should be within the storage region")
+ // Execution cannot evict storage because the latter is within the storage fraction,
+ // so grant only what's remaining without evicting anything, i.e. 1000 - 300 - 400 = 300
+ assert(mm.acquireExecutionMemory(400L, evictedBlocks) === 300L)
+ assert(mm.executionMemoryUsed === 600L)
+ assert(mm.storageMemoryUsed === 400L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ }
+
+ test("storage does not evict execution") {
+ val maxMemory = 1000L
+ val (mm, ms) = makeThings(maxMemory)
+ // First, ensure the test classes are set up as expected
+ val expectedStorageRegionSize = 500L
+ val expectedExecutionRegionSize = 500L
+ val storageRegionSize = getStorageRegionSize(mm)
+    val executionRegionSize = maxMemory - storageRegionSize
+ require(storageRegionSize === expectedStorageRegionSize,
+ "bad test: storage region size is unexpected")
+ require(executionRegionSize === expectedExecutionRegionSize,
+      "bad test: execution region size is unexpected")
+ // Acquire enough execution memory to exceed the execution region
+ assert(mm.acquireExecutionMemory(800L, evictedBlocks) === 800L)
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed > executionRegionSize,
+ s"bad test: execution memory used should exceed the execution region")
+ // Storage should not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 100L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 250L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 800L)
+ assert(mm.storageMemoryUsed === 100L)
+ assertEnsureFreeSpaceCalled(ms, 250L)
+ mm.releaseExecutionMemory(maxMemory)
+ mm.releaseStorageMemory(maxMemory)
+ // Acquire some execution memory again, but this time keep it within the execution region
+ assert(mm.acquireExecutionMemory(200L, evictedBlocks) === 200L)
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 0L)
+ assertEnsureFreeSpaceNotCalled(ms)
+ require(mm.executionMemoryUsed < executionRegionSize,
+ s"bad test: execution memory used should be within the execution region")
+ // Storage should still not be able to evict execution
+ assert(mm.acquireStorageMemory(dummyBlock, 750L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 750L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 850L, evictedBlocks))
+ assert(mm.executionMemoryUsed === 200L)
+ assert(mm.storageMemoryUsed === 750L)
+ assertEnsureFreeSpaceCalled(ms, 850L)
+ }
+
+}
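The two eviction tests above assert the unified policy only through concrete numbers. As a reading aid, here is a small self-contained model of those rules under the same 1000-byte budget and 0.5 storage fraction; it is purely illustrative arithmetic, not the `UnifiedMemoryManager` implementation, and the helper names are made up for this sketch.

```scala
// Toy model of the unified policy exercised above; illustrative only, not Spark code.
object UnifiedPolicySketch {
  def main(args: Array[String]): Unit = {
    val maxMemory = 1000L
    val storageRegionSize = (maxMemory * 0.5).toLong // spark.memory.storageFraction = 0.5
    var executionUsed = 0L
    var storageUsed = 0L

    // Storage may grow into free memory but may never evict execution.
    def acquireStorage(numBytes: Long): Boolean =
      if (numBytes > maxMemory - executionUsed) {
        false // granting this would require evicting execution, which storage cannot do
      } else {
        // Eviction of older cached blocks to make room is elided in this model.
        storageUsed = math.min(maxMemory - executionUsed, storageUsed + numBytes)
        true
      }

    // Execution may evict storage, but only the part above the storage region.
    def acquireExecution(numBytes: Long): Long = {
      val free = maxMemory - executionUsed - storageUsed
      val evictable = math.max(0L, storageUsed - storageRegionSize)
      val granted = math.min(numBytes, free + evictable)
      val evicted = math.max(0L, granted - free)
      storageUsed -= evicted
      executionUsed += granted
      granted
    }

    assert(acquireStorage(750L))            // exceeds the storage region, still granted
    assert(acquireExecution(100L) == 100L)  // fits in free memory, nothing evicted
    assert(acquireExecution(200L) == 200L)  // only 150 bytes free, so 50 bytes of storage evicted
    assert(executionUsed == 300L && storageUsed == 700L)
    println(s"execution=$executionUsed storage=$storageUsed")
  }
}
```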
diff --git a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
index 6d45b1a101..5877aa042d 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/ShuffleMemoryManagerSuite.scala
@@ -24,7 +24,8 @@ import org.mockito.Mockito._
import org.scalatest.concurrent.Timeouts
import org.scalatest.time.SpanSugar._
-import org.apache.spark.{SparkConf, SparkFunSuite, TaskContext}
+import org.apache.spark.{SparkFunSuite, TaskContext}
+import org.apache.spark.executor.TaskMetrics
class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
@@ -37,7 +38,9 @@ class ShuffleMemoryManagerSuite extends SparkFunSuite with Timeouts {
try {
val taskAttemptId = nextTaskAttemptId.getAndIncrement
val mockTaskContext = mock(classOf[TaskContext], RETURNS_SMART_NULLS)
+ val taskMetrics = new TaskMetrics
when(mockTaskContext.taskAttemptId()).thenReturn(taskAttemptId)
+ when(mockTaskContext.taskMetrics()).thenReturn(taskMetrics)
TaskContext.setTaskContext(mockTaskContext)
body
} finally {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
index 6351539e91..259020a2dd 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/unsafe/UnsafeShuffleSuite.scala
@@ -36,9 +36,6 @@ class UnsafeShuffleSuite extends ShuffleSuite with BeforeAndAfterAll {
override def beforeAll() {
conf.set("spark.shuffle.manager", "tungsten-sort")
- // UnsafeShuffleManager requires at least 128 MB of memory per task in order to be able to sort
- // shuffle records.
- conf.set("spark.shuffle.memoryFraction", "0.5")
}
test("UnsafeShuffleManager properly cleans up files for shuffles that use the new shuffle path") {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index 12e9bafcc9..0a03c32c64 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -22,6 +22,8 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark._
import org.apache.spark.io.CompressionCodec
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
private val allCompressionCodecs = CompressionCodec.ALL_COMPRESSION_CODECS
private def createCombiner[T](i: T) = ArrayBuffer[T](i)
@@ -243,7 +245,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
*/
private def testSimpleSpilling(codec: Option[String] = None): Unit = {
val conf = createSparkConf(loadDefaults = true, codec) // Load defaults for Spark home
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
// reduceByKey - should spill ~8 times
@@ -291,7 +292,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[String]
@@ -340,7 +340,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = new ExternalAppendOnlyMap[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -365,7 +364,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -382,7 +380,6 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(loadDefaults = true)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val map = createExternalMap[Int]
@@ -401,8 +398,8 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
test("external aggregation updates peak execution memory") {
val conf = createSparkConf(loadDefaults = false)
- .set("spark.shuffle.memoryFraction", "0.001")
.set("spark.shuffle.manager", "hash") // make sure we're not also using ExternalSorter
+ .set("spark.testing.memory", (10 * 1024 * 1024).toString)
sc = new SparkContext("local", "test", conf)
// No spilling
AccumulatorSuite.verifyPeakExecutionMemorySet(sc, "external map without spilling") {
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index bdb0f4d507..651c7eaa65 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -24,6 +24,8 @@ import scala.util.Random
import org.apache.spark._
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
+// TODO: some of these spilling tests probably aren't actually spilling (SPARK-11078)
+
class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
private def createSparkConf(loadDefaults: Boolean, kryo: Boolean): SparkConf = {
val conf = new SparkConf(loadDefaults)
@@ -38,6 +40,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
conf.set("spark.shuffle.sort.bypassMergeThreshold", "0")
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
+ conf.set("spark.testing.memory", "2000000")
conf
}
@@ -50,7 +53,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyDataStream(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -91,7 +93,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def fewElementsPerPartition(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -140,7 +141,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def emptyPartitionsWithSpilling(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.spill.initialMemoryThreshold", "512")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -174,7 +174,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def testSpillingInLocalCluster(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
@@ -252,7 +251,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def spillingInLocalClusterWithManyReduceTasks(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
@@ -323,7 +321,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -348,7 +345,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in sorter if there are errors") {
val conf = createSparkConf(true, false) // Load defaults, otherwise SPARK_HOME is not found
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -372,7 +368,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -387,7 +382,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("cleanup of intermediate files in shuffle with errors") {
val conf = createSparkConf(false, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager
@@ -416,7 +410,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def noPartialAggregationOrSorting(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -438,7 +431,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithoutSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -461,7 +453,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWIthSpillNoOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -485,7 +476,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def partialAggregationWithSpillWithOrdering(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -512,7 +502,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationNoSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -536,7 +525,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
def sortingWithoutAggregationWithSpill(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.001")
conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager")
sc = new SparkContext("local", "test", conf)
@@ -553,7 +541,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -610,7 +597,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with many hash collisions") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.0001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
val agg = new Aggregator[FixedHashObject, Int, Int](_ => 1, _ + _, _ + _)
@@ -633,7 +619,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with hash collisions using the Int.MaxValue key") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: Int): ArrayBuffer[Int] = ArrayBuffer[Int](i)
@@ -657,7 +642,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
test("spilling with null keys and values") {
val conf = createSparkConf(true, false)
- conf.set("spark.shuffle.memoryFraction", "0.001")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
def createCombiner(i: String): ArrayBuffer[String] = ArrayBuffer[String](i)
@@ -693,7 +677,6 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext {
}
private def sortWithoutBreakingSortingContracts(conf: SparkConf) {
- conf.set("spark.shuffle.memoryFraction", "0.01")
conf.set("spark.shuffle.manager", "sort")
sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
diff --git a/docs/configuration.md b/docs/configuration.md
index 154a3aee68..771d93be04 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -446,17 +446,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.shuffle.memoryFraction</code></td>
- <td>0.2</td>
- <td>
- Fraction of Java heap to use for aggregation and cogroups during shuffles.
- At any given time, the collective size of
- all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
- begin to spill to disk. If spills are often, consider increasing this value at the expense of
- <code>spark.storage.memoryFraction</code>.
- </td>
-</tr>
-<tr>
<td><code>spark.shuffle.service.enabled</code></td>
<td>false</td>
<td>
@@ -712,6 +701,76 @@ Apart from these, the following properties are also available, and may be useful
</tr>
</table>
+#### Memory Management
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+ <td><code>spark.memory.fraction</code></td>
+ <td>0.75</td>
+ <td>
+    Fraction of the heap space used for execution and storage. The lower this is, the more
+    frequently spills and cached data eviction occur. The purpose of this config is to set
+    aside memory for internal metadata and user data structures, and to guard against
+    imprecise size estimation in the case of sparse, unusually large records.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.memory.storageFraction</code></td>
+ <td>0.5</td>
+ <td>
+    The size of the storage region within the space set aside by
+    <code>spark.memory.fraction</code>. This region is not statically reserved, but dynamically
+    allocated as cache requests come in. Cached data may be evicted only if total storage exceeds
+ this region.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.memory.useLegacyMode</code></td>
+ <td>false</td>
+ <td>
+    Whether to enable the legacy memory management mode used in Spark 1.5 and before.
+ The legacy mode rigidly partitions the heap space into fixed-size regions,
+ potentially leading to excessive spilling if the application was not tuned.
+ The following deprecated memory fraction configurations are not read unless this is enabled:
+ <code>spark.shuffle.memoryFraction</code><br>
+ <code>spark.storage.memoryFraction</code><br>
+ <code>spark.storage.unrollFraction</code>
+ </td>
+</tr>
+<tr>
+ <td><code>spark.shuffle.memoryFraction</code></td>
+ <td>0.2</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of Java heap to use for aggregation and cogroups during shuffles.
+ At any given time, the collective size of
+ all in-memory maps used for shuffles is bounded by this limit, beyond which the contents will
+    begin to spill to disk. If spills occur often, consider increasing this value at the expense of
+ <code>spark.storage.memoryFraction</code>.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.storage.memoryFraction</code></td>
+ <td>0.6</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
+ generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
+ increase it if you configure your own old generation size.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.storage.unrollFraction</code></td>
+ <td>0.2</td>
+ <td>
+ (deprecated) This is read only if <code>spark.memory.useLegacyMode</code> is enabled.
+ Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
+ This is dynamically allocated by dropping existing blocks when there is not enough free
+ storage space to unroll the new block in its entirety.
+ </td>
+</tr>
+</table>
+
#### Execution Behavior
<table class="table">
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
@@ -825,15 +884,6 @@ Apart from these, the following properties are also available, and may be useful
data may need to be rewritten to pre-existing output directories during checkpoint recovery.</td>
</tr>
<tr>
- <td><code>spark.storage.memoryFraction</code></td>
- <td>0.6</td>
- <td>
- Fraction of Java heap to use for Spark's memory cache. This should not be larger than the "old"
- generation of objects in the JVM, which by default is given 0.6 of the heap, but you can
- increase it if you configure your own old generation size.
- </td>
-</tr>
-<tr>
<td><code>spark.storage.memoryMapThreshold</code></td>
<td>2m</td>
<td>
@@ -843,15 +893,6 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
- <td><code>spark.storage.unrollFraction</code></td>
- <td>0.2</td>
- <td>
- Fraction of <code>spark.storage.memoryFraction</code> to use for unrolling blocks in memory.
- This is dynamically allocated by dropping existing blocks when there is not enough free
- storage space to unroll the new block in its entirety.
- </td>
-</tr>
-<tr>
<td><code>spark.externalBlockStore.blockManager</code></td>
<td>org.apache.spark.storage.TachyonBlockManager</td>
<td>
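As a usage note for the Memory Management table added above, the sketch below sets the new properties programmatically. The property names and defaults come from that table, while the application name and the specific values chosen here are illustrative assumptions rather than recommendations.

```scala
import org.apache.spark.SparkConf

object MemoryConfigExample {
  def main(args: Array[String]): Unit = {
    // Values here are illustrative; the documented defaults are 0.75, 0.5 and false.
    val conf = new SparkConf()
      .setAppName("memory-config-example")        // hypothetical application name
      .set("spark.memory.fraction", "0.6")        // shrink the unified region, leaving more heap headroom
      .set("spark.memory.storageFraction", "0.5") // half of the unified region is protected from eviction
      .set("spark.memory.useLegacyMode", "false") // keep the unified manager (the default)
    println(conf.toDebugString)
  }
}
```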
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
index ff65d7bdf8..835f52fa56 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/TestShuffleMemoryManager.scala
@@ -57,7 +57,9 @@ class TestShuffleMemoryManager
}
private class GrantEverythingMemoryManager extends MemoryManager {
- override def acquireExecutionMemory(numBytes: Long): Long = numBytes
+ override def acquireExecutionMemory(
+ numBytes: Long,
+ evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Long = numBytes
override def acquireStorageMemory(
blockId: BlockId,
numBytes: Long,
@@ -66,12 +68,6 @@ private class GrantEverythingMemoryManager extends MemoryManager {
blockId: BlockId,
numBytes: Long,
evictedBlocks: mutable.Buffer[(BlockId, BlockStatus)]): Boolean = true
- override def releaseExecutionMemory(numBytes: Long): Unit = { }
- override def releaseStorageMemory(numBytes: Long): Unit = { }
- override def releaseStorageMemory(): Unit = { }
- override def releaseUnrollMemory(numBytes: Long): Unit = { }
override def maxExecutionMemory: Long = Long.MaxValue
override def maxStorageMemory: Long = Long.MaxValue
- override def executionMemoryUsed: Long = Long.MaxValue
- override def storageMemoryUsed: Long = Long.MaxValue
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index f7d48bc53e..75d1fced59 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -103,7 +103,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkContext {
val conf = new SparkConf()
.set("spark.shuffle.spill.initialMemoryThreshold", "1024")
.set("spark.shuffle.sort.bypassMergeThreshold", "0")
- .set("spark.shuffle.memoryFraction", "0.0001")
+ .set("spark.testing.memory", "80000")
sc = new SparkContext("local", "test", conf)
outputFile = File.createTempFile("test-unsafe-row-serializer-spill", "")