aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala11
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala4
2 files changed, 9 insertions, 6 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 408692ec9c..f60e56d959 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -27,7 +27,7 @@ import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
-import org.apache.spark.Logging
+import org.apache.spark.{SparkException, Logging}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -49,7 +49,7 @@ private[spark] class Client(
val REGISTRATION_TIMEOUT = 20.seconds
val REGISTRATION_RETRIES = 3
- var prevMaster: ActorRef = null // set for unwatching, when it fails.
+ var masterAddress: Address = null
var actor: ActorRef = null
var appId: String = null
var registered = false
@@ -103,11 +103,14 @@ private[spark] class Client(
def changeMaster(url: String) {
activeMasterUrl = url
master = context.actorSelection(Master.toAkkaUrl(activeMasterUrl))
+ masterAddress = activeMasterUrl match {
+ case Master.sparkUrlRegex(host, port) => Address("akka.tcp", Master.systemName, host, port.toInt)
+ case x => throw new SparkException("Invalid spark URL:"+x)
+ }
}
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
- prevMaster = sender
appId = appId_
registered = true
changeMaster(masterUrl)
@@ -137,7 +140,7 @@ private[spark] class Client(
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
- case DisassociatedEvent(_, address, _) =>
+ case DisassociatedEvent(_, address, _) if address == masterAddress =>
logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 81fb5c4e43..0e2b461b13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -517,9 +517,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
private[spark] object Master {
- private val systemName = "sparkMaster"
+ val systemName = "sparkMaster"
private val actorName = "Master"
- private val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
+ val sparkUrlRegex = "spark://([^:]+):([0-9]+)".r
def main(argStrings: Array[String]) {
val args = new MasterArguments(argStrings)