aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
authorAndrew Or <andrew@databricks.com>2015-10-08 21:44:59 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-10-08 21:44:59 -0700
commit67fbecbf32fced87d3accd2618fef2af9f44fae2 (patch)
treea0b14456a5c7fad7b98a3c0508884f4e9698f6b6 /streaming
parent09841290055770a619a2e72fbaef1a5e694916ae (diff)
downloadspark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.gz
spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.tar.bz2
spark-67fbecbf32fced87d3accd2618fef2af9f44fae2.zip
[SPARK-10956] Common MemoryManager interface for storage and execution
This patch introduces a `MemoryManager` that is the central arbiter of how much memory to grant to storage and execution. This patch is primarily concerned only with refactoring while preserving the existing behavior as much as possible. This is the first step away from the existing rigid separation of storage and execution memory, which has several major drawbacks discussed on the [issue](https://issues.apache.org/jira/browse/SPARK-10956). It is the precursor of a series of patches that will attempt to address those drawbacks. Author: Andrew Or <andrew@databricks.com> Author: Josh Rosen <joshrosen@databricks.com> Author: andrewor14 <andrew@databricks.com> Closes #9000 from andrewor14/memory-manager.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala13
1 files changed, 8 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 13cfe29d7b..b2b6848719 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
@@ -253,12 +254,14 @@ class ReceivedBlockHandlerSuite
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
- manager.initialize("app-id")
- blockManagerBuffer += manager
- manager
+ val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf,
+ memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ memManager.setMemoryStore(blockManager.memoryStore)
+ blockManager.initialize("app-id")
+ blockManagerBuffer += blockManager
+ blockManager
}
/**