aboutsummaryrefslogtreecommitdiff
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala13
1 files changed, 8 insertions, 5 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 13cfe29d7b..b2b6848719 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -29,6 +29,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
+import org.apache.spark.memory.StaticMemoryManager
import org.apache.spark.network.netty.NettyBlockTransferService
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
@@ -253,12 +254,14 @@ class ReceivedBlockHandlerSuite
maxMem: Long,
conf: SparkConf,
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+ val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem)
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
- val manager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, maxMem, conf,
- mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
- manager.initialize("app-id")
- blockManagerBuffer += manager
- manager
+ val blockManager = new BlockManager(name, rpcEnv, blockManagerMaster, serializer, conf,
+ memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
+ memManager.setMemoryStore(blockManager.memoryStore)
+ blockManager.initialize("app-id")
+ blockManagerBuffer += blockManager
+ blockManager
}
/**