diff options
author | Aaron Davidson <aaron@databricks.com> | 2013-09-19 14:40:14 -0700 |
---|---|---|
committer | Aaron Davidson <aaron@databricks.com> | 2013-09-26 15:04:23 -0700 |
commit | f549ea33d3d5a584f5d9965bb8e56462a1d6528e (patch) | |
tree | 22d9e70e68aef097a48e6a5efc5958a3acb20b1b | |
parent | d5a96feccb15dd290b282af9e2f94479c8e4554e (diff) | |
download | spark-f549ea33d3d5a584f5d9965bb8e56462a1d6528e.tar.gz spark-f549ea33d3d5a584f5d9965bb8e56462a1d6528e.tar.bz2 spark-f549ea33d3d5a584f5d9965bb8e56462a1d6528e.zip |
Standalone Scheduler fault tolerance using ZooKeeper
This patch implements full distributed fault tolerance for standalone scheduler Masters.
There is only one master Leader at a time, which is actively serving scheduling
requests. If this Leader crashes, another master will eventually be elected, reconstruct
the state from the first Master, and continue serving scheduling requests.
Leader election is performed using the ZooKeeper leader election pattern. We try to minimize
the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of
retries and session monitoring on top of the ZooKeeper client.
Master failover follows directly from the single-node Master recovery via the file
system (patch 194ba4b8), save that the Master state is stored in ZooKeeper instead.
Configuration:
By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE).
By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url
to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled.
By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory
to an appropriate directory accessible by the Master, we will keep the behavior of from 194ba4b8.
Additionally, places where a Master could be specificied by a spark:// url can now take
comma-delimited lists to specify backup masters. Note that this is only used for registration
of NEW Workers and application Clients. Once a Worker or Client has registered with the
Master Leader, it is "in the system" and will never need to register again.
Forthcoming:
Documentation, tests (! - only ad hoc testing has been performed so far)
I do not intend for this commit to be merged until tests are added, but this patch should
still be mostly reviewable until then.
25 files changed, 720 insertions, 170 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 912ce752fb..5318847276 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -169,7 +169,8 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = sparkUrl.split(",") + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) scheduler @@ -185,8 +186,8 @@ class SparkContext( val scheduler = new ClusterScheduler(this) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) - val sparkUrl = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 0d0745a480..31d1909279 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -27,6 +27,7 @@ import org.apache.spark.util.Utils private[deploy] sealed trait DeployMessage extends Serializable +/** Contains messages sent between Scheduler actor nodes. */ private[deploy] object DeployMessages { // Worker to Master @@ -58,13 +59,14 @@ private[deploy] object DeployMessages { // Master to Worker - case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage + case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage case class RegisterWorkerFailed(message: String) extends DeployMessage - case class KillExecutor(appId: String, execId: Int) extends DeployMessage + case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage case class LaunchExecutor( + masterUrl: String, appId: String, execId: Int, appDesc: ApplicationDescription, @@ -82,7 +84,7 @@ private[deploy] object DeployMessages { // Master to Client - case class RegisteredApplication(appId: String) extends DeployMessage + case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage // TODO(matei): replace hostPort with host case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) { @@ -131,16 +133,4 @@ private[deploy] object DeployMessages { assert (port > 0) } - // Actor System to Master - - case object CheckForWorkerTimeOut - - case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) - - case object EndRecoveryProcess - - case object RequestWebUIPort - - case class WebUIPortResponse(webUIBoundPort: Int) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala index 716ee483d5..2abf0b69dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala @@ -17,6 +17,11 @@ package org.apache.spark.deploy +/** + * Used to send state on-the-wire about Executors from Worker to Master. + * This state is sufficient for the Master to reconstruct its internal data structures during + * failover. + */ private[spark] class ExecutorDescription( val appId: String, val execId: Int, diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 10161c8204..308a2bfa22 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -39,22 +39,23 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I private val masterActorSystems = ArrayBuffer[ActorSystem]() private val workerActorSystems = ArrayBuffer[ActorSystem]() - def start(): String = { + def start(): Array[String] = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort + val masters = Array(masterUrl) /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masterUrl, null, Some(workerNum)) + memoryPerWorker, masters, null, Some(workerNum)) workerActorSystems += workerSystem } - return masterUrl + return masters } def stop() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 28548a2ca9..aa2a10a8ad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -23,6 +23,7 @@ import akka.actor._ import akka.actor.Terminated import akka.pattern.ask import akka.util.Duration +import akka.util.duration._ import akka.remote.RemoteClientDisconnected import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown @@ -40,27 +41,27 @@ import org.apache.spark.deploy.master.Master */ private[spark] class Client( actorSystem: ActorSystem, - masterUrl: String, + masterUrls: Array[String], appDescription: ApplicationDescription, listener: ClientListener) extends Logging { + val REGISTRATION_TIMEOUT = 60 * 1000 + var actor: ActorRef = null var appId: String = null + var registered = false + var activeMasterUrl: String = null class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times + var alreadyDead = false // To avoid calling listener.dead() multiple times override def preStart() { - logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address - master ! RegisterApplication(appDescription) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + connectToMaster() } catch { case e: Exception => logError("Failed to connect to master", e) @@ -69,9 +70,34 @@ private[spark] class Client( } } + def connectToMaster() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! RegisterApplication(appDescription) + } + + context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) { + if (!registered) { + logError("All masters are unresponsive! Giving up.") + markDead() + } + } + } + + def changeMaster(url: String) { + activeMasterUrl = url + master = context.actorFor(Master.toAkkaUrl(url)) + masterAddress = master.path.address + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } + override def receive = { - case RegisteredApplication(appId_) => + case RegisteredApplication(appId_, masterUrl) => appId = appId_ + registered = true + changeMaster(masterUrl) listener.connected(appId) case ApplicationRemoved(message) => @@ -92,13 +118,12 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) } - case MasterChanged(materUrl, masterWebUiUrl) => + case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address + changeMaster(masterUrl) + alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - context.watch(master) case Terminated(actor_) if actor_ == master => logError("Connection to master failed; waiting for master to reconnect...") @@ -113,7 +138,7 @@ private[spark] class Client( markDisconnected() case StopClient => - markDisconnected() + markDead() sender ! true context.stop(self) } @@ -127,6 +152,13 @@ private[spark] class Client( alreadyDisconnected = true } } + + def markDead() { + if (!alreadyDead) { + listener.dead() + alreadyDead = true + } + } } def start() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala index 4605368c11..be7a11bd15 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala @@ -27,8 +27,12 @@ package org.apache.spark.deploy.client private[spark] trait ClientListener { def connected(appId: String): Unit + /** Disconnection may be a temporary state, as we fail over to a new Master. */ def disconnected(): Unit + /** Dead means that we couldn't find any Masters to connect to, and have given up. */ + def dead(): Unit + def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index d5e9a0e095..5b62d3ba6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -33,6 +33,11 @@ private[spark] object TestClient { System.exit(0) } + def dead() { + logInfo("Could not connect to master") + System.exit(0) + } + def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {} def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {} @@ -44,7 +49,7 @@ private[spark] object TestClient { val desc = new ApplicationDescription( "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored") val listener = new TestListener - val client = new Client(actorSystem, url, desc, listener) + val client = new Client(actorSystem, Array(url), desc, listener) client.start() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index e437a0e7ae..8291e29ec3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -23,12 +23,12 @@ import akka.actor.ActorRef import scala.collection.mutable private[spark] class ApplicationInfo( - val startTime: Long, - val id: String, - val desc: ApplicationDescription, - val submitDate: Date, - val driver: ActorRef, - val appUiUrl: String) + val startTime: Long, + val id: String, + val desc: ApplicationDescription, + val submitDate: Date, + val driver: ActorRef, + val appUiUrl: String) extends Serializable { @transient var state: ApplicationState.Value = _ @@ -39,14 +39,14 @@ private[spark] class ApplicationInfo( @transient private var nextExecutorId: Int = _ - init() + init private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init() + init } - private def init() { + private def init = { state = ApplicationState.WAITING executors = new mutable.HashMap[Int, ExecutorInfo] coresGranted = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index d235234c13..76db61dd61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -28,7 +28,7 @@ private[spark] class ExecutorInfo( var state = ExecutorState.LAUNCHING - /** Copy all state variables from the given on-the-wire ExecutorDescription. */ + /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */ def copyState(execDesc: ExecutorDescription) { state = execDesc.state } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 2fc13821bd..c0849ef324 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -32,8 +32,8 @@ import org.apache.spark.Logging * @param serialization Used to serialize our objects. */ private[spark] class FileSystemPersistenceEngine( - val dir: String, - val serialization: Serialization) + val dir: String, + val serialization: Serialization) extends PersistenceEngine with Logging { new File(dir).mkdir() @@ -57,11 +57,11 @@ private[spark] class FileSystemPersistenceEngine( } override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { - val sortedFiles = new File(dir).listFiles().sortBy(_.getName()) - val appFiles = sortedFiles.filter(_.getName().startsWith("app_")) - val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_)) - val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_")) - val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_)) + val sortedFiles = new File(dir).listFiles().sortBy(_.getName) + val appFiles = sortedFiles.filter(_.getName.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) (apps, workers) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala new file mode 100644 index 0000000000..c44a23f8c6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -0,0 +1,28 @@ +package org.apache.spark.deploy.master + +import akka.actor.{Actor, ActorRef} + +import org.apache.spark.deploy.master.MasterMessages.ElectedLeader + +/** + * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it + * is the only Master serving requests. + * In addition to the API provided, the LeaderElectionAgent will use of the following messages + * to inform the Master of leader changes: + * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]] + * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] + */ +trait LeaderElectionAgent extends Actor { + val masterActor: ActorRef +} + +/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */ +class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent { + override def preStart() { + masterActor ! ElectedLeader + } + + override def receive = { + case _ => + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6e039eed4..e13a8cba4a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -34,18 +34,18 @@ import akka.util.{Duration, Timeout} import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.master.MasterState.MasterState +import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} - private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000 val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "") + val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE") var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] @@ -76,75 +76,115 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (envVar != null) envVar else host } - var state: MasterState = _ + val masterUrl = "spark://" + host + ":" + port + var masterWebUiUrl: String = _ + + var state = MasterState.STANDBY var persistenceEngine: PersistenceEngine = _ + var leaderElectionAgent: ActorRef = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean override def preStart() { - logInfo("Starting Spark master at spark://" + host + ":" + port) + logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() applicationMetricsSystem.start() - persistenceEngine = - if (RECOVERY_DIR.isEmpty()) { - new BlackHolePersistenceEngine() - } else { + persistenceEngine = RECOVERY_MODE match { + case "ZOOKEEPER" => + logInfo("Persisting recovery state to ZooKeeper") + new ZooKeeperPersistenceEngine(SerializationExtension(context.system)) + case "FILESYSTEM" => logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system)) - } + case _ => + new BlackHolePersistenceEngine() + } - val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() - state = - if (storedApps.isEmpty && storedWorkers.isEmpty) { - MasterState.ALIVE - } else { - self ! BeginRecovery(storedApps, storedWorkers) - MasterState.RECOVERING - } + leaderElectionAgent = context.actorOf(Props( + RECOVERY_MODE match { + case "ZOOKEEPER" => + new ZooKeeperLeaderElectionAgent(self, masterUrl) + case _ => + new MonarchyLeaderAgent(self) + })) + } + + override def preRestart(reason: Throwable, message: Option[Any]) { + logError("Master actor restarted due to exception", reason) } override def postStop() { webUi.stop() masterMetricsSystem.stop() applicationMetricsSystem.stop() + persistenceEngine.close() + context.stop(leaderElectionAgent) } override def receive = { + case ElectedLeader => { + val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() + state = if (storedApps.isEmpty && storedWorkers.isEmpty) + MasterState.ALIVE + else + MasterState.RECOVERING + + logInfo("I have been elected leader! New state: " + state) + + if (state == MasterState.RECOVERING) { + beginRecovery(storedApps, storedWorkers) + context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } + } + } + + case RevokedLeadership => { + logError("Leadership has been revoked -- master shutting down.") + System.exit(0) + } + case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) - if (idToWorker.contains(id)) { + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) registerWorker(worker) context.watch(sender) // This doesn't work with remote actors but helps for testing persistenceEngine.addWorker(worker) - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get) + sender ! RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } } case RegisterApplication(description) => { - logInfo("Registering app " + description.name) - val app = createApplication(description, sender) - registerApplication(app) - logInfo("Registered app " + description.name + " with ID " + app.id) - context.watch(sender) // This doesn't work with remote actors but helps for testing - persistenceEngine.addApplication(app) - sender ! RegisteredApplication(app.id) - schedule() + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else { + logInfo("Registering app " + description.name) + val app = createApplication(description, sender) + registerApplication(app) + logInfo("Registered app " + description.name + " with ID " + app.id) + context.watch(sender) // This doesn't work with remote actors but helps for testing + persistenceEngine.addApplication(app) + sender ! RegisteredApplication(app.id, masterUrl) + schedule() + } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { @@ -184,27 +224,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - case BeginRecovery(storedApps, storedWorkers) => { - context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess) - - val masterUrl = "spark://" + host + ":" + port - val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get - for (app <- storedApps) { - registerApplication(app) - app.state = ApplicationState.UNKNOWN - app.driver ! MasterChanged(masterUrl, masterWebUiUrl) - } - for (worker <- storedWorkers) { - registerWorker(worker) - worker.state = WorkerState.UNKNOWN - worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) - } - } - case MasterChangeAcknowledged(appId) => { - val appOption = idToApp.get(appId) - appOption match { + idToApp.get(appId) match { case Some(app) => + logInfo("Application has been re-registered: " + appId) app.state = ApplicationState.WAITING case None => logWarning("Master change ack from unknown app: " + appId) @@ -216,9 +239,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case WorkerSchedulerStateResponse(workerId, executors) => { idToWorker.get(workerId) match { case Some(worker) => + logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE - val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None) + val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) for (exec <- validExecutors) { val app = idToApp.get(exec.appId).get val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) @@ -232,10 +256,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (canCompleteRecovery) { completeRecovery() } } - case EndRecoveryProcess => { - completeRecovery() - } - case Terminated(actor) => { // The disconnected actor could've been either a worker or an app; remove whichever of // those we have an entry for in the corresponding actor hashmap @@ -275,15 +295,29 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act workers.count(_.state == WorkerState.UNKNOWN) == 0 && apps.count(_.state == ApplicationState.UNKNOWN) == 0 + def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) { + for (app <- storedApps) { + registerApplication(app) + app.state = ApplicationState.UNKNOWN + app.driver ! MasterChanged(masterUrl, masterWebUiUrl) + } + for (worker <- storedWorkers) { + registerWorker(worker) + worker.state = WorkerState.UNKNOWN + worker.actor ! MasterChanged(masterUrl, masterWebUiUrl) + } + } + def completeRecovery() { + // Ensure "only-once" recovery semantics using a short synchronization period. synchronized { if (state != MasterState.RECOVERING) { return } state = MasterState.COMPLETING_RECOVERY } // Kill off any workers and apps that didn't respond to us. - workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_)) - apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_)) + workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) + apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) state = MasterState.ALIVE schedule() @@ -352,7 +386,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor( + worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) @@ -415,6 +449,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (firstApp == None) { firstApp = Some(app) } + // TODO: What is firstApp?? Can we remove it? val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) { logWarning("Could not find any workers with enough memory for " + firstApp.get.id) @@ -444,7 +479,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act waitingApps -= app for (exec <- app.executors.values) { exec.worker.removeExecutor(exec) - exec.worker.actor ! KillExecutor(exec.application.id, exec.id) + exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id) exec.state = ExecutorState.KILLED } app.markFinished(state) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala new file mode 100644 index 0000000000..6e31d40b43 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -0,0 +1,29 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.util.Utils + +sealed trait MasterMessages extends Serializable + +/** Contains messages seen only by the Master and its associated entities. */ +private[master] object MasterMessages { + + // LeaderElectionAgent to Master + + case object ElectedLeader + + case object RevokedLeadership + + // Actor System to LeaderElectionAgent + + case object CheckLeader + + // Actor System to Master + + case object CheckForWorkerTimeOut + + case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) + + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala index 9ea5e9752e..eec3df3b7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala @@ -18,9 +18,9 @@ package org.apache.spark.deploy.master private[spark] object MasterState - extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { + extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { type MasterState = Value - val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value + val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala index 07d23c6bf3..8c4878bd30 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala @@ -36,9 +36,11 @@ trait PersistenceEngine { /** * Returns the persisted data sorted by their respective ids (which implies that they're - * sorted by time order of creation). + * sorted by time of creation). */ def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) + + def close() {} } class BlackHolePersistenceEngine extends PersistenceEngine { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala new file mode 100644 index 0000000000..5492a3a988 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -0,0 +1,183 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ +import scala.concurrent.ops._ + +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.Watcher.Event.KeeperState + +/** + * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry + * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be + * created. If ZooKeeper remains down after several retries, the given + * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be + * informed via zkDown(). + * + * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many + * times or a semantic exception is thrown (e.g.., "node already exists"). + */ +class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { + val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "") + + val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE + val ZK_TIMEOUT_MILLIS = 30000 + val RETRY_WAIT_MILLIS = 5000 + val ZK_CHECK_PERIOD_MILLIS = 10000 + val MAX_RECONNECT_ATTEMPTS = 3 + + private var zk: ZooKeeper = _ + + private val watcher = new ZooKeeperWatcher() + private var reconnectAttempts = 0 + private var closed = false + + /** Connect to ZooKeeper to start the session. Must be called before anything else. */ + def connect() { + connectToZooKeeper() + spawn(sessionMonitorThread) + } + + def sessionMonitorThread = { + while (!closed) { + Thread.sleep(ZK_CHECK_PERIOD_MILLIS) + if (zk.getState != ZooKeeper.States.CONNECTED) { + reconnectAttempts += 1 + val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts + if (attemptsLeft <= 0) { + logError("Could not connect to ZooKeeper: system failure") + zkWatcher.zkDown() + close() + } else { + logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...") + connectToZooKeeper() + } + } + } + } + + def close() { + if (!closed && zk != null) { zk.close() } + closed = true + } + + private def connectToZooKeeper() { + if (zk != null) zk.close() + zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher) + } + + /** + * Attempts to maintain a live ZooKeeper exception despite (very) transient failures. + * Mainly useful for handling the natural ZooKeeper session expiration. + */ + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (closed) { return } + + event.getState match { + case KeeperState.SyncConnected => + reconnectAttempts = 0 + zkWatcher.zkSessionCreated() + case KeeperState.Expired => + connectToZooKeeper() + case KeeperState.Disconnected => + logWarning("ZooKeeper disconnected, will retry...") + } + } + } + + def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = { + retry { + zk.create(path, bytes, ZK_ACL, createMode) + } + } + + def exists(path: String, watcher: Watcher = null): Stat = { + retry { + zk.exists(path, watcher) + } + } + + def getChildren(path: String, watcher: Watcher = null): List[String] = { + retry { + zk.getChildren(path, watcher).toList + } + } + + def getData(path: String): Array[Byte] = { + retry { + zk.getData(path, false, null) + } + } + + def delete(path: String, version: Int = -1): Unit = { + retry { + zk.delete(path, version) + } + } + + /** + * Creates the given directory (non-recursively) if it doesn't exist. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdir(path: String) { + if (exists(path) == null) { + try { + create(path, "".getBytes, CreateMode.PERSISTENT) + } catch { + case e: Exception => + // If the exception caused the directory not to be created, bubble it up, + // otherwise ignore it. + if (exists(path) == null) { throw e } + } + } + } + + /** + * Recursively creates all directories up to the given one. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdirRecursive(path: String) { + var fullDir = "" + for (dentry <- path.split("/").tail) { + fullDir += "/" + dentry + mkdir(fullDir) + } + } + + /** + * Retries the given function up to 3 times. The assumption is that failure is transient, + * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist), + * in which case the exception will be thrown without retries. + * + * @param fn Block to execute, possibly multiple times. + */ + def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = { + try { + fn + } catch { + case e: KeeperException.NoNodeException => throw e + case e: KeeperException.NodeExistsException => throw e + case e if n > 0 => + logError("ZooKeeper exception, " + n + " more retries...", e) + Thread.sleep(RETRY_WAIT_MILLIS) + retry(fn)(n-1) + } + } +} + +trait SparkZooKeeperWatcher { + /** + * Called whenever a ZK session is created -- + * this will occur when we create our first session as well as each time + * the session expires or errors out. + */ + def zkSessionCreated() + + /** + * Called if ZK appears to be completely down (i.e., not just a transient error). + * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead. + */ + def zkDown() +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 2ab7bb233c..26090c6a9c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -22,14 +22,14 @@ import scala.collection.mutable import org.apache.spark.util.Utils private[spark] class WorkerInfo( - val id: String, - val host: String, - val port: Int, - val cores: Int, - val memory: Int, - val actor: ActorRef, - val webUiPort: Int, - val publicAddress: String) + val id: String, + val host: String, + val port: Int, + val cores: Int, + val memory: Int, + val actor: ActorRef, + val webUiPort: Int, + val publicAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -42,18 +42,18 @@ private[spark] class WorkerInfo( @transient var lastHeartbeat: Long = _ - init() + init def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init() + init } - private def init() { - executors = new mutable.HashMap[String, ExecutorInfo] + private def init = { + executors = new mutable.HashMap state = WorkerState.ALIVE coresUsed = 0 memoryUsed = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala new file mode 100644 index 0000000000..4ca59e5b24 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -0,0 +1,109 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ + +import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership} +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.Watcher.Event.EventType + +import akka.actor.{Cancellable, ActorRef} +import akka.util.duration._ + +class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) + extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" + + private val watcher = new ZooKeeperWatcher() + private val zk = new SparkZooKeeperSession(this) + private var status = LeadershipStatus.NOT_LEADER + private var myLeaderFile: String = _ + private var leaderUrl: String = _ + + override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") + zk.connect() + } + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + myLeaderFile = + zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) + self ! CheckLeader + } + + override def zkDown() { + logError("ZooKeeper down! LeaderElectionAgent shutting down Master.") + System.exit(1) + } + + override def postStop() { + zk.close() + } + + override def receive = { + case CheckLeader => checkLeader() + } + + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (event.getType == EventType.NodeDeleted) { + logInfo("Leader file disappeared, a master is down!") + self ! CheckLeader + } + } + } + + /** Uses ZK leader election. Navigates several ZK potholes along the way. */ + def checkLeader() { + val masters = zk.getChildren(WORKING_DIR).toList + val leader = masters.sorted.get(0) + val leaderFile = WORKING_DIR + "/" + leader + + // Setup a watch for the current leader. + zk.exists(leaderFile, watcher) + + try { + leaderUrl = new String(zk.getData(leaderFile)) + } catch { + // A NoNodeException may be thrown if old leader died since the start of this method call. + // This is fine -- just check again, since we're guaranteed to see the new values. + case e: KeeperException.NoNodeException => + logInfo("Leader disappeared while reading it -- finding next leader") + checkLeader() + return + } + + val isLeader = myLeaderFile == leaderFile + if (!isLeader && leaderUrl == masterUrl) { + // We found a different master file pointing to this process. + // This can happen in the following two cases: + // (1) The master process was restarted on the same node. + // (2) The ZK server died between creating the node and returning the name of the node. + // For this case, we will end up creating a second file, and MUST explicitly delete the + // first one, since our ZK session is still open. + // Note that this deletion will cause a NodeDeleted event to be fired so we check again for + // leader changes. + logWarning("Cleaning up old ZK master election file that points to this master.") + zk.delete(leaderFile) + } else { + updateLeadershipStatus(isLeader) + } + } + + def updateLeadershipStatus(isLeader: Boolean) { + if (isLeader && status == LeadershipStatus.NOT_LEADER) { + status = LeadershipStatus.LEADER + masterActor ! ElectedLeader + } else if (!isLeader && status == LeadershipStatus.LEADER) { + status = LeadershipStatus.NOT_LEADER + masterActor ! RevokedLeadership + } + } + + private object LeadershipStatus extends Enumeration { + type LeadershipStatus = Value + val LEADER, NOT_LEADER = Value + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala new file mode 100644 index 0000000000..f45b62cbdd --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -0,0 +1,64 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.Logging +import org.apache.zookeeper._ + +import akka.serialization.Serialization + +class ZooKeeperPersistenceEngine(serialization: Serialization) extends PersistenceEngine with SparkZooKeeperWatcher with Logging { + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + + val zk = new SparkZooKeeperSession(this) + + zk.connect() + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + } + + override def zkDown() { + logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.") + } + + override def addApplication(app: ApplicationInfo) { + serializeIntoFile(WORKING_DIR + "/app_" + app.id, app) + } + + override def removeApplication(app: ApplicationInfo) { + zk.delete(WORKING_DIR + "/app_" + app.id) + } + + override def addWorker(worker: WorkerInfo) { + serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker) + } + + override def removeWorker(worker: WorkerInfo) { + zk.delete(WORKING_DIR + "/worker_" + worker.id) + } + + override def close() { + zk.close() + } + + override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { + val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted + val appFiles = sortedFiles.filter(_.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) + (apps, workers) + } + + private def serializeIntoFile(path: String, value: Serializable) { + val serializer = serialization.findSerializerFor(value) + val serialized = serializer.toBinary(value) + zk.create(path, serialized, CreateMode.PERSISTENT) + } + + def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = { + val fileData = zk.getData("/spark/master_status/" + filename) + val clazz = m.erasure.asInstanceOf[Class[T]] + val serializer = serialization.serializerFor(clazz) + serializer.fromBinary(fileData).asInstanceOf[T] + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 46455aa5ae..73fb0c8bd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -23,11 +23,11 @@ import java.io.File import scala.collection.mutable.HashMap -import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.actor._ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.util.duration._ -import org.apache.spark.{SparkEnv, Logging} +import org.apache.spark.Logging import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -35,14 +35,13 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} - private[spark] class Worker( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - var masterUrl: String, + masterUrls: Array[String], workDirPath: String = null) extends Actor with Logging { @@ -54,8 +53,16 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4 + val REGISTRATION_TIMEOUT = 20.seconds + val REGISTRATION_RETRIES = 3 + + // Index into masterUrls that we're currently trying to register with. + var masterIndex = 0 + var master: ActorRef = null - var masterWebUiUrl : String = "" + var activeMasterUrl: String = "" + var activeMasterWebUiUrl : String = "" + var registered = false val workerId = generateWorkerId() var sparkHome: File = null var workDir: File = null @@ -103,35 +110,62 @@ private[spark] class Worker( webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() - connectToMaster() + registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() } - def connectToMaster() { - logInfo("Connecting to master " + masterUrl) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) + def changeMaster(url: String, uiUrl: String) { + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } + def tryRegisterAllMasters() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, + publicAddress) + } + } + + def registerWithMaster() { + tryRegisterAllMasters() + + var retries = 0 + lazy val retryTimer: Cancellable = + context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { + retries += 1 + if (registered) { + retryTimer.cancel() + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + System.exit(1) + } else { + tryRegisterAllMasters() + } + } + retryTimer // start timer + } + override def receive = { - case RegisteredWorker(url) => - masterWebUiUrl = url - logInfo("Successfully registered with master") + case RegisteredWorker(masterUrl, masterWebUiUrl) => + logInfo("Successfully registered with master " + masterUrl) + registered = true + changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { master ! Heartbeat(workerId) } - case MasterChanged(url, uiUrl) => - logInfo("Master has changed, new master is at " + url) - masterUrl = url - masterWebUiUrl = uiUrl + case MasterChanged(masterUrl, masterWebUiUrl) => + logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - context.watch(master) // Doesn't work with remote actors, but useful for testing + changeMaster(masterUrl, masterWebUiUrl) + val execs = executors.values. map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) sender ! WorkerSchedulerStateResponse(workerId, execs.toList) @@ -140,15 +174,19 @@ private[spark] class Worker( logError("Worker registration failed: " + message) System.exit(1) - case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) => - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) => + if (masterUrl != activeMasterUrl) { + logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") + } else { + logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, + self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) + executors(appId + "/" + execId) = manager + manager.start() + coresUsed += cores_ + memoryUsed += memory_ + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) @@ -164,14 +202,18 @@ private[spark] class Worker( memoryUsed -= executor.memory } - case KillExecutor(appId, execId) => - val fullId = appId + "/" + execId - executors.get(fullId) match { - case Some(executor) => - logInfo("Asked to kill executor " + fullId) - executor.kill() - case None => - logInfo("Asked to kill unknown executor " + fullId) + case KillExecutor(masterUrl, appId, execId) => + if (masterUrl != activeMasterUrl) { + logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId) + } else { + val fullId = appId + "/" + execId + executors.get(fullId) match { + case Some(executor) => + logInfo("Asked to kill executor " + fullId) + executor.kill() + case None => + logInfo("Asked to kill unknown executor " + fullId) + } } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => @@ -179,8 +221,8 @@ private[spark] class Worker( case RequestWorkerState => { sender ! WorkerStateResponse(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, masterUrl, cores, memory, - coresUsed, memoryUsed, masterWebUiUrl) + finishedExecutors.values.toList, activeMasterUrl, cores, memory, + coresUsed, memoryUsed, activeMasterWebUiUrl) } } @@ -203,17 +245,18 @@ private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.master, args.workDir) + args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) + : (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, - masterUrl, workDir)), name = "Worker") + masterUrls, workDir)), name = "Worker") (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 0ae89a864f..16d8686490 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) { var webUiPort = 8081 var cores = inferDefaultCores() var memory = inferDefaultMemory() - var master: String = null + var masters: Array[String] = null var workDir: String = null // Check for settings in environment variables @@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) { printUsageAndExit(0) case value :: tail => - if (master != null) { // Two positional arguments were given + if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - master = value + masters = value.split(",") parse(tail) case Nil => - if (master == null) { // No positional argument was given + if (masters == null) { // No positional argument was given printUsageAndExit(1) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 95d6007f3b..800f1cafcc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node> - val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p> + val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p> val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c173cdf449..8a3017e964 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, - master: String, + masters: Array[String], appName: String) extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) with ClientListener @@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend( val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) - client = new Client(sc.env.actorSystem, master, appDesc, this) + client = new Client(sc.env.actorSystem, masters, appDesc, this) client.start() } @@ -75,6 +75,13 @@ private[spark] class SparkDeploySchedulerBackend( } } + override def dead() { + if (!stopping) { + logError("Spark cluster looks dead, giving up.") + scheduler.error("Spark cluster looks down") + } + } + override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) @@ -345,6 +345,17 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99cdadb9e7..156f501a04 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -211,6 +211,7 @@ object SparkBuild extends Build { "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), + "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", |