author     zsxwing <zsxwing@gmail.com>        2015-04-04 11:52:05 -0700
committer  Reynold Xin <rxin@databricks.com>  2015-04-04 11:52:05 -0700
commit     f15806a8f8ca34288ddb2d74b9ff1972c8374b59 (patch)
tree       88abe5de9fadf078e57951450cb3368d0fb7cb64 /streaming
parent     7bca62f79056e592cf07b49d8b8d04c59dea25fc (diff)
[SPARK-6602][Core] Replace direct use of Akka with Spark RPC interface - part 1
This PR replaced the following `Actor`s with `RpcEndpoint`s:

1. HeartbeatReceiver
2. ExecutorActor
3. BlockManagerMasterActor
4. BlockManagerSlaveActor
5. CoarseGrainedExecutorBackend and subclasses
6. CoarseGrainedSchedulerBackend.DriverActor

This is the first PR. I will split the work of SPARK-6602 into several PRs for code review.

Author: zsxwing <zsxwing@gmail.com>

Closes #5268 from zsxwing/rpc-rewrite and squashes the following commits:

287e9f8 [zsxwing] Fix the code style
26c56b7 [zsxwing] Merge branch 'master' into rpc-rewrite
9cc825a [zsxwing] Remove setupThreadSafeEndpoint and add ThreadSafeRpcEndpoint
30a9036 [zsxwing] Make self return null after stopping RpcEndpointRef; fix docs and error messages
705245d [zsxwing] Fix some bugs after rebasing the changes on the master
003cf80 [zsxwing] Update CoarseGrainedExecutorBackend and CoarseGrainedSchedulerBackend to use RpcEndpoint
7d0e6dc [zsxwing] Update BlockManagerSlaveActor to use RpcEndpoint
f5d6543 [zsxwing] Update BlockManagerMaster to use RpcEndpoint
30e3f9f [zsxwing] Update ExecutorActor to use RpcEndpoint
478b443 [zsxwing] Update HeartbeatReceiver to use RpcEndpoint
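For readers unfamiliar with the new abstraction: an `RpcEndpoint` plays the role an Akka `Actor` used to play, with one-way messages handled in `receive` and request/reply messages in `receiveAndReply`. The sketch below is illustrative only, assuming the `org.apache.spark.rpc` API introduced by this PR series; the `EchoEndpoint` class and its messages are hypothetical and do not appear in the patch.

```scala
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical messages, purely for illustration.
case object Ping
case class Echo(text: String)

// Where an Akka Actor put everything in receive and used sender() to reply,
// an RpcEndpoint splits one-way messages (receive) from RPCs (receiveAndReply).
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receive: PartialFunction[Any, Unit] = {
    case Ping => // fire-and-forget; nothing to send back
  }

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case Echo(text) => context.reply(text) // replaces Akka's sender() ! text
  }
}
```

Endpoints are registered with `rpcEnv.setupEndpoint(name, endpoint)`, which returns a reference (`RpcEndpointRef`) that callers use to send messages, much as `actorOf` used to return an `ActorRef`.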
Diffstat (limited to 'streaming')
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala  25
1 file changed, 11 insertions, 14 deletions
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 18a477f920..ef4873de2f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -24,20 +24,20 @@ import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.language.postfixOps
-import akka.actor.{ActorSystem, Props}
import org.apache.hadoop.conf.Configuration
import org.scalatest.{BeforeAndAfter, FunSuite, Matchers}
import org.scalatest.concurrent.Eventually._
import org.apache.spark._
import org.apache.spark.network.nio.NioBlockTransferService
+import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.LiveListenerBus
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.shuffle.hash.HashShuffleManager
import org.apache.spark.storage._
import org.apache.spark.streaming.receiver._
import org.apache.spark.streaming.util._
-import org.apache.spark.util.{AkkaUtils, ManualClock, Utils}
+import org.apache.spark.util.{ManualClock, Utils}
import WriteAheadLogBasedBlockHandler._
import WriteAheadLogSuite._
@@ -54,22 +54,19 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
val manualClock = new ManualClock
val blockManagerSize = 10000000
- var actorSystem: ActorSystem = null
+ var rpcEnv: RpcEnv = null
var blockManagerMaster: BlockManagerMaster = null
var blockManager: BlockManager = null
var tempDirectory: File = null
before {
- val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
- "test", "localhost", 0, conf = conf, securityManager = securityMgr)
- this.actorSystem = actorSystem
- conf.set("spark.driver.port", boundPort.toString)
+ rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
+ conf.set("spark.driver.port", rpcEnv.address.port.toString)
- blockManagerMaster = new BlockManagerMaster(
- actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
- conf, true)
+ blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
+ new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)
- blockManager = new BlockManager("bm", actorSystem, blockManagerMaster, serializer,
+ blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
blockManagerSize, conf, mapOutputTracker, shuffleManager,
new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
blockManager.initialize("app-id")
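Stripped of the diff markers, the new test setup reads as follows. This is just the added lines above assembled into one block for readability; `conf`, `securityMgr`, `serializer`, `mapOutputTracker`, `shuffleManager`, and `blockManagerSize` are fixtures defined elsewhere in the suite, and the closing brace of `before` falls outside the hunk.

```scala
before {
  // An RpcEnv replaces the ActorSystem; its bound port feeds spark.driver.port.
  rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
  conf.set("spark.driver.port", rpcEnv.address.port.toString)

  // The master endpoint is registered on the RpcEnv instead of actorOf(Props(...)).
  blockManagerMaster = new BlockManagerMaster(rpcEnv.setupEndpoint("blockmanager",
    new BlockManagerMasterEndpoint(rpcEnv, true, conf, new LiveListenerBus)), conf, true)

  // BlockManager now takes the RpcEnv where it previously took the ActorSystem.
  blockManager = new BlockManager("bm", rpcEnv, blockManagerMaster, serializer,
    blockManagerSize, conf, mapOutputTracker, shuffleManager,
    new NioBlockTransferService(conf, securityMgr), securityMgr, 0)
  blockManager.initialize("app-id")
}
```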
@@ -87,9 +84,9 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
blockManagerMaster.stop()
blockManagerMaster = null
}
- actorSystem.shutdown()
- actorSystem.awaitTermination()
- actorSystem = null
+ rpcEnv.shutdown()
+ rpcEnv.awaitTermination()
+ rpcEnv = null
Utils.deleteRecursively(tempDirectory)
}
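The teardown change is the mirror image: the Akka shutdown sequence becomes the RpcEnv one. Assembled from the added lines above (with the blockManager/blockManagerMaster cleanup from the hunk's context elided), the tail of `after` is roughly the following sketch.

```scala
after {
  // ... stop blockManager and blockManagerMaster as in the context lines above ...

  // Tear down the RpcEnv and block until it has fully terminated,
  // so the next test can bind a fresh one.
  rpcEnv.shutdown()
  rpcEnv.awaitTermination()
  rpcEnv = null

  Utils.deleteRecursively(tempDirectory)
}
```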