diff options
25 files changed, 720 insertions, 170 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 912ce752fb..5318847276 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -169,7 +169,8 @@ class SparkContext( case SPARK_REGEX(sparkUrl) => val scheduler = new ClusterScheduler(this) - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = sparkUrl.split(",") + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) scheduler @@ -185,8 +186,8 @@ class SparkContext( val scheduler = new ClusterScheduler(this) val localCluster = new LocalSparkCluster( numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt) - val sparkUrl = localCluster.start() - val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName) + val masterUrls = localCluster.start() + val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName) scheduler.initialize(backend) backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => { localCluster.stop() diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 0d0745a480..31d1909279 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -27,6 +27,7 @@ import org.apache.spark.util.Utils private[deploy] sealed trait DeployMessage extends Serializable +/** Contains messages sent between Scheduler actor nodes. 
*/ private[deploy] object DeployMessages { // Worker to Master @@ -58,13 +59,14 @@ private[deploy] object DeployMessages { // Master to Worker - case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage + case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage case class RegisterWorkerFailed(message: String) extends DeployMessage - case class KillExecutor(appId: String, execId: Int) extends DeployMessage + case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage case class LaunchExecutor( + masterUrl: String, appId: String, execId: Int, appDesc: ApplicationDescription, @@ -82,7 +84,7 @@ private[deploy] object DeployMessages { // Master to Client - case class RegisteredApplication(appId: String) extends DeployMessage + case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage // TODO(matei): replace hostPort with host case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) { @@ -131,16 +133,4 @@ private[deploy] object DeployMessages { assert (port > 0) } - // Actor System to Master - - case object CheckForWorkerTimeOut - - case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) - - case object EndRecoveryProcess - - case object RequestWebUIPort - - case class WebUIPortResponse(webUIBoundPort: Int) - } diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala index 716ee483d5..2abf0b69dd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala @@ -17,6 +17,11 @@ package org.apache.spark.deploy +/** + * Used to send state on-the-wire about Executors from Worker to Master. + * This state is sufficient for the Master to reconstruct its internal data structures during + * failover. 
+ */ private[spark] class ExecutorDescription( val appId: String, val execId: Int, diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala index 10161c8204..308a2bfa22 100644 --- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala +++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala @@ -39,22 +39,23 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I private val masterActorSystems = ArrayBuffer[ActorSystem]() private val workerActorSystems = ArrayBuffer[ActorSystem]() - def start(): String = { + def start(): Array[String] = { logInfo("Starting a local Spark cluster with " + numWorkers + " workers.") /* Start the Master */ val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0) masterActorSystems += masterSystem val masterUrl = "spark://" + localHostname + ":" + masterPort + val masters = Array(masterUrl) /* Start the Workers */ for (workerNum <- 1 to numWorkers) { val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker, - memoryPerWorker, masterUrl, null, Some(workerNum)) + memoryPerWorker, masters, null, Some(workerNum)) workerActorSystems += workerSystem } - return masterUrl + return masters } def stop() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala index 28548a2ca9..aa2a10a8ad 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala @@ -23,6 +23,7 @@ import akka.actor._ import akka.actor.Terminated import akka.pattern.ask import akka.util.Duration +import akka.util.duration._ import akka.remote.RemoteClientDisconnected import akka.remote.RemoteClientLifeCycleEvent import akka.remote.RemoteClientShutdown @@ -40,27 +41,27 @@ import 
org.apache.spark.deploy.master.Master */ private[spark] class Client( actorSystem: ActorSystem, - masterUrl: String, + masterUrls: Array[String], appDescription: ApplicationDescription, listener: ClientListener) extends Logging { + val REGISTRATION_TIMEOUT = 60 * 1000 + var actor: ActorRef = null var appId: String = null + var registered = false + var activeMasterUrl: String = null class ClientActor extends Actor with Logging { var master: ActorRef = null var masterAddress: Address = null var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times + var alreadyDead = false // To avoid calling listener.dead() multiple times override def preStart() { - logInfo("Connecting to master " + masterUrl) try { - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address - master ! RegisterApplication(appDescription) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + connectToMaster() } catch { case e: Exception => logError("Failed to connect to master", e) @@ -69,9 +70,34 @@ private[spark] class Client( } } + def connectToMaster() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! RegisterApplication(appDescription) + } + + context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) { + if (!registered) { + logError("All masters are unresponsive! 
Giving up.") + markDead() + } + } + } + + def changeMaster(url: String) { + activeMasterUrl = url + master = context.actorFor(Master.toAkkaUrl(url)) + masterAddress = master.path.address + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + } + override def receive = { - case RegisteredApplication(appId_) => + case RegisteredApplication(appId_, masterUrl) => appId = appId_ + registered = true + changeMaster(masterUrl) listener.connected(appId) case ApplicationRemoved(message) => @@ -92,13 +118,12 @@ private[spark] class Client( listener.executorRemoved(fullId, message.getOrElse(""), exitStatus) } - case MasterChanged(materUrl, masterWebUiUrl) => + case MasterChanged(masterUrl, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - masterAddress = master.path.address + changeMaster(masterUrl) + alreadyDisconnected = false sender ! MasterChangeAcknowledged(appId) - context.watch(master) case Terminated(actor_) if actor_ == master => logError("Connection to master failed; waiting for master to reconnect...") @@ -113,7 +138,7 @@ private[spark] class Client( markDisconnected() case StopClient => - markDisconnected() + markDead() sender ! 
true context.stop(self) } @@ -127,6 +152,13 @@ private[spark] class Client( alreadyDisconnected = true } } + + def markDead() { + if (!alreadyDead) { + listener.dead() + alreadyDead = true + } + } } def start() { diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala index 4605368c11..be7a11bd15 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala @@ -27,8 +27,12 @@ package org.apache.spark.deploy.client private[spark] trait ClientListener { def connected(appId: String): Unit + /** Disconnection may be a temporary state, as we fail over to a new Master. */ def disconnected(): Unit + /** Dead means that we couldn't find any Masters to connect to, and have given up. */ + def dead(): Unit + def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala index d5e9a0e095..5b62d3ba6c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala +++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala @@ -33,6 +33,11 @@ private[spark] object TestClient { System.exit(0) } + def dead() { + logInfo("Could not connect to master") + System.exit(0) + } + def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {} def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {} @@ -44,7 +49,7 @@ private[spark] object TestClient { val desc = new ApplicationDescription( "TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored") val listener = new TestListener - val 
client = new Client(actorSystem, url, desc, listener) + val client = new Client(actorSystem, Array(url), desc, listener) client.start() actorSystem.awaitTermination() } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index e437a0e7ae..8291e29ec3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -23,12 +23,12 @@ import akka.actor.ActorRef import scala.collection.mutable private[spark] class ApplicationInfo( - val startTime: Long, - val id: String, - val desc: ApplicationDescription, - val submitDate: Date, - val driver: ActorRef, - val appUiUrl: String) + val startTime: Long, + val id: String, + val desc: ApplicationDescription, + val submitDate: Date, + val driver: ActorRef, + val appUiUrl: String) extends Serializable { @transient var state: ApplicationState.Value = _ @@ -39,14 +39,14 @@ private[spark] class ApplicationInfo( @transient private var nextExecutorId: Int = _ - init() + init private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init() + init } - private def init() { + private def init = { state = ApplicationState.WAITING executors = new mutable.HashMap[Int, ExecutorInfo] coresGranted = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala index d235234c13..76db61dd61 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala @@ -28,7 +28,7 @@ private[spark] class ExecutorInfo( var state = ExecutorState.LAUNCHING - /** Copy all state variables from the given on-the-wire ExecutorDescription. */ + /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. 
*/ def copyState(execDesc: ExecutorDescription) { state = execDesc.state } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala index 2fc13821bd..c0849ef324 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala @@ -32,8 +32,8 @@ import org.apache.spark.Logging * @param serialization Used to serialize our objects. */ private[spark] class FileSystemPersistenceEngine( - val dir: String, - val serialization: Serialization) + val dir: String, + val serialization: Serialization) extends PersistenceEngine with Logging { new File(dir).mkdir() @@ -57,11 +57,11 @@ private[spark] class FileSystemPersistenceEngine( } override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { - val sortedFiles = new File(dir).listFiles().sortBy(_.getName()) - val appFiles = sortedFiles.filter(_.getName().startsWith("app_")) - val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_)) - val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_")) - val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_)) + val sortedFiles = new File(dir).listFiles().sortBy(_.getName) + val appFiles = sortedFiles.filter(_.getName.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) (apps, workers) } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala new file mode 100644 index 0000000000..c44a23f8c6 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -0,0 +1,28 @@ +package 
org.apache.spark.deploy.master + +import akka.actor.{Actor, ActorRef} + +import org.apache.spark.deploy.master.MasterMessages.ElectedLeader + +/** + * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it + * is the only Master serving requests. + * In addition to the API provided, the LeaderElectionAgent will use the following messages + * to inform the Master of leader changes: + * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]] + * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] + */ +trait LeaderElectionAgent extends Actor { + val masterActor: ActorRef +} + +/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */ +class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent { + override def preStart() { + masterActor ! ElectedLeader + } + + override def receive = { + case _ => + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index c6e039eed4..e13a8cba4a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -34,18 +34,18 @@ import akka.util.{Duration, Timeout} import org.apache.spark.{Logging, SparkException} import org.apache.spark.deploy.{ApplicationDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ -import org.apache.spark.deploy.master.MasterState.MasterState +import org.apache.spark.deploy.master.MasterMessages._ import org.apache.spark.deploy.master.ui.MasterWebUI import org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{AkkaUtils, Utils} - private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs val WORKER_TIMEOUT = 
System.getProperty("spark.worker.timeout", "60").toLong * 1000 val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "") + val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE") var nextAppNumber = 0 val workers = new HashSet[WorkerInfo] @@ -76,75 +76,115 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (envVar != null) envVar else host } - var state: MasterState = _ + val masterUrl = "spark://" + host + ":" + port + var masterWebUiUrl: String = _ + + var state = MasterState.STANDBY var persistenceEngine: PersistenceEngine = _ + var leaderElectionAgent: ActorRef = _ + // As a temporary workaround before better ways of configuring memory, we allow users to set // a flag that will perform round-robin scheduling across the nodes (spreading out each app // among all the nodes) instead of trying to consolidate each app onto a small # of nodes. 
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean override def preStart() { - logInfo("Starting Spark master at spark://" + host + ":" + port) + logInfo("Starting Spark master at " + masterUrl) // Listen for remote client disconnection events, since they don't go through Akka's watch() context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) webUi.start() + masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut) masterMetricsSystem.registerSource(masterSource) masterMetricsSystem.start() applicationMetricsSystem.start() - persistenceEngine = - if (RECOVERY_DIR.isEmpty()) { - new BlackHolePersistenceEngine() - } else { + persistenceEngine = RECOVERY_MODE match { + case "ZOOKEEPER" => + logInfo("Persisting recovery state to ZooKeeper") + new ZooKeeperPersistenceEngine(SerializationExtension(context.system)) + case "FILESYSTEM" => logInfo("Persisting recovery state to directory: " + RECOVERY_DIR) new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system)) - } + case _ => + new BlackHolePersistenceEngine() + } - val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() - state = - if (storedApps.isEmpty && storedWorkers.isEmpty) { - MasterState.ALIVE - } else { - self ! 
BeginRecovery(storedApps, storedWorkers) - MasterState.RECOVERING - } + leaderElectionAgent = context.actorOf(Props( + RECOVERY_MODE match { + case "ZOOKEEPER" => + new ZooKeeperLeaderElectionAgent(self, masterUrl) + case _ => + new MonarchyLeaderAgent(self) + })) + } + + override def preRestart(reason: Throwable, message: Option[Any]) { + logError("Master actor restarted due to exception", reason) } override def postStop() { webUi.stop() masterMetricsSystem.stop() applicationMetricsSystem.stop() + persistenceEngine.close() + context.stop(leaderElectionAgent) } override def receive = { + case ElectedLeader => { + val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() + state = if (storedApps.isEmpty && storedWorkers.isEmpty) + MasterState.ALIVE + else + MasterState.RECOVERING + + logInfo("I have been elected leader! New state: " + state) + + if (state == MasterState.RECOVERING) { + beginRecovery(storedApps, storedWorkers) + context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } + } + } + + case RevokedLeadership => { + logError("Leadership has been revoked -- master shutting down.") + System.exit(0) + } + case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) - if (idToWorker.contains(id)) { + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else if (idToWorker.contains(id)) { sender ! RegisterWorkerFailed("Duplicate worker ID") } else { val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress) registerWorker(worker) context.watch(sender) // This doesn't work with remote actors but helps for testing persistenceEngine.addWorker(worker) - sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get) + sender ! 
RegisteredWorker(masterUrl, masterWebUiUrl) schedule() } } case RegisterApplication(description) => { - logInfo("Registering app " + description.name) - val app = createApplication(description, sender) - registerApplication(app) - logInfo("Registered app " + description.name + " with ID " + app.id) - context.watch(sender) // This doesn't work with remote actors but helps for testing - persistenceEngine.addApplication(app) - sender ! RegisteredApplication(app.id) - schedule() + if (state == MasterState.STANDBY) { + // ignore, don't send response + } else { + logInfo("Registering app " + description.name) + val app = createApplication(description, sender) + registerApplication(app) + logInfo("Registered app " + description.name + " with ID " + app.id) + context.watch(sender) // This doesn't work with remote actors but helps for testing + persistenceEngine.addApplication(app) + sender ! RegisteredApplication(app.id, masterUrl) + schedule() + } } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => { @@ -184,27 +224,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } } - case BeginRecovery(storedApps, storedWorkers) => { - context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess) - - val masterUrl = "spark://" + host + ":" + port - val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get - for (app <- storedApps) { - registerApplication(app) - app.state = ApplicationState.UNKNOWN - app.driver ! MasterChanged(masterUrl, masterWebUiUrl) - } - for (worker <- storedWorkers) { - registerWorker(worker) - worker.state = WorkerState.UNKNOWN - worker.actor ! 
MasterChanged(masterUrl, masterWebUiUrl) - } - } - case MasterChangeAcknowledged(appId) => { - val appOption = idToApp.get(appId) - appOption match { + idToApp.get(appId) match { case Some(app) => + logInfo("Application has been re-registered: " + appId) app.state = ApplicationState.WAITING case None => logWarning("Master change ack from unknown app: " + appId) @@ -216,9 +239,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case WorkerSchedulerStateResponse(workerId, executors) => { idToWorker.get(workerId) match { case Some(worker) => + logInfo("Worker has been re-registered: " + workerId) worker.state = WorkerState.ALIVE - val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None) + val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined) for (exec <- validExecutors) { val app = idToApp.get(exec.appId).get val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId)) @@ -232,10 +256,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (canCompleteRecovery) { completeRecovery() } } - case EndRecoveryProcess => { - completeRecovery() - } - case Terminated(actor) => { // The disconnected actor could've been either a worker or an app; remove whichever of // those we have an entry for in the corresponding actor hashmap @@ -275,15 +295,29 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act workers.count(_.state == WorkerState.UNKNOWN) == 0 && apps.count(_.state == ApplicationState.UNKNOWN) == 0 + def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) { + for (app <- storedApps) { + registerApplication(app) + app.state = ApplicationState.UNKNOWN + app.driver ! MasterChanged(masterUrl, masterWebUiUrl) + } + for (worker <- storedWorkers) { + registerWorker(worker) + worker.state = WorkerState.UNKNOWN + worker.actor ! 
MasterChanged(masterUrl, masterWebUiUrl) + } + } + def completeRecovery() { + // Ensure "only-once" recovery semantics using a short synchronization period. synchronized { if (state != MasterState.RECOVERING) { return } state = MasterState.COMPLETING_RECOVERY } // Kill off any workers and apps that didn't respond to us. - workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_)) - apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_)) + workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) + apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) state = MasterState.ALIVE schedule() @@ -352,7 +386,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) - worker.actor ! LaunchExecutor( + worker.actor ! LaunchExecutor(masterUrl, exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome) exec.application.driver ! ExecutorAdded( exec.id, worker.id, worker.hostPort, exec.cores, exec.memory) @@ -415,6 +449,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act if (firstApp == None) { firstApp = Some(app) } + // TODO: What is firstApp?? Can we remove it? val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) { logWarning("Could not find any workers with enough memory for " + firstApp.get.id) @@ -444,7 +479,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act waitingApps -= app for (exec <- app.executors.values) { exec.worker.removeExecutor(exec) - exec.worker.actor ! KillExecutor(exec.application.id, exec.id) + exec.worker.actor ! 
KillExecutor(masterUrl, exec.application.id, exec.id) exec.state = ExecutorState.KILLED } app.markFinished(state) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala new file mode 100644 index 0000000000..6e31d40b43 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala @@ -0,0 +1,29 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.util.Utils + +sealed trait MasterMessages extends Serializable + +/** Contains messages seen only by the Master and its associated entities. */ +private[master] object MasterMessages { + + // LeaderElectionAgent to Master + + case object ElectedLeader + + case object RevokedLeadership + + // Actor System to LeaderElectionAgent + + case object CheckLeader + + // Actor System to Master + + case object CheckForWorkerTimeOut + + case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) + + case object RequestWebUIPort + + case class WebUIPortResponse(webUIBoundPort: Int) +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala index 9ea5e9752e..eec3df3b7a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala @@ -18,9 +18,9 @@ package org.apache.spark.deploy.master private[spark] object MasterState - extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { + extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { type MasterState = Value - val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value + val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala 
index 07d23c6bf3..8c4878bd30 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala @@ -36,9 +36,11 @@ trait PersistenceEngine { /** * Returns the persisted data sorted by their respective ids (which implies that they're - * sorted by time order of creation). + * sorted by time of creation). */ def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) + + def close() {} } class BlackHolePersistenceEngine extends PersistenceEngine { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala new file mode 100644 index 0000000000..5492a3a988 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -0,0 +1,183 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ +import scala.concurrent.ops._ + +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.data.Stat +import org.apache.zookeeper.Watcher.Event.KeeperState + +/** + * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry + * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be + * created. If ZooKeeper remains down after several retries, the given + * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be + * informed via zkDown(). + * + * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many + * times or a semantic exception is thrown (e.g., "node already exists"). 
+ */ +class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { + val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "") + + val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE + val ZK_TIMEOUT_MILLIS = 30000 + val RETRY_WAIT_MILLIS = 5000 + val ZK_CHECK_PERIOD_MILLIS = 10000 + val MAX_RECONNECT_ATTEMPTS = 3 + + private var zk: ZooKeeper = _ + + private val watcher = new ZooKeeperWatcher() + private var reconnectAttempts = 0 + private var closed = false + + /** Connect to ZooKeeper to start the session. Must be called before anything else. */ + def connect() { + connectToZooKeeper() + spawn(sessionMonitorThread) + } + + def sessionMonitorThread = { + while (!closed) { + Thread.sleep(ZK_CHECK_PERIOD_MILLIS) + if (zk.getState != ZooKeeper.States.CONNECTED) { + reconnectAttempts += 1 + val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts + if (attemptsLeft <= 0) { + logError("Could not connect to ZooKeeper: system failure") + zkWatcher.zkDown() + close() + } else { + logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...") + connectToZooKeeper() + } + } + } + } + + def close() { + if (!closed && zk != null) { zk.close() } + closed = true + } + + private def connectToZooKeeper() { + if (zk != null) zk.close() + zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher) + } + + /** + * Attempts to maintain a live ZooKeeper session despite (very) transient failures. + * Mainly useful for handling the natural ZooKeeper session expiration. 
+ */ + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (closed) { return } + + event.getState match { + case KeeperState.SyncConnected => + reconnectAttempts = 0 + zkWatcher.zkSessionCreated() + case KeeperState.Expired => + connectToZooKeeper() + case KeeperState.Disconnected => + logWarning("ZooKeeper disconnected, will retry...") + } + } + } + + def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = { + retry { + zk.create(path, bytes, ZK_ACL, createMode) + } + } + + def exists(path: String, watcher: Watcher = null): Stat = { + retry { + zk.exists(path, watcher) + } + } + + def getChildren(path: String, watcher: Watcher = null): List[String] = { + retry { + zk.getChildren(path, watcher).toList + } + } + + def getData(path: String): Array[Byte] = { + retry { + zk.getData(path, false, null) + } + } + + def delete(path: String, version: Int = -1): Unit = { + retry { + zk.delete(path, version) + } + } + + /** + * Creates the given directory (non-recursively) if it doesn't exist. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdir(path: String) { + if (exists(path) == null) { + try { + create(path, "".getBytes, CreateMode.PERSISTENT) + } catch { + case e: Exception => + // If the exception caused the directory not to be created, bubble it up, + // otherwise ignore it. + if (exists(path) == null) { throw e } + } + } + } + + /** + * Recursively creates all directories up to the given one. + * All znodes are created in PERSISTENT mode with no data. + */ + def mkdirRecursive(path: String) { + var fullDir = "" + for (dentry <- path.split("/").tail) { + fullDir += "/" + dentry + mkdir(fullDir) + } + } + + /** + * Retries the given function up to 3 times. The assumption is that failure is transient, + * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist), + * in which case the exception will be thrown without retries. 
+ * + * @param fn Block to execute, possibly multiple times. + */ + def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = { + try { + fn + } catch { + case e: KeeperException.NoNodeException => throw e + case e: KeeperException.NodeExistsException => throw e + case e if n > 0 => + logError("ZooKeeper exception, " + n + " more retries...", e) + Thread.sleep(RETRY_WAIT_MILLIS) + retry(fn)(n-1) + } + } +} + +trait SparkZooKeeperWatcher { + /** + * Called whenever a ZK session is created -- + * this will occur when we create our first session as well as each time + * the session expires or errors out. + */ + def zkSessionCreated() + + /** + * Called if ZK appears to be completely down (i.e., not just a transient error). + * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead. + */ + def zkDown() +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 2ab7bb233c..26090c6a9c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -22,14 +22,14 @@ import scala.collection.mutable import org.apache.spark.util.Utils private[spark] class WorkerInfo( - val id: String, - val host: String, - val port: Int, - val cores: Int, - val memory: Int, - val actor: ActorRef, - val webUiPort: Int, - val publicAddress: String) + val id: String, + val host: String, + val port: Int, + val cores: Int, + val memory: Int, + val actor: ActorRef, + val webUiPort: Int, + val publicAddress: String) extends Serializable { Utils.checkHost(host, "Expected hostname") @@ -42,18 +42,18 @@ private[spark] class WorkerInfo( @transient var lastHeartbeat: Long = _ - init() + init def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - 
init() + init } - private def init() { - executors = new mutable.HashMap[String, ExecutorInfo] + private def init = { + executors = new mutable.HashMap state = WorkerState.ALIVE coresUsed = 0 memoryUsed = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala new file mode 100644 index 0000000000..4ca59e5b24 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -0,0 +1,109 @@ +package org.apache.spark.deploy.master + +import scala.collection.JavaConversions._ + +import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership} +import org.apache.spark.Logging +import org.apache.zookeeper._ +import org.apache.zookeeper.Watcher.Event.EventType + +import akka.actor.{Cancellable, ActorRef} +import akka.util.duration._ + +class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) + extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { + + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" + + private val watcher = new ZooKeeperWatcher() + private val zk = new SparkZooKeeperSession(this) + private var status = LeadershipStatus.NOT_LEADER + private var myLeaderFile: String = _ + private var leaderUrl: String = _ + + override def preStart() { + logInfo("Starting ZooKeeper LeaderElection agent") + zk.connect() + } + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + myLeaderFile = + zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL) + self ! CheckLeader + } + + override def zkDown() { + logError("ZooKeeper down! 
LeaderElectionAgent shutting down Master.") + System.exit(1) + } + + override def postStop() { + zk.close() + } + + override def receive = { + case CheckLeader => checkLeader() + } + + private class ZooKeeperWatcher extends Watcher { + def process(event: WatchedEvent) { + if (event.getType == EventType.NodeDeleted) { + logInfo("Leader file disappeared, a master is down!") + self ! CheckLeader + } + } + } + + /** Uses ZK leader election. Navigates several ZK potholes along the way. */ + def checkLeader() { + val masters = zk.getChildren(WORKING_DIR).toList + val leader = masters.sorted.get(0) + val leaderFile = WORKING_DIR + "/" + leader + + // Setup a watch for the current leader. + zk.exists(leaderFile, watcher) + + try { + leaderUrl = new String(zk.getData(leaderFile)) + } catch { + // A NoNodeException may be thrown if old leader died since the start of this method call. + // This is fine -- just check again, since we're guaranteed to see the new values. + case e: KeeperException.NoNodeException => + logInfo("Leader disappeared while reading it -- finding next leader") + checkLeader() + return + } + + val isLeader = myLeaderFile == leaderFile + if (!isLeader && leaderUrl == masterUrl) { + // We found a different master file pointing to this process. + // This can happen in the following two cases: + // (1) The master process was restarted on the same node. + // (2) The ZK server died between creating the node and returning the name of the node. + // For this case, we will end up creating a second file, and MUST explicitly delete the + // first one, since our ZK session is still open. + // Note that this deletion will cause a NodeDeleted event to be fired so we check again for + // leader changes. 
+ logWarning("Cleaning up old ZK master election file that points to this master.") + zk.delete(leaderFile) + } else { + updateLeadershipStatus(isLeader) + } + } + + def updateLeadershipStatus(isLeader: Boolean) { + if (isLeader && status == LeadershipStatus.NOT_LEADER) { + status = LeadershipStatus.LEADER + masterActor ! ElectedLeader + } else if (!isLeader && status == LeadershipStatus.LEADER) { + status = LeadershipStatus.NOT_LEADER + masterActor ! RevokedLeadership + } + } + + private object LeadershipStatus extends Enumeration { + type LeadershipStatus = Value + val LEADER, NOT_LEADER = Value + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala new file mode 100644 index 0000000000..f45b62cbdd --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -0,0 +1,64 @@ +package org.apache.spark.deploy.master + +import org.apache.spark.Logging +import org.apache.zookeeper._ + +import akka.serialization.Serialization + +class ZooKeeperPersistenceEngine(serialization: Serialization) extends PersistenceEngine with SparkZooKeeperWatcher with Logging { + val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status" + + val zk = new SparkZooKeeperSession(this) + + zk.connect() + + override def zkSessionCreated() { + zk.mkdirRecursive(WORKING_DIR) + } + + override def zkDown() { + logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.") + } + + override def addApplication(app: ApplicationInfo) { + serializeIntoFile(WORKING_DIR + "/app_" + app.id, app) + } + + override def removeApplication(app: ApplicationInfo) { + zk.delete(WORKING_DIR + "/app_" + app.id) + } + + override def addWorker(worker: WorkerInfo) { + serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker) + } + + override def removeWorker(worker: WorkerInfo) { + 
zk.delete(WORKING_DIR + "/worker_" + worker.id) + } + + override def close() { + zk.close() + } + + override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = { + val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted + val appFiles = sortedFiles.filter(_.startsWith("app_")) + val apps = appFiles.map(deserializeFromFile[ApplicationInfo]) + val workerFiles = sortedFiles.filter(_.startsWith("worker_")) + val workers = workerFiles.map(deserializeFromFile[WorkerInfo]) + (apps, workers) + } + + private def serializeIntoFile(path: String, value: Serializable) { + val serializer = serialization.findSerializerFor(value) + val serialized = serializer.toBinary(value) + zk.create(path, serialized, CreateMode.PERSISTENT) + } + + def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = { /* filename is a child name relative to WORKING_DIR, as returned by zk.getChildren(WORKING_DIR) */ + val fileData = zk.getData(WORKING_DIR + "/" + filename) /* read from WORKING_DIR (not a hard-coded "/spark/master_status") so a custom spark.deploy.zookeeper.dir is honoured */ + val clazz = m.erasure.asInstanceOf[Class[T]] + val serializer = serialization.serializerFor(clazz) + serializer.fromBinary(fileData).asInstanceOf[T] + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 46455aa5ae..73fb0c8bd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -23,11 +23,11 @@ import java.io.File import scala.collection.mutable.HashMap -import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated} +import akka.actor._ import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.util.duration._ -import org.apache.spark.{SparkEnv, Logging} +import org.apache.spark.Logging import org.apache.spark.deploy.{ExecutorDescription, ExecutorState} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.Master @@ -35,14 +35,13 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI import 
org.apache.spark.metrics.MetricsSystem import org.apache.spark.util.{Utils, AkkaUtils} - private[spark] class Worker( host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - var masterUrl: String, + masterUrls: Array[String], workDirPath: String = null) extends Actor with Logging { @@ -54,8 +53,16 @@ private[spark] class Worker( // Send a heartbeat every (heartbeat timeout) / 4 milliseconds val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4 + val REGISTRATION_TIMEOUT = 20.seconds + val REGISTRATION_RETRIES = 3 + + // Index into masterUrls that we're currently trying to register with. + var masterIndex = 0 + var master: ActorRef = null - var masterWebUiUrl : String = "" + var activeMasterUrl: String = "" + var activeMasterWebUiUrl : String = "" + var registered = false val workerId = generateWorkerId() var sparkHome: File = null var workDir: File = null @@ -103,35 +110,62 @@ private[spark] class Worker( webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi.start() - connectToMaster() + registerWithMaster() metricsSystem.registerSource(workerSource) metricsSystem.start() } - def connectToMaster() { - logInfo("Connecting to master " + masterUrl) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress) + def changeMaster(url: String, uiUrl: String) { + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.watch(master) // Doesn't work with remote actors, but useful for testing } + def tryRegisterAllMasters() { + for (masterUrl <- masterUrls) { + logInfo("Connecting to master " + masterUrl + "...") + val actor = context.actorFor(Master.toAkkaUrl(masterUrl)) + actor ! 
RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, + publicAddress) + } + } + + def registerWithMaster() { + tryRegisterAllMasters() + + var retries = 0 + lazy val retryTimer: Cancellable = + context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { + retries += 1 + if (registered) { + retryTimer.cancel() + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + System.exit(1) + } else { + tryRegisterAllMasters() + } + } + retryTimer // start timer + } + override def receive = { - case RegisteredWorker(url) => - masterWebUiUrl = url - logInfo("Successfully registered with master") + case RegisteredWorker(masterUrl, masterWebUiUrl) => + logInfo("Successfully registered with master " + masterUrl) + registered = true + changeMaster(masterUrl, masterWebUiUrl) context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { master ! Heartbeat(workerId) } - case MasterChanged(url, uiUrl) => - logInfo("Master has changed, new master is at " + url) - masterUrl = url - masterWebUiUrl = uiUrl + case MasterChanged(masterUrl, masterWebUiUrl) => + logInfo("Master has changed, new master is at " + masterUrl) context.unwatch(master) - master = context.actorFor(Master.toAkkaUrl(masterUrl)) - context.watch(master) // Doesn't work with remote actors, but useful for testing + changeMaster(masterUrl, masterWebUiUrl) + val execs = executors.values. map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state)) sender ! 
WorkerSchedulerStateResponse(workerId, execs.toList) @@ -140,15 +174,19 @@ private[spark] class Worker( logError("Worker registration failed: " + message) System.exit(1) - case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) => - logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) - val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, - self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) - executors(appId + "/" + execId) = manager - manager.start() - coresUsed += cores_ - memoryUsed += memory_ - master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) => + if (masterUrl != activeMasterUrl) { + logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") + } else { + logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) + val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, + self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING) + executors(appId + "/" + execId) = manager + manager.start() + coresUsed += cores_ + memoryUsed += memory_ + master ! ExecutorStateChanged(appId, execId, manager.state, None, None) + } case ExecutorStateChanged(appId, execId, state, message, exitStatus) => master ! 
ExecutorStateChanged(appId, execId, state, message, exitStatus) @@ -164,14 +202,18 @@ private[spark] class Worker( memoryUsed -= executor.memory } - case KillExecutor(appId, execId) => - val fullId = appId + "/" + execId - executors.get(fullId) match { - case Some(executor) => - logInfo("Asked to kill executor " + fullId) - executor.kill() - case None => - logInfo("Asked to kill unknown executor " + fullId) + case KillExecutor(masterUrl, appId, execId) => + if (masterUrl != activeMasterUrl) { /* ignore kill requests from any master other than the one recorded in activeMasterUrl */ + logWarning("Invalid Master (" + masterUrl + ") attempted to kill executor " + execId) + } else { + val fullId = appId + "/" + execId + executors.get(fullId) match { + case Some(executor) => + logInfo("Asked to kill executor " + fullId) + executor.kill() + case None => + logInfo("Asked to kill unknown executor " + fullId) + } } case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => @@ -179,8 +221,8 @@ private[spark] class Worker( case RequestWorkerState => { sender ! 
WorkerStateResponse(host, port, workerId, executors.values.toList, - finishedExecutors.values.toList, masterUrl, cores, memory, - coresUsed, memoryUsed, masterWebUiUrl) + finishedExecutors.values.toList, activeMasterUrl, cores, memory, + coresUsed, memoryUsed, activeMasterWebUiUrl) } } @@ -203,17 +245,18 @@ private[spark] object Worker { def main(argStrings: Array[String]) { val args = new WorkerArguments(argStrings) val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores, - args.memory, args.master, args.workDir) + args.memory, args.masters, args.workDir) actorSystem.awaitTermination() } def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int, - masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = { + masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None) + : (ActorSystem, Int) = { // The LocalSparkCluster runs multiple local sparkWorkerX actor systems val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("") val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port) val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory, - masterUrl, workDir)), name = "Worker") + masterUrls, workDir)), name = "Worker") (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala index 0ae89a864f..16d8686490 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala @@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) { var webUiPort = 8081 var cores = inferDefaultCores() var memory = inferDefaultMemory() - var master: String = null + var masters: Array[String] = null var workDir: String = null // Check for settings in 
environment variables @@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) { printUsageAndExit(0) case value :: tail => - if (master != null) { // Two positional arguments were given + if (masters != null) { // Two positional arguments were given printUsageAndExit(1) } - master = value + masters = value.split(",") parse(tail) case Nil => - if (master == null) { // No positional argument was given + if (masters == null) { // No positional argument was given printUsageAndExit(1) } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala index 95d6007f3b..800f1cafcc 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala @@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node> - val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p> + val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p> val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index c173cdf449..8a3017e964 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -26,7 +26,7 @@ import org.apache.spark.util.Utils private[spark] class SparkDeploySchedulerBackend( scheduler: ClusterScheduler, sc: SparkContext, - master: String, + masters: Array[String], appName: String) extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) with 
ClientListener @@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend( val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, "http://" + sc.ui.appUIAddress) - client = new Client(sc.env.actorSystem, master, appDesc, this) + client = new Client(sc.env.actorSystem, masters, appDesc, this) client.start() } @@ -75,6 +75,13 @@ private[spark] class SparkDeploySchedulerBackend( } } + override def dead() { + if (!stopping) { + logError("Spark cluster looks dead, giving up.") + scheduler.error("Spark cluster looks down") + } + } + override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) @@ -345,6 +345,17 @@ <scope>test</scope> </dependency> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>3.4.5</version> + <exclusions> + <exclusion> + <groupId>org.jboss.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 99cdadb9e7..156f501a04 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -211,6 +211,7 @@ object SparkBuild extends Build { "net.java.dev.jets3t" % "jets3t" % "0.7.1", "org.apache.avro" % "avro" % "1.7.4", "org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty), + "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty), "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", |