diff options
author | Andrew Or <andrew@databricks.com> | 2015-10-08 21:44:59 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2015-10-08 21:44:59 -0700 |
commit | 67fbecbf32fced87d3accd2618fef2af9f44fae2 (patch) | |
tree | a0b14456a5c7fad7b98a3c0508884f4e9698f6b6 /streaming | |
parent | 09841290055770a619a2e72fbaef1a5e694916ae (diff) | |
download | spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.gz spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.bz2 spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.zip |
[SPARK-10956] Common MemoryManager interface for storage and execution
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible.
This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks.
Author: Andrew Or <andrew@databricks.com>
Author: Josh Rosen <joshrosen@databricks.com>
Author: andrewor14 <andrew@databricks.com>
Closes #9000 from andrewor14/memory-manager.
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 13 |
1 files changed, 8 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 13cfe29d7b..b2b6848719 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ +import org.apache.spark.memory.StaticMemoryManager import org.apache.spark.network.netty.NettyBlockTransferService import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus @@ -253,12 +254,14 @@ class ReceivedBlockHandlerSuite maxMem: Long, conf: SparkConf, name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = { + val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem) val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf, - mapOutputTracker, shuffleManager, transfer, securityMgr, 0) - manager.initialize("app-id") - blockManagerBuffer += manager - manager + val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf, + memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0) + memManager.setMemoryStore(blockManager.memoryStore) + blockManager.initialize("app-id") + blockManagerBuffer += blockManager + blockManager } /** |