path: root/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 18
1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 09530beb3b..3904b701b2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -22,12 +22,12 @@ import java.util.Date
import java.io.File
import scala.collection.mutable.HashMap
+import scala.concurrent.duration._
import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
-import akka.util.duration._
+import akka.remote.{RemotingLifecycleEvent, AssociationErrorEvent, DisassociatedEvent}
-import org.apache.spark.{Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.ExecutorState
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
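
The import changes above swap Akka's pre-2.2 remote client lifecycle events (RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected) and akka.util.duration for the Akka 2.2 remoting events (RemotingLifecycleEvent with subtypes such as AssociationErrorEvent and DisassociatedEvent) and scala.concurrent.duration. A minimal sketch of the new subscription pattern, assuming Akka 2.2.x on the classpath (the RemotingWatcher actor is illustrative, not part of the patch):

    import akka.actor.{Actor, ActorLogging}
    import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}

    class RemotingWatcher extends Actor with ActorLogging {
      override def preStart(): Unit = {
        // A single subscription to the parent trait covers all remoting
        // lifecycle events published on the event stream.
        context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      }

      def receive = {
        case DisassociatedEvent(_, remote, _) =>
          log.warning("Lost association with {}", remote)
        case e: AssociationErrorEvent =>
          log.warning("Association error with {}: {}", e.remoteAddress, e.cause.getMessage)
        case _ => // ignore the remaining lifecycle events
      }
    }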
@@ -113,15 +113,17 @@ private[spark] class Worker(
logInfo("Connecting to master " + masterUrl)
master = context.actorFor(Master.toAkkaUrl(masterUrl))
master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
+ import context.dispatcher
+
override def receive = {
case RegisteredWorker(url) =>
masterWebUiUrl = url
logInfo("Successfully registered with master")
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
master ! Heartbeat(workerId)
}
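
The added `import context.dispatcher` matters here because in Akka 2.2 `scheduler.schedule` takes an implicit ExecutionContext, and the `0 millis` / `HEARTBEAT_MILLIS millis` literals now come from scala.concurrent.duration. A sketch of the same scheduling pattern in isolation; the HeartbeatSender actor and Beat message are illustrative names, not from Spark:

    import akka.actor.{Actor, ActorRef}
    import scala.concurrent.duration._

    case object Beat

    class HeartbeatSender(target: ActorRef, intervalMs: Long) extends Actor {
      // Supplies the implicit ExecutionContext that schedule() requires in Akka 2.2.
      import context.dispatcher

      override def preStart(): Unit = {
        // Fire immediately, then every intervalMs milliseconds.
        context.system.scheduler.schedule(0.millis, intervalMs.millis) {
          target ! Beat
        }
      }

      def receive = Actor.emptyBehavior
    }

schedule() returns a Cancellable, so a longer-lived version of this sketch would keep the handle and cancel it in postStop.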
@@ -163,7 +165,7 @@ private[spark] class Worker(
logInfo("Asked to kill unknown executor " + fullId)
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case DisassociatedEvent(_, _, _) =>
masterDisconnected()
case RequestWorkerState => {
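
With Akka 2.2 remoting, a dropped connection surfaces as a DisassociatedEvent from the event stream rather than as RemoteClientDisconnected/RemoteClientShutdown, which is why the match above collapses to a single case. One possible refinement, shown only as an assumption and not part of the patch, is to guard on the remote address so that only the master's disassociation triggers the disconnect handling:

    import akka.actor.{Actor, Address}
    import akka.remote.DisassociatedEvent

    class MasterLink(masterAddress: Address) extends Actor {
      override def preStart(): Unit =
        context.system.eventStream.subscribe(self, classOf[DisassociatedEvent])

      def masterDisconnected(): Unit =
        println("Master disconnected, entering recovery")

      def receive = {
        // Only react when the lost association is the one to the master.
        case DisassociatedEvent(_, remoteAddress, _) if remoteAddress == masterAddress =>
          masterDisconnected()
      }
    }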
@@ -205,8 +207,8 @@ private[spark] object Worker {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
- val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrl, workDir)), name = "Worker")
+ actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
+ masterUrl, workDir), name = "Worker")
(actorSystem, boundPort)
}