diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-11-11 21:06:57 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2012-11-11 21:06:57 -0800 |
commit | 173e0354c0fc95d63112c7ff7121d8ff39f961b7 (patch) | |
tree | 97e3ba31aa91c0b7d28eaa3cdee0025636432ad0 /core | |
parent | acf827232458e87773a71a38f88cb7ba9a6ab77e (diff) | |
download | spark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.tar.gz spark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.tar.bz2 spark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.zip |
Detect correctly when one has disconnected from a standalone cluster.
SPARK-617 #resolve
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/deploy/client/Client.scala | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index e51b0c5c15..c57a1d33e9 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -35,6 +35,7 @@ private[spark] class Client( class ClientActor extends Actor with Logging { var master: ActorRef = null + var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { @@ -43,6 +44,7 @@ private[spark] class Client( val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) try { master = context.actorFor(akkaUrl) + masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing @@ -72,7 +74,17 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse("")) } - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + case Terminated(actor_) if actor_ == master => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientDisconnected(transport, address) if address == masterAddress => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientShutdown(transport, address) if address == masterAddress => logError("Connection to master failed; stopping client") markDisconnected() context.stop(self) |