aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2012-11-11 21:06:57 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2012-11-11 21:06:57 -0800
commit173e0354c0fc95d63112c7ff7121d8ff39f961b7 (patch)
tree97e3ba31aa91c0b7d28eaa3cdee0025636432ad0
parentacf827232458e87773a71a38f88cb7ba9a6ab77e (diff)
downloadspark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.tar.gz
spark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.tar.bz2
spark-173e0354c0fc95d63112c7ff7121d8ff39f961b7.zip
Detect correctly when one has disconnected from a standalone cluster.
SPARK-617 #resolve
-rw-r--r--core/src/main/scala/spark/deploy/client/Client.scala14
1 files changed, 13 insertions, 1 deletions
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index e51b0c5c15..c57a1d33e9 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -35,6 +35,7 @@ private[spark] class Client(
class ClientActor extends Actor with Logging {
var master: ActorRef = null
+ var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
override def preStart() {
@@ -43,6 +44,7 @@ private[spark] class Client(
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
+ masterAddress = master.path.address
master ! RegisterJob(jobDescription)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
@@ -72,7 +74,17 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""))
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientDisconnected(transport, address) if address == masterAddress =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case RemoteClientShutdown(transport, address) if address == masterAddress =>
logError("Connection to master failed; stopping client")
markDisconnected()
context.stop(self)