author     Andrew Or <andrew@databricks.com>      2015-10-08 21:44:59 -0700
committer  Josh Rosen <joshrosen@databricks.com>  2015-10-08 21:44:59 -0700
commit     67fbecbf32fced87d3accd2618fef2af9f44fae2 (patch)
tree       a0b14456a5c7fad7b98a3c0508884f4e9698f6b6 /core/src/test
parent     09841290055770a619a2e72fbaef1a5e694916ae (diff)
[SPARK-10956] Common MemoryManager interface for storage and execution
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. It is concerned primarily with refactoring while preserving the existing behavior as much as possible. This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956), and the precursor of a series of patches that will attempt to address those drawbacks.

Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>

Closes #9000 from andrewor14/memory-manager.
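For orientation, the interface the new StaticMemoryManagerSuite exercises looks roughly as follows. This is a sketch reconstructed from the test calls in this diff, not code copied from the patch; the name MemoryManagerSketch and the exact modifiers are assumptions.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.{BlockId, BlockStatus}

trait MemoryManagerSketch {
  // Execution grants are best-effort: the return value is the number of
  // bytes actually granted, which can be less than requested.
  def acquireExecutionMemory(numBytes: Long): Long
  def releaseExecutionMemory(numBytes: Long): Unit

  // Storage grants are all-or-nothing: the Boolean reports whether the
  // block fits; blocks dropped to make room are appended to evictedBlocks.
  def acquireStorageMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Boolean
  def acquireUnrollMemory(
      blockId: BlockId,
      numBytes: Long,
      evictedBlocks: ArrayBuffer[(BlockId, BlockStatus)]): Boolean
  def releaseStorageMemory(numBytes: Long): Unit
  def releaseUnrollMemory(numBytes: Long): Unit

  def executionMemoryUsed: Long
  def storageMemoryUsed: Long
}

The tests below also call a zero-argument releaseStorageMemory() overload that releases all storage memory, and they construct the concrete manager as new StaticMemoryManager(conf, maxExecutionMemory, maxStorageMemory), with a setMemoryStore hook to wire in the MemoryStore after construction.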
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala       172
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala   29
-rw-r--r--  core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala              34
3 files changed, 211 insertions(+), 24 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
new file mode 100644
index 0000000000..c436a8b5c9
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/memory/StaticMemoryManagerSuite.scala
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.memory
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.mockito.Mockito.{mock, reset, verify, when}
+import org.mockito.Matchers.{any, eq => meq}
+
+import org.apache.spark.storage.{BlockId, BlockStatus, MemoryStore, TestBlockId}
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+
+class StaticMemoryManagerSuite extends SparkFunSuite {
+ private val conf = new SparkConf().set("spark.storage.unrollFraction", "0.4")
+
+ test("basic execution memory") {
+ val maxExecutionMem = 1000L
+ val (mm, _) = makeThings(maxExecutionMem, Long.MaxValue)
+ assert(mm.executionMemoryUsed === 0L)
+ assert(mm.acquireExecutionMemory(10L) === 10L)
+ assert(mm.executionMemoryUsed === 10L)
+ assert(mm.acquireExecutionMemory(100L) === 100L)
+ // Acquire up to the max
+ assert(mm.acquireExecutionMemory(1000L) === 890L)
+ assert(mm.executionMemoryUsed === maxExecutionMem)
+ assert(mm.acquireExecutionMemory(1L) === 0L)
+ assert(mm.executionMemoryUsed === maxExecutionMem)
+ mm.releaseExecutionMemory(800L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireExecutionMemory(1L) === 1L)
+ assert(mm.executionMemoryUsed === 201L)
+ // Release beyond what was acquired
+ mm.releaseExecutionMemory(maxExecutionMem)
+ assert(mm.executionMemoryUsed === 0L)
+ }
+
+ test("basic storage memory") {
+ val maxStorageMem = 1000L
+ val dummyBlock = TestBlockId("you can see the world you brought to live")
+ val evictedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 10L, evictedBlocks))
+ // `ensureFreeSpace` should be called with the number of bytes requested
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 10L)
+ assert(mm.storageMemoryUsed === 10L)
+ assert(evictedBlocks.isEmpty)
+ assert(mm.acquireStorageMemory(dummyBlock, 100L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assert(mm.storageMemoryUsed === 110L)
+ // Acquire up to the max, not granted
+ assert(!mm.acquireStorageMemory(dummyBlock, 1000L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 1000L)
+ assert(mm.storageMemoryUsed === 110L)
+ assert(mm.acquireStorageMemory(dummyBlock, 890L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 890L)
+ assert(mm.storageMemoryUsed === 1000L)
+ assert(!mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.storageMemoryUsed === 1000L)
+ mm.releaseStorageMemory(800L)
+ assert(mm.storageMemoryUsed === 200L)
+ // Acquire after release
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.storageMemoryUsed === 201L)
+ mm.releaseStorageMemory()
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.acquireStorageMemory(dummyBlock, 1L, evictedBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 1L)
+ assert(mm.storageMemoryUsed === 1L)
+ // Release beyond what was acquired
+ mm.releaseStorageMemory(100L)
+ assert(mm.storageMemoryUsed === 0L)
+ }
+
+ test("execution and storage isolation") {
+ val maxExecutionMem = 200L
+ val maxStorageMem = 1000L
+ val dummyBlock = TestBlockId("ain't nobody love like you do")
+ val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val (mm, ms) = makeThings(maxExecutionMem, maxStorageMem)
+ // Only execution memory should increase
+ assert(mm.acquireExecutionMemory(100L) === 100L)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.executionMemoryUsed === 100L)
+ assert(mm.acquireExecutionMemory(1000L) === 100L)
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Only storage memory should increase
+ assert(mm.acquireStorageMemory(dummyBlock, 50L, dummyBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 50L)
+ assert(mm.storageMemoryUsed === 50L)
+ assert(mm.executionMemoryUsed === 200L)
+ // Only execution memory should be released
+ mm.releaseExecutionMemory(133L)
+ assert(mm.storageMemoryUsed === 50L)
+ assert(mm.executionMemoryUsed === 67L)
+ // Only storage memory should be released
+ mm.releaseStorageMemory()
+ assert(mm.storageMemoryUsed === 0L)
+ assert(mm.executionMemoryUsed === 67L)
+ }
+
+ test("unroll memory") {
+ val maxStorageMem = 1000L
+ val dummyBlock = TestBlockId("lonely water")
+ val dummyBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
+ val (mm, ms) = makeThings(Long.MaxValue, maxStorageMem)
+ assert(mm.acquireUnrollMemory(dummyBlock, 100L, dummyBlocks))
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 100L)
+ assert(mm.storageMemoryUsed === 100L)
+ mm.releaseUnrollMemory(40L)
+ assert(mm.storageMemoryUsed === 60L)
+ when(ms.currentUnrollMemory).thenReturn(60L)
+ assert(mm.acquireUnrollMemory(dummyBlock, 500L, dummyBlocks))
+ // `spark.storage.unrollFraction` is 0.4, so the max unroll space is 400 bytes.
+ // Since we already occupy 60 bytes, we will try to ensure only 400 - 60 = 340 bytes.
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 340L)
+ assert(mm.storageMemoryUsed === 560L)
+ when(ms.currentUnrollMemory).thenReturn(560L)
+ assert(!mm.acquireUnrollMemory(dummyBlock, 800L, dummyBlocks))
+ assert(mm.storageMemoryUsed === 560L)
+ // We already have 560 bytes > the max unroll space of 400 bytes, so no bytes are freed
+ assertEnsureFreeSpaceCalled(ms, dummyBlock, 0L)
+ // Release beyond what was acquired
+ mm.releaseUnrollMemory(maxStorageMem)
+ assert(mm.storageMemoryUsed === 0L)
+ }
+
+ /**
+ * Make a [[StaticMemoryManager]] and a [[MemoryStore]] with limited class dependencies.
+ */
+ private def makeThings(
+ maxExecutionMem: Long,
+ maxStorageMem: Long): (StaticMemoryManager, MemoryStore) = {
+ val mm = new StaticMemoryManager(
+ conf, maxExecutionMemory = maxExecutionMem, maxStorageMemory = maxStorageMem)
+ val ms = mock(classOf[MemoryStore])
+ mm.setMemoryStore(ms)
+ (mm, ms)
+ }
+
+ /**
+ * Assert that [[MemoryStore.ensureFreeSpace]] is called with the given parameters.
+ */
+ private def assertEnsureFreeSpaceCalled(
+ ms: MemoryStore,
+ blockId: BlockId,
+ numBytes: Long): Unit = {
+ verify(ms).ensureFreeSpace(meq(blockId), meq(numBytes: java.lang.Long), any())
+ reset(ms)
+ }
+
+}
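The "unroll memory" test above encodes the cap arithmetic: with `spark.storage.unrollFraction` set to 0.4 and 1000 bytes of storage memory, at most 400 bytes may be claimed for unrolling. The following self-contained sketch computes the quantity the test asserts `ensureFreeSpace` receives; it is a hypothetical helper consistent with the assertions, not code from the patch.

def unrollBytesToEnsure(
    requested: Long,
    currentUnrollMemory: Long,
    maxStorageMemory: Long,
    unrollFraction: Double = 0.4): Long = {
  // The unroll cap is a fixed fraction of storage memory: 0.4 * 1000 = 400 here.
  val maxUnrollMemory = (maxStorageMemory * unrollFraction).toLong
  // Never ask the store to free more than the cap leaves room for.
  val maxToFree = math.max(0L, maxUnrollMemory - currentUnrollMemory)
  math.min(requested, maxToFree)
}

assert(unrollBytesToEnsure(500L, 60L, 1000L) == 340L)  // 400 - 60, as asserted above
assert(unrollBytesToEnsure(800L, 560L, 1000L) == 0L)   // already over the cap, nothing freed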
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index eb5af70d57..cc44c676b2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.concurrent.Eventually._
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark._
+import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.BlockTransferService
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
@@ -39,29 +40,31 @@ import org.apache.spark.storage.StorageLevel._
class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with BeforeAndAfter {
private val conf = new SparkConf(false).set("spark.app.id", "test")
- var rpcEnv: RpcEnv = null
- var master: BlockManagerMaster = null
- val securityMgr = new SecurityManager(conf)
- val mapOutputTracker = new MapOutputTrackerMaster(conf)
- val shuffleManager = new HashShuffleManager(conf)
+ private var rpcEnv: RpcEnv = null
+ private var master: BlockManagerMaster = null
+ private val securityMgr = new SecurityManager(conf)
+ private val mapOutputTracker = new MapOutputTrackerMaster(conf)
+ private val shuffleManager = new HashShuffleManager(conf)
// List of block managers created during a unit test, so that all of them can be stopped
// after the unit test.
- val allStores = new ArrayBuffer[BlockManager]
+ private val allStores = new ArrayBuffer[BlockManager]
// Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
conf.set("spark.kryoserializer.buffer", "1m")
- val serializer = new KryoSerializer(conf)
+ private val serializer = new KryoSerializer(conf)
// Implicitly convert strings to BlockIds for test clarity.
- implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
+ private implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val store = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
+ val store = new BlockManager(name, rpcEnv, master, serializer, conf,
+ memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ memManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")
allStores += store
store
@@ -258,8 +261,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val failableTransfer = mock(classOf[BlockTransferService]) // this won't actually work
when(failableTransfer.hostName).thenReturn("some-hostname")
when(failableTransfer.port).thenReturn(1000)
- val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer,
- 10000, conf, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000)
+ val failableStore = new BlockManager("failable-store", rpcEnv, master, serializer, conf,
+ memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
+ memManager.setMemoryStore(failableStore.memoryStore)
failableStore.initialize("app-id")
allStores += failableStore // so that this gets stopped after test
assert(master.getPeers(store.blockManagerId).toSet === Set(failableStore.blockManagerId))
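Both hunks in this file, and the BlockManagerSuite changes below, repeat the same three-step wiring dictated by the new constructor signature: build the StaticMemoryManager first (execution pool effectively unbounded, storage capped at the test's maxMem), pass it to the BlockManager, then point it back at the BlockManager's MemoryStore. Taken directly from the diff above:

val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
val store = new BlockManager(name, rpcEnv, master, serializer, conf,
  memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
memManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")

The explicit setMemoryStore call presumably exists because the MemoryStore is created inside BlockManager, yet the MemoryManager must be constructed before the BlockManager; the setter breaks that circular dependency.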
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 34bb4952e7..f3fab33ca2 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
+import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.{JavaSerializer, KryoSerializer}
import org.apache.spark.shuffle.hash.HashShuffleManager
@@ -67,10 +68,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val manager = new BlockManager(name, rpcEnv, master, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
- manager.initialize("app-id")
- manager
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
+ val blockManager = new BlockManager(name, rpcEnv, master, serializer, conf,
+ memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ memManager.setMemoryStore(blockManager.memoryStore)
+ blockManager.initialize("app-id")
+ blockManager
}
override def beforeEach(): Unit = {
@@ -820,9 +823,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("block store put failure") {
// Use Java serializer so we can create an unserializable error.
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
+ val memoryManager = new StaticMemoryManager(conf, Long.MaxValue, 1200)
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
- new JavaSerializer(conf), 1200, conf, mapOutputTracker, shuffleManager, transfer, securityMgr,
- 0)
+ new JavaSerializer(conf), conf, memoryManager, mapOutputTracker,
+ shuffleManager, transfer, securityMgr, 0)
+ memoryManager.setMemoryStore(store.memoryStore)
// The put should fail since a1 is not serializable.
class UnserializableClass
@@ -1043,14 +1048,19 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.currentUnrollMemory === 0)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
+ def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
+ memoryStore.reserveUnrollMemoryForThisTask(
+ TestBlockId(""), memory, new ArrayBuffer[(BlockId, BlockStatus)])
+ }
+
// Reserve
- memoryStore.reserveUnrollMemoryForThisTask(100)
+ assert(reserveUnrollMemoryForThisTask(100))
assert(memoryStore.currentUnrollMemoryForThisTask === 100)
- memoryStore.reserveUnrollMemoryForThisTask(200)
+ assert(reserveUnrollMemoryForThisTask(200))
assert(memoryStore.currentUnrollMemoryForThisTask === 300)
- memoryStore.reserveUnrollMemoryForThisTask(500)
+ assert(reserveUnrollMemoryForThisTask(500))
assert(memoryStore.currentUnrollMemoryForThisTask === 800)
- memoryStore.reserveUnrollMemoryForThisTask(1000000)
+ assert(!reserveUnrollMemoryForThisTask(1000000))
assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
// Release
memoryStore.releaseUnrollMemoryForThisTask(100)
@@ -1058,9 +1068,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
memoryStore.releaseUnrollMemoryForThisTask(100)
assert(memoryStore.currentUnrollMemoryForThisTask === 600)
// Reserve again
- memoryStore.reserveUnrollMemoryForThisTask(4400)
+ assert(reserveUnrollMemoryForThisTask(4400))
assert(memoryStore.currentUnrollMemoryForThisTask === 5000)
- memoryStore.reserveUnrollMemoryForThisTask(20000)
+ assert(!reserveUnrollMemoryForThisTask(20000))
assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
// Release again
memoryStore.releaseUnrollMemoryForThisTask(1000)
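The last hunk adapts to a signature change: reserveUnrollMemoryForThisTask now takes a BlockId and an eviction buffer and returns whether the reservation was granted, so the test wraps it in a local helper and asserts on the Boolean. A minimal usage sketch of the new shape, reusing the suite's imports (block IDs and sizes are hypothetical):

val evicted = new ArrayBuffer[(BlockId, BlockStatus)]
assert(memoryStore.reserveUnrollMemoryForThisTask(TestBlockId("b1"), 100L, evicted))
// A reservation that exceeds free memory returns false and leaves the
// task's unroll accounting unchanged, which the assertions above rely on.
assert(!memoryStore.reserveUnrollMemoryForThisTask(TestBlockId("b1"), Long.MaxValue, evicted))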