about summary refs log tree commit diff
diff options
context:
space:
mode:
author	Prashant Sharma <prashant.s@imaginea.com>	2013-11-25 17:55:21 +0530
committer	Prashant Sharma <prashant.s@imaginea.com>	2013-11-25 18:00:02 +0530
commit489862a65766d30278c186d280c6286937c81155 (patch)
tree9542153b642e68bcfde0a193625a61049825a0af
parent77929cfeed95905106f5b3891e8de1b1c312d119 (diff)
downloadspark-489862a65766d30278c186d280c6286937c81155.tar.gz
spark-489862a65766d30278c186d280c6286937c81155.tar.bz2
spark-489862a65766d30278c186d280c6286937c81155.zip
Remote death watch has a funny bug.
https://gist.github.com/ScrapCodes/4805fd84906e40b7b03d
-rw-r--r--	core/src/main/scala/org/apache/spark/deploy/client/Client.scala	| 10
-rw-r--r--	core/src/main/scala/org/apache/spark/deploy/master/Master.scala	| 17
-rw-r--r--	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala	| 9
-rw-r--r--	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala	| 5
-rw-r--r--	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala	| 4
-rw-r--r--	core/src/main/scala/org/apache/spark/util/AkkaUtils.scala	| 11
-rwxr-xr-x	spark-class	| 1
7 files changed, 8 insertions, 49 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 070f10f729..408692ec9c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,7 +23,6 @@ import scala.concurrent.duration._
import scala.concurrent.Await
import akka.actor._
-import akka.actor.Terminated
import akka.pattern.AskTimeoutException
import akka.pattern.ask
import akka.remote.{RemotingLifecycleEvent, DisassociatedEvent, AssociationErrorEvent}
@@ -62,6 +61,7 @@ private[spark] class Client(
var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
+ context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
try {
registerWithMaster()
} catch {
@@ -107,7 +107,6 @@ private[spark] class Client(
override def receive = {
case RegisteredApplication(appId_, masterUrl) =>
- context.watch(sender)
prevMaster = sender
appId = appId_
registered = true
@@ -123,7 +122,7 @@ private[spark] class Client(
val fullId = appId + "/" + id
logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, hostPort, cores))
listener.executorAdded(fullId, workerId, hostPort, cores, memory)
-
+
case ExecutorUpdated(id, state, message, exitStatus) =>
val fullId = appId + "/" + id
val messageText = message.map(s => " (" + s + ")").getOrElse("")
@@ -134,13 +133,12 @@ private[spark] class Client(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(prevMaster)
changeMaster(masterUrl)
alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
- case Terminated(actor_) =>
- logWarning(s"Connection to $actor_ failed; waiting for master to reconnect...")
+ case DisassociatedEvent(_, address, _) =>
+ logWarning(s"Connection to $address failed; waiting for master to reconnect...")
markDisconnected()
case StopClient =>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 25f5927128..81fb5c4e43 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -147,9 +147,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
RecoveryState.ALIVE
else
RecoveryState.RECOVERING
-
logInfo("I have been elected leader! New state: " + state)
-
if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
@@ -171,7 +169,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
} else {
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
registerWorker(worker)
- context.watch(sender)
persistenceEngine.addWorker(worker)
sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
@@ -186,7 +183,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val app = createApplication(description, sender)
registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
- context.watch(sender)
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
@@ -262,15 +258,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
- case Terminated(actor) => {
- // The disconnected actor could've been either a worker or an app; remove whichever of
- // those we have an entry for in the corresponding actor hashmap
- logInfo(s"$actor got terminated, removing it.")
- actorToWorker.get(actor).foreach(removeWorker)
- actorToApp.get(actor).foreach(finishApplication)
- if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
- }
-
case DisassociatedEvent(_, address, _) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
logInfo(s"$address got disassociated, removing it.")
@@ -438,8 +425,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
- context.stop(worker.actor)
- context.unwatch(worker.actor)
persistenceEngine.removeWorker(worker)
}
@@ -502,8 +487,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
app.driver ! ApplicationRemoved(state.toString)
}
persistenceEngine.removeApplication(app)
- context.stop(app.driver)
- context.unwatch(app.driver)
schedule()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 3a7d0b859b..0a183afd8e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -73,7 +73,6 @@ private[spark] class Worker(
val masterLock: Object = new Object()
var master: ActorSelection = null
- var prevMaster: ActorRef = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
@volatile var registered = false
@@ -173,8 +172,6 @@ private[spark] class Worker(
case RegisteredWorker(masterUrl, masterWebUiUrl) =>
logInfo("Successfully registered with master " + masterUrl)
registered = true
- context.watch(sender) // remote death watch for master
- prevMaster = sender
changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
@@ -185,8 +182,6 @@ private[spark] class Worker(
case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
- context.unwatch(prevMaster)
- prevMaster = sender
changeMaster(masterUrl, masterWebUiUrl)
val execs = executors.values.
@@ -245,10 +240,6 @@ private[spark] class Worker(
}
}
- case Terminated(actor_) =>
- logInfo(s"$actor_ terminated !")
- masterDisconnected()
-
case x: DisassociatedEvent =>
logInfo(s"$x Disassociated !")
masterDisconnected()
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 2818a775d0..dcb12bed4e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -51,7 +51,6 @@ private[spark] class CoarseGrainedExecutorBackend(
override def receive = {
case RegisteredExecutor(sparkProperties) =>
logInfo("Successfully registered with driver")
- context.watch(sender) //Start watching for terminated messages.
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties)
@@ -76,10 +75,6 @@ private[spark] class CoarseGrainedExecutorBackend(
executor.killTask(taskId)
}
- case Terminated(actor) =>
- logError(s"Driver $actor terminated, Shutting down.")
- System.exit(1)
-
case x: DisassociatedEvent =>
logError(s"Driver $x disassociated! Shutting down.")
System.exit(1)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e316f6b41f..d614dcbdd8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -73,7 +73,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
} else {
logInfo("Registered executor: " + sender + " with ID " + executorId)
sender ! RegisteredExecutor(sparkProperties)
- context.watch(sender)
executorActor(executorId) = sender
executorHost(executorId) = Utils.parseHostPort(hostPort)._1
freeCores(executorId) = cores
@@ -118,9 +117,6 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
removeExecutor(executorId, reason)
sender ! true
- case Terminated(actor) =>
- actorToExecutorId.get(actor).foreach(removeExecutor(_, "Akka actor terminated"))
-
case DisassociatedEvent(_, address, _) =>
addressToExecutorId.get(address).foreach(removeExecutor(_, "remote Akka client disassociated"))
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 90a5387b2b..23e9b735f3 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -46,20 +46,15 @@ private[spark] object AkkaUtils {
val akkaHeartBeatPauses = System.getProperty("spark.akka.pauses", "60").toInt
val akkaFailureDetector = System.getProperty("spark.akka.failure-detector.threshold", "12.0").toDouble
- // Since we have our own Heart Beat mechanism and TCP already tracks connections.
- // Using this makes very little sense. So setting this to a relatively larger value suffices.
- val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "3").toInt
+ val akkaHeartBeatInterval = System.getProperty("spark.akka.heartbeat.interval", "5").toInt
val akkaConf = ConfigFactory.parseString(
s"""
|akka.daemonic = on
|akka.loggers = [""akka.event.slf4j.Slf4jLogger""]
|akka.stdout-loglevel = "ERROR"
- |akka.remote.watch-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
- |akka.remote.watch-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
- |akka.remote.watch-failure-detector.threshold = $akkaFailureDetector
- |akka.remote.transport-failure-detector.heartbeat-interval = 30 s
- |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = ${akkaHeartBeatPauses + 10} s
+ |akka.remote.transport-failure-detector.heartbeat-interval = $akkaHeartBeatInterval s
+ |akka.remote.transport-failure-detector.acceptable-heartbeat-pause = $akkaHeartBeatPauses s
|akka.remote.transport-failure-detector.threshold = $akkaFailureDetector
|akka.actor.provider = "akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport"
diff --git a/spark-class b/spark-class
index 78d6e073b1..713404d077 100755
--- a/spark-class
+++ b/spark-class
@@ -136,3 +136,4 @@ fi
exec "$RUNNER" -cp "$CLASSPATH" $JAVA_OPTS "$@"
+