aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDenny <dennybritz@gmail.com>2012-11-12 10:56:54 -0800
committerDenny <dennybritz@gmail.com>2012-11-12 10:56:54 -0800
commit05e38073540c4be7926b2f194a9f00b29fcdf812 (patch)
treee2edbe950e07a6113754b7800f6903a8909e3706
parent4a1be7e0dbf0031d85b91dc1132fe101d87ba097 (diff)
parent173e0354c0fc95d63112c7ff7121d8ff39f961b7 (diff)
downloadspark-05e38073540c4be7926b2f194a9f00b29fcdf812.tar.gz
spark-05e38073540c4be7926b2f194a9f00b29fcdf812.tar.bz2
spark-05e38073540c4be7926b2f194a9f00b29fcdf812.zip
Merge branch 'master' into blockmanagerUI
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala14
1 files changed, 13 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index e51b0c5c15..c57a1d33e9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -35,6 +35,7 @@ private[spark] class Client(
class ClientActor extends Actor with Logging {
var master: ActorRef = null
+ var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
@@ -43,6 +44,7 @@ private[spark] class Client(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
+ masterAddress = master.path.address
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
@@ -72,7 +74,17 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""))
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)