diff options
author | zsxwing <zsxwing@gmail.com> | 2015-04-04 11:52:05 -0700 |
---|---|---|
committer | Reynold Xin <rxin@databricks.com> | 2015-04-04 11:52:05 -0700 |
commit | f15806a8f8ca34288ddb2d74b9ff1972c8374b59 (patch) | |
tree | 88abe5de9fadf078e57951450cb3368d0fb7cb64 /streaming | |
parent | 7bca62f79056e592cf07b49d8b8d04c59dea25fc (diff) | |
download | spark-f15806a8f8ca34288ddb2d74b9ff1972c8374b59.tar.gz spark-f15806a8f8ca34288ddb2d74b9ff1972c8374b59.tar.bz2 spark-f15806a8f8ca34288ddb2d74b9ff1972c8374b59.zip |
[SPARK-6602][Core] Replace direct use of Akka with Spark RPC interface - part 1
This PR replaced the following `Actor`s to `RpcEndpoint`:
1. HeartbeatReceiver
1. ExecutorActor
1. BlockManagerMasterActor
1. BlockManagerSlaveActor
1. CoarseGrainedExecutorBackend and subclasses
1. CoarseGrainedSchedulerBackend.DriverActor
This is the first PR. I will split the work of SPARK-6602 to several PRs for code review.
Author: zsxwing <zsxwing@gmail.com>
Closes #5268 from zsxwing/rpc-rewrite and squashes the following commits:
287e9f8 [zsxwing] Fix the code style
26c56b7 [zsxwing] Merge branch 'master' into rpc-rewrite
9cc825a [zsxwing] Rmove setupThreadSafeEndpoint and add ThreadSafeRpcEndpoint
30a9036 [zsxwing] Make self return null after stopping RpcEndpointRef; fix docs and error messages
705245d [zsxwing] Fix some bugs after rebasing the changes on the master
003cf80 [zsxwing] Update CoarseGrainedExecutorBackend and CoarseGrainedSchedulerBackend to use RpcEndpoint
7d0e6dc [zsxwing] Update BlockManagerSlaveActor to use RpcEndpoint
f5d6543 [zsxwing] Update BlockManagerMaster to use RpcEndpoint
30e3f9f [zsxwing] Update ExecutorActor to use RpcEndpoint
478b443 [zsxwing] Update HeartbeatReceiver to use RpcEndpoint
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala | 25 |
1 files changed, 11 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 18a477f920..ef4873de2f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -24,20 +24,20 @@ import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.postfixOps -import akka.actor.{ActorSystem, Props} import org.apache.hadoop.conf.Configuration import org.scalatest.{BeforeAndAfter, FunSuite, Matchers} import org.scalatest.concurrent.Eventually._ import org.apache.spark._ import org.apache.spark.network.nio.NioBlockTransferService +import org.apache.spark.rpc.RpcEnv import org.apache.spark.scheduler.LiveListenerBus import org.apache.spark.serializer.KryoSerializer import org.apache.spark.shuffle.hash.HashShuffleManager import org.apache.spark.storage._ import org.apache.spark.streaming.receiver._ import org.apache.spark.streaming.util._ -import org.apache.spark.util.{AkkaUtils, ManualClock, Utils} +import org.apache.spark.util.{ManualClock, Utils} import WriteAheadLogBasedBlockHandler._ import WriteAheadLogSuite._ @@ -54,22 +54,19 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche val manualClock = new ManualClock val blockManagerSize = 10000000 - var actorSystem: ActorSystem = null + var rpcEnv: RpcEnv = null var blockManagerMaster: BlockManagerMaster = null var blockManager: BlockManager = null var tempDirectory: File = null before { - val (actorSystem, boundPort) = AkkaUtils.createActorSystem( - "test", "localhost", 0, conf = conf, securityManager = securityMgr) - this.actorSystem = actorSystem - conf.set("spark.driver.port", boundPort.toString) + rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr) + conf.set("spark.driver.port", rpcEnv.address.port.toString) - blockManagerMaster = new BlockManagerMaster( - actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))), - conf, true) + blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager", + new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true) - blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer, + blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer, blockManagerSize, conf, mapOutputTracker, shuffleManager, new NioBlockTransferService(conf, securityMgr), securityMgr, 0) blockManager.initialize("app-id") @@ -87,9 +84,9 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche blockManagerMaster.stop() blockManagerMaster = null } - actorSystem.shutdown() - actorSystem.awaitTermination() - actorSystem = null + rpcEnv.shutdown() + rpcEnv.awaitTermination() + rpcEnv = null Utils.deleteRecursively(tempDirectory) } |