diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 18 |
1 files changed, 10 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 09530beb3b..3904b701b2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -22,12 +22,12 @@ import java.util.Date import java.io.File import scala.collection.mutable.HashMap +import scala.concurrent.duration._ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} -import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} -import akka.util.duration._ +import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent} -import org.apache.spark.{Logging} +import org.apache.spark.Logging import org.apache.spark.deploy.ExecutorState import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -113,15 +113,17 @@ private[spark] class Worker( logInfo("Connecting to master " + masterUrl) master = context.actorFor(Master.toAkkaUrl(masterUrl)) master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } + import context.dispatcher + override def receive = { case RegisteredWorker(url) => masterWebUiUrl = url logInfo("Successfully registered with master") - context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { + context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { master ! Heartbeat(workerId) } @@ -163,7 +165,7 @@ private[spark] class Worker( logInfo("Asked to kill unknown executor " + fullId) } - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + case DisassociatedEvent(_, _, _) => masterDisconnected() case RequestWorkerState => { @@ -205,8 +207,8 @@ private[spark] object Worker { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) - val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, - masterUrl, workDir)), name = "Worker") + actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, + masterUrl, workDir), name = "Worker") (actorSystem, boundPort) } |