From d5a96feccb15dd290b282af9e2f94479c8e4554e Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Tue, 17 Sep 2013 09:40:06 -0700
Subject: Standalone Scheduler fault recovery

Implements a basic form of Standalone Scheduler fault recovery. In
particular, this allows faults to be recovered from manually by restarting
the Master process on the same machine. This is the majority of the code
necessary for general fault tolerance, which would first elect a leader
and then recover the Master's state.

To enable fault recovery, the Master persists to disk a small amount of
state related to the registration of Workers and Applications. If the
Master is started and sees that this state is still around, it enters
Recovery mode, during which it does not schedule any new Executors on
Workers (but it does accept the registration of new Clients and Workers).

At this point, the Master attempts to reconnect to all Workers and Client
applications that were registered at the time of failure. After confirming
either the existence or nonexistence of all such nodes (within a certain
timeout), the Master exits Recovery mode and resumes normal scheduling.
---
 bin/stop-slaves.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

(limited to 'bin')

diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index 03e416a132..abf1c7be65 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -17,7 +17,7 @@
 # limitations under the License.
 #
 
-# Starts the master on the machine this script is executed on.
+# Starts workers on the machine this script is executed on.
 
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
-- cgit v1.2.3
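The commit message above describes a small state machine: the Master starts in STANDBY, moves to RECOVERING if persisted state is found on election, and reaches ALIVE once every persisted Worker and Application has either reconnected or timed out. The sketch below restates that flow in isolation. The `RecoveryState` names and the transition points mirror the second commit below; the timer, the node bookkeeping, and the `Sketch` object itself are illustrative stand-ins, not Spark's actual code.

    import java.util.{Timer, TimerTask}
    import scala.collection.mutable

    // Illustrative sketch of the recovery flow described above -- not Spark's code.
    object RecoveryFlowSketch {
      object RecoveryState extends Enumeration {
        val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
      }
      import RecoveryState._

      @volatile var state = STANDBY
      private val unknownNodes = mutable.Set[String]()  // persisted nodes not yet re-confirmed

      // Invoked once this Master wins (or, pre-HA, trivially assumes) leadership.
      def electedLeader(persistedNodes: Seq[String], timeoutMs: Long) {
        state = if (persistedNodes.isEmpty) ALIVE else RECOVERING
        if (state == RECOVERING) {
          unknownNodes ++= persistedNodes   // ask each node to reconnect (omitted)
          // After the timeout, anything still unconfirmed is declared dead.
          new Timer(true).schedule(new TimerTask { def run() { completeRecovery() } }, timeoutMs)
        }
      }

      // Invoked whenever a persisted Worker or Application re-registers.
      def nodeReconnected(node: String) {
        unknownNodes -= node
        if (state == RECOVERING && unknownNodes.isEmpty) completeRecovery()
      }

      def completeRecovery() = synchronized {
        if (state == RECOVERING) {
          state = COMPLETING_RECOVERY
          // ...remove nodes still in unknownNodes, then resume scheduling...
          state = ALIVE
        }
      }
    }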
From 0f070279e7cd224de48333b572d3080b742a82d7 Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Sat, 5 Oct 2013 15:15:29 -0700
Subject: Address Matei's comments
---
 bin/stop-slaves.sh                                 |  2 --
 .../org/apache/spark/deploy/DeployMessage.scala    |  2 +-
 .../spark/deploy/master/ApplicationInfo.scala      |  6 ++---
 .../spark/deploy/master/LeaderElectionAgent.scala  |  4 ++--
 .../org/apache/spark/deploy/master/Master.scala    | 26 +++++++++++-----------
 .../apache/spark/deploy/master/MasterState.scala   | 26 ----------------------
 .../spark/deploy/master/PersistenceEngine.scala    |  7 +++---
 .../apache/spark/deploy/master/RecoveryState.scala | 26 ++++++++++++++++++++++
 .../deploy/master/SparkZooKeeperSession.scala      | 13 ++++++-----
 .../apache/spark/deploy/master/WorkerInfo.scala    |  6 ++---
 .../master/ZooKeeperLeaderElectionAgent.scala      | 11 ++++-----
 .../apache/spark/deploy/JsonProtocolSuite.scala    |  4 ++--
 12 files changed, 66 insertions(+), 67 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala

(limited to 'bin')

diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index abf1c7be65..fcb8555d4e 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -17,8 +17,6 @@
 # limitations under the License.
 #
 
-# Starts workers on the machine this script is executed on.
-
 bin=`dirname "$0"`
 bin=`cd "$bin"; pwd`
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 979e65ac6c..275331724a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,7 +21,7 @@ import scala.collection.immutable.List
 
 import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
-import org.apache.spark.deploy.master.MasterState.MasterState
+import org.apache.spark.deploy.master.RecoveryState.MasterState
 import org.apache.spark.deploy.worker.ExecutorRunner
 import org.apache.spark.util.Utils
 
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 8291e29ec3..5150b7c7de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -39,14 +39,14 @@ private[spark] class ApplicationInfo(
 
   @transient private var nextExecutorId: Int = _
 
-  init
+  init()
 
   private def readObject(in: java.io.ObjectInputStream) : Unit = {
     in.defaultReadObject()
-    init
+    init()
   }
 
-  private def init = {
+  private def init() {
     state = ApplicationState.WAITING
     executors = new mutable.HashMap[Int, ExecutorInfo]
     coresGranted = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index fc8255fa6f..f25a1ad3bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -29,12 +29,12 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
  * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
  * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
  */
-trait LeaderElectionAgent extends Actor {
+private[spark] trait LeaderElectionAgent extends Actor {
   val masterActor: ActorRef
 }
 
 /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
+private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
   override def preStart() {
     masterActor ! ElectedLeader
   }
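The `init` → `init()` change in ApplicationInfo (and in WorkerInfo further down) is more than a style fix: these classes travel through Java serialization when the Master persists them, and their `@transient` fields come back null unless rebuilt in the standard `readObject` hook, so `init` is a side-effecting method and Scala convention gives it parentheses. A self-contained illustration of the pattern, with a hypothetical `Box` class standing in for ApplicationInfo:

    import java.io._
    import scala.collection.mutable

    // Hypothetical stand-in for ApplicationInfo/WorkerInfo, showing the
    // @transient + readObject re-initialization pattern used above.
    class Box(val id: Int) extends Serializable {
      @transient var cache: mutable.Map[Int, String] = _

      init()

      // Java serialization skips @transient fields; rebuild them when the
      // object is read back (e.g. out of the Master's persisted state).
      private def readObject(in: ObjectInputStream): Unit = {
        in.defaultReadObject()
        init()
      }

      private def init() {
        cache = mutable.Map.empty
      }
    }

    object BoxDemo extends App {
      val buffer = new ByteArrayOutputStream()
      val out = new ObjectOutputStream(buffer)
      out.writeObject(new Box(1))
      out.close()
      val in = new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray))
      val restored = in.readObject().asInstanceOf[Box]
      assert(restored.cache != null)  // rebuilt by readObject, not left null
    }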
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 093ce09b1d..cd916672ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -79,7 +79,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val masterUrl = "spark://" + host + ":" + port
   var masterWebUiUrl: String = _
 
-  var state = MasterState.STANDBY
+  var state = RecoveryState.STANDBY
 
   var persistenceEngine: PersistenceEngine = _
 
@@ -139,13 +139,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case ElectedLeader => {
       val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
       state = if (storedApps.isEmpty && storedWorkers.isEmpty)
-        MasterState.ALIVE
+        RecoveryState.ALIVE
       else
-        MasterState.RECOVERING
+        RecoveryState.RECOVERING
 
       logInfo("I have been elected leader! New state: " + state)
 
-      if (state == MasterState.RECOVERING) {
+      if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedWorkers)
         context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
       }
@@ -159,7 +159,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
-      if (state == MasterState.STANDBY) {
+      if (state == RecoveryState.STANDBY) {
         // ignore, don't send response
       } else if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
@@ -174,7 +174,7 @@
     }
 
     case RegisterApplication(description) => {
-      if (state == MasterState.STANDBY) {
+      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
       } else {
         logInfo("Registering app " + description.name)
@@ -262,21 +262,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       // those we have an entry for in the corresponding actor hashmap
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RemoteClientDisconnected(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RemoteClientShutdown(transport, address) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
-      if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RequestMasterState => {
@@ -324,15 +324,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def completeRecovery() {
     // Ensure "only-once" recovery semantics using a short synchronization period.
     synchronized {
-      if (state != MasterState.RECOVERING) { return }
-      state = MasterState.COMPLETING_RECOVERY
+      if (state != RecoveryState.RECOVERING) { return }
+      state = RecoveryState.COMPLETING_RECOVERY
     }
 
     // Kill off any workers and apps that didn't respond to us.
     workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
 
-    state = MasterState.ALIVE
+    state = RecoveryState.ALIVE
     schedule()
     logInfo("Recovery complete - resuming operations!")
   }
@@ -351,7 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    * every time a new app joins or resource availability changes.
    */
   def schedule() {
-    if (state != MasterState.ALIVE) { return }
+    if (state != RecoveryState.ALIVE) { return }
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
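Note that `completeRecovery()` above is reachable from two directions: the `WORKER_TIMEOUT` timer scheduled under `ElectedLeader`, and the three disconnect handlers that call it as soon as `canCompleteRecovery` holds. Both can fire at nearly the same time, which is what the short `synchronized` check-and-transition protects against. A standalone demonstration of that only-once guard (everything here is a stand-in; only the guard's shape mirrors the patch):

    // Two callers race to complete recovery; the synchronized check-and-set
    // guarantees the body after the guard runs exactly once.
    object OnlyOnceGuardDemo {
      private var state = "RECOVERING"

      def completeRecovery() {
        synchronized {
          if (state != "RECOVERING") { return }   // a racing caller already won
          state = "COMPLETING_RECOVERY"
        }
        println("killing unresponsive workers/apps (runs exactly once)")
        state = "ALIVE"
      }

      def main(args: Array[String]) {
        val timeoutPath = new Thread(new Runnable { def run() = completeRecovery() })
        val lastResponsePath = new Thread(new Runnable { def run() = completeRecovery() })
        timeoutPath.start(); lastResponsePath.start()
        timeoutPath.join(); lastResponsePath.join()
      }
    }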
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
deleted file mode 100644
index eec3df3b7a..0000000000
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.master
-
-private[spark] object MasterState
-  extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
-
-  type MasterState = Value
-
-  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
-}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 8c4878bd30..94b986caf2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -23,9 +23,10 @@ package org.apache.spark.deploy.master
  *   - addApplication and addWorker are called before completing registration of a new app/worker.
  *   - removeApplication and removeWorker are called at any time.
  * Given these two requirements, we will have all apps and workers persisted, but
- * we might not have yet deleted apps or workers that finished.
+ * we might not have yet deleted apps or workers that finished (so their liveness must be verified
+ * during recovery).
  */
-trait PersistenceEngine {
+private[spark] trait PersistenceEngine {
   def addApplication(app: ApplicationInfo)
 
   def removeApplication(app: ApplicationInfo)
@@ -43,7 +44,7 @@ trait PersistenceEngine {
   def close() {}
 }
 
-class BlackHolePersistenceEngine extends PersistenceEngine {
+private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
   override def addApplication(app: ApplicationInfo) {}
   override def removeApplication(app: ApplicationInfo) {}
   override def addWorker(worker: WorkerInfo) {}
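The patch itself only shows `BlackHolePersistenceEngine`, the no-op used when recovery is disabled. To make the trait's contract concrete, here is one plausible file-backed implementation. It is a sketch under assumptions: the method signatures (in particular `readPersistedData` returning the `(storedApps, storedWorkers)` pair destructured in Master.scala above) are inferred from the surrounding diff, and `FilePersistenceEngine` is not the engine this commit ships.

    import java.io._

    // A hypothetical file-backed PersistenceEngine: one file per app/worker.
    // addApplication/addWorker run before registration completes, so a crash
    // can leave behind entries whose liveness recovery must verify -- exactly
    // the caveat the trait's comment calls out.
    private[spark] class FilePersistenceEngine(dir: File) extends PersistenceEngine {
      dir.mkdirs()

      override def addApplication(app: ApplicationInfo) = writeFile("app_" + app.id, app)
      override def removeApplication(app: ApplicationInfo) = new File(dir, "app_" + app.id).delete()
      override def addWorker(worker: WorkerInfo) = writeFile("worker_" + worker.id, worker)
      override def removeWorker(worker: WorkerInfo) = new File(dir, "worker_" + worker.id).delete()

      // Assumed signature, matching the destructuring in Master's ElectedLeader handler.
      override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
        def readAll[T](prefix: String): Seq[T] =
          dir.listFiles().filter(_.getName.startsWith(prefix)).map { f =>
            val in = new ObjectInputStream(new FileInputStream(f))
            try in.readObject().asInstanceOf[T] finally in.close()
          }.toSeq
        (readAll[ApplicationInfo]("app_"), readAll[WorkerInfo]("worker_"))
      }

      private def writeFile(name: String, obj: AnyRef) {
        val out = new ObjectOutputStream(new FileOutputStream(new File(dir, name)))
        try out.writeObject(obj) finally out.close()
      }
    }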
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
new file mode 100644
index 0000000000..b91be821f0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object RecoveryState
+  extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
+
+  type MasterState = Value
+
+  val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index f43f9f6ed7..81e15c534f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
  * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
  * times or a semantic exception is thrown (e.g., "node already exists").
  */
-class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
   val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
 
   val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
@@ -53,10 +53,13 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
   /** Connect to ZooKeeper to start the session. Must be called before anything else. */
   def connect() {
     connectToZooKeeper()
-    spawn(sessionMonitorThread)
+
+    new Thread() {
+      override def run() = sessionMonitorThread()
+    }.start()
   }
 
-  def sessionMonitorThread = {
+  def sessionMonitorThread(): Unit = {
     while (!closed) {
       Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
       if (zk.getState != ZooKeeper.States.CONNECTED) {
@@ -170,7 +173,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
    *
    * @param fn Block to execute, possibly multiple times.
    */
-  def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = {
+  def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
     try {
       fn
     } catch {
@@ -179,7 +182,7 @@
       case e if n > 0 =>
         logError("ZooKeeper exception, " + n + " more retries...", e)
         Thread.sleep(RETRY_WAIT_MILLIS)
-        retry(fn)(n-1)
+        retry(fn, n-1)
     }
   }
 }
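The `retry` change above is subtle: with the old curried `(implicit n: Int = MAX_RECONNECT_ATTEMPTS)` signature, any implicit Int that happened to be in a caller's scope would silently become the retry count; an ordinary default parameter keeps the count explicit. A standalone sketch of the resulting pattern (the constants and the flaky operation are stand-ins):

    import scala.util.Random

    // Standalone sketch of the retry-with-default-count shape adopted above;
    // the constants and flakyRead are stand-ins, not Spark's code.
    object RetrySketch {
      val MAX_RECONNECT_ATTEMPTS = 3
      val RETRY_WAIT_MILLIS = 100

      def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
        try {
          fn
        } catch {
          case e: Exception if n > 0 =>
            println("Exception, " + n + " more retries...")
            Thread.sleep(RETRY_WAIT_MILLIS)
            retry(fn, n - 1)
        }
      }

      def flakyRead(): Int = if (Random.nextBoolean()) 42 else sys.error("flaky")

      def main(args: Array[String]) {
        // Default count applies; nothing implicit can sneak in as `n` anymore.
        val a = retry { flakyRead() }
        // The count can still be overridden, but only explicitly.
        val b = retry(flakyRead(), n = 5)
        println(a + " " + b)
      }
    }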
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 26090c6a9c..e05f587b58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -42,17 +42,17 @@ private[spark] class WorkerInfo(
 
   @transient var lastHeartbeat: Long = _
 
-  init
+  init()
 
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
 
   private def readObject(in: java.io.ObjectInputStream) : Unit = {
     in.defaultReadObject()
-    init
+    init()
   }
 
-  private def init = {
+  private def init() {
     executors = new mutable.HashMap
     state = WorkerState.ALIVE
     coresUsed = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 065635af85..7809013e83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -17,17 +17,14 @@
 
 package org.apache.spark.deploy.master
 
-import scala.collection.JavaConversions._
-
-import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership}
-import org.apache.spark.Logging
+import akka.actor.ActorRef
 import org.apache.zookeeper._
 import org.apache.zookeeper.Watcher.Event.EventType
-import akka.actor.{Cancellable, ActorRef}
-import akka.util.duration._
+import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.spark.Logging
 
-class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
   extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
 
   val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
 
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index c59e1f4de6..0b38e239f9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
 
 class JsonProtocolSuite extends FunSuite {
@@ -54,7 +54,7 @@ class JsonProtocolSuite extends FunSuite {
     val activeApps = Array[ApplicationInfo](createAppInfo())
     val completedApps = Array[ApplicationInfo]()
     val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
-      MasterState.ALIVE)
+      RecoveryState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
   }
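For completeness, the ZooKeeper-backed pieces above are wired up entirely through system properties: `spark.deploy.zookeeper.url` (empty by default, so ZooKeeper is off) and `spark.deploy.zookeeper.dir` (defaulting to `/spark`, under which a `leader_election` znode is kept). A sketch of how a deployment might set them before launching a Master in the same JVM; the ensemble address is a placeholder, and supplying these through the usual daemon options instead would work just as well:

    // Property names are taken verbatim from SparkZooKeeperSession and
    // ZooKeeperLeaderElectionAgent above; the ensemble address is hypothetical.
    object ZkConfigSketch {
      def main(args: Array[String]) {
        System.setProperty("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")
        System.setProperty("spark.deploy.zookeeper.dir", "/spark")
        // ...start the Master here; it will keep leader-election state
        // under /spark/leader_election...
      }
    }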
-- cgit v1.2.3