diff options
author | Ankur Dave <ankurdave@gmail.com> | 2014-01-08 21:19:08 -0800 |
---|---|---|
committer | Ankur Dave <ankurdave@gmail.com> | 2014-01-08 21:19:08 -0800 |
commit | 91227566bc9d8aabaec3f2a37a09a17afa20989c (patch) | |
tree | 80d19aac29217005b3f1cb08ca95fa08bbb9d946 /core/src/main/scala/org/apache/spark/deploy/client/Client.scala | |
parent | 7210257ba3038d5e22d4b60fe9c3113dc45c3dff (diff) | |
parent | 04d83fc37f9eef89c20331c85291a0a169f75e6d (diff) | |
download | spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.gz spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.tar.bz2 spark-91227566bc9d8aabaec3f2a37a09a17afa20989c.zip |
Merge remote-tracking branch 'spark-upstream/master' into HEAD
Conflicts:
README.md
core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala
core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala
core/src/main/scala/org/apache/spark/util/collection/PrimitiveKeyOpenHashMap.scala
pom.xml
project/SparkBuild.scala
repl/src/main/scala/org/apache/spark/repl/SparkILoop.scala
Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/client/Client.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 61 |
1 files changed, 32 insertions, 29 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 77422f61ec..481026eaa2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -19,21 +19,18 @@ package org.apache.spark.deploy.client import java.util.concurrent.TimeoutException +import scala.concurrent.Await +import scala.concurrent.duration._ + import akka.actor._ -import akka.actor.Terminated import akka.pattern.ask -import akka.util.Duration -import akka.util.duration._ -import akka.remote.RemoteClientDisconnected -import akka.remote.RemoteClientLifeCycleEvent -import akka.remote.RemoteClientShutdown -import akka.dispatch.Await - -import org.apache.spark.Logging +import akka.remote.{AssociationErrorEvent, DisassociatedEvent, RemotingLifecycleEvent} + +import org.apache.spark.{Logging, SparkConf, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master - +import org.apache.spark.util.AkkaUtils /** * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description, @@ -45,24 +42,26 @@ private[spark] class Client( actorSystem: ActorSystem, masterUrls: Array[String], appDescription: ApplicationDescription, - listener: ClientListener) + listener: ClientListener, + conf: SparkConf) extends Logging { val REGISTRATION_TIMEOUT = 20.seconds val REGISTRATION_RETRIES = 3 + var masterAddress: Address = null var actor: ActorRef = null var appId: String = null var registered = false var activeMasterUrl: String = null class ClientActor extends Actor with Logging { - var master: ActorRef = null - var masterAddress: Address = null + var master: ActorSelection = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times var alreadyDead = false // To avoid calling listener.dead() multiple times override def preStart() { + context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent]) try { registerWithMaster() } catch { @@ -76,7 +75,7 @@ private[spark] class Client( def tryRegisterAllMasters() { for (masterUrl <- masterUrls) { logInfo("Connecting to master " + masterUrl + "...") - val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + val actor = context.actorSelection(Master.toAkkaUrl(masterUrl)) actor ! RegisterApplication(appDescription) } } @@ -84,6 +83,7 @@ private[spark] class Client( def registerWithMaster() { tryRegisterAllMasters() + import context.dispatcher var retries = 0 lazy val retryTimer: Cancellable = context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { @@ -102,10 +102,19 @@ private[spark] class Client( def changeMaster(url: String) { activeMasterUrl = url - master = context.actorFor(Master.toAkkaUrl(url)) - masterAddress = master.path.address - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl)) + masterAddress = activeMasterUrl match { + case Master.sparkUrlRegex(host, port) => + Address("akka.tcp", Master.systemName, host, port.toInt) + case x => + throw new SparkException("Invalid spark URL: " + x) + } + } + + private def isPossibleMaster(remoteUrl: Address) = { + masterUrls.map(s => Master.toAkkaUrl(s)) + .map(u => AddressFromURIString(u).hostPort) + .contains(remoteUrl.hostPort) } override def receive = { @@ -135,22 +144,16 @@ private[spark] class Client( case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) - context.unwatch(master) changeMaster(masterUrl) alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - case Terminated(actor_) if actor_ == master => - logWarning("Connection to master failed; waiting for master to reconnect...") - markDisconnected() - - case RemoteClientDisconnected(transport, address) if address == masterAddress => - logWarning("Connection to master failed; waiting for master to reconnect...") + case DisassociatedEvent(_, address, _) if address == masterAddress => + logWarning(s"Connection to $address failed; waiting for master to reconnect...") markDisconnected() - case RemoteClientShutdown(transport, address) if address == masterAddress => - logWarning("Connection to master failed; waiting for master to reconnect...") - markDisconnected() + case AssociationErrorEvent(cause, _, address, _) if isPossibleMaster(address) => + logWarning(s"Could not connect to $address: $cause") case StopClient => markDead() @@ -184,7 +187,7 @@ private[spark] class Client( def stop() { if (actor != null) { try { - val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds") + val timeout = AkkaUtils.askTimeout(conf) val future = actor.ask(StopClient)(timeout) Await.result(future, timeout) } catch { |