diff options
author | Denny <dennybritz@gmail.com> | 2012-11-12 10:56:54 -0800 |
---|---|---|
committer | Denny <dennybritz@gmail.com> | 2012-11-12 10:56:54 -0800 |
commit | 05e38073540c4be7926b2f194a9f00b29fcdf812 (patch) | |
tree | e2edbe950e07a6113754b7800f6903a8909e3706 | |
parent | 4a1be7e0dbf0031d85b91dc1132fe101d87ba097 (diff) | |
parent | 173e0354c0fc95d63112c7ff7121d8ff39f961b7 (diff) | |
download | spark-05e38073540c4be7926b2f194a9f00b29fcdf812.tar.gz spark-05e38073540c4be7926b2f194a9f00b29fcdf812.tar.bz2 spark-05e38073540c4be7926b2f194a9f00b29fcdf812.zip |
Merge branch 'master' into blockmanagerUI
-rw-r--r-- | core/src/main/scala/spark/deploy/client/Client.scala | 14 |
1 files changed, 13 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala index e51b0c5c15..c57a1d33e9 100644 --- a/core/src/main/scala/spark/deploy/client/Client.scala +++ b/core/src/main/scala/spark/deploy/client/Client.scala @@ -35,6 +35,7 @@ private[spark] class Client( class ClientActor extends Actor with Logging { var master: ActorRef = null + var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times override def preStart() { @@ -43,6 +44,7 @@ private[spark] class Client( val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort) try { master = context.actorFor(akkaUrl) + masterAddress = master.path.address master ! RegisterJob(jobDescription) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing @@ -72,7 +74,17 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse("")) } - case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => + case Terminated(actor_) if actor_ == master => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientDisconnected(transport, address) if address == masterAddress => + logError("Connection to master failed; stopping client") + markDisconnected() + context.stop(self) + + case RemoteClientShutdown(transport, address) if address == masterAddress => logError("Connection to master failed; stopping client") markDisconnected() context.stop(self) |