Diffstat (limited to 'core/src/main/scala/org/apache/spark/deploy/master/Master.scala')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 232
1 file changed, 199 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index cb0fe6a850..26f980760d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -23,23 +23,25 @@ import java.text.SimpleDateFormat
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Await
 import scala.concurrent.duration._
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+import scala.concurrent.ExecutionContext.Implicits.global
 
 import akka.actor._
 import akka.pattern.ask
 import akka.remote._
+import akka.util.Timeout
 
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.util.Timeout
 import org.apache.spark.deploy.DeployMessages.RegisterWorkerFailed
 import org.apache.spark.deploy.DeployMessages.KillExecutor
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
 import scala.Some
-import org.apache.spark.deploy.DeployMessages.WebUIPortResponse
 import org.apache.spark.deploy.DeployMessages.LaunchExecutor
 import org.apache.spark.deploy.DeployMessages.RegisteredApplication
 import org.apache.spark.deploy.DeployMessages.RegisterWorker
@@ -51,6 +53,8 @@ import org.apache.spark.deploy.DeployMessages.ApplicationRemoved
 import org.apache.spark.deploy.DeployMessages.Heartbeat
 import org.apache.spark.deploy.DeployMessages.RegisteredWorker
 import akka.actor.Terminated
+import akka.serialization.SerializationExtension
+import java.util.concurrent.TimeUnit
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -58,7 +62,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
   val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
   val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
-
+  val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
+  val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
   val idToWorker = new HashMap[String, WorkerInfo]
@@ -88,52 +94,115 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     if (envVar != null) envVar else host
   }
 
+  val masterUrl = "spark://" + host + ":" + port
+  var masterWebUiUrl: String = _
+
+  var state = RecoveryState.STANDBY
+
+  var persistenceEngine: PersistenceEngine = _
+
+  var leaderElectionAgent: ActorRef = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
   val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
 
   override def preStart() {
-    logInfo("Starting Spark master at spark://" + host + ":" + port)
+    logInfo("Starting Spark master at " + masterUrl)
     // Listen for remote client disconnection events, since they don't go through Akka's watch()
     context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
     webUi.start()
-    import context.dispatcher
+    masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
 
     masterMetricsSystem.registerSource(masterSource)
     masterMetricsSystem.start()
     applicationMetricsSystem.start()
+
+    persistenceEngine = RECOVERY_MODE match {
+      case "ZOOKEEPER" =>
+        logInfo("Persisting recovery state to ZooKeeper")
+        new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+      case "FILESYSTEM" =>
+        logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+        new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+      case _ =>
+        new BlackHolePersistenceEngine()
+    }
+
+    leaderElectionAgent = RECOVERY_MODE match {
+      case "ZOOKEEPER" =>
+        context.actorOf(Props(classOf[ZooKeeperLeaderElectionAgent], self, masterUrl))
+      case _ =>
+        context.actorOf(Props(classOf[MonarchyLeaderAgent], self))
+    }
+  }
+
+  override def preRestart(reason: Throwable, message: Option[Any]) {
+    super.preRestart(reason, message) // calls postStop()!
+    logError("Master actor restarted due to exception", reason)
   }
 
   override def postStop() {
     webUi.stop()
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
+    persistenceEngine.close()
+    context.stop(leaderElectionAgent)
   }
 
   override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+    case ElectedLeader => {
+      val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+      state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+        RecoveryState.ALIVE
+      else
+        RecoveryState.RECOVERING
+
+      logInfo("I have been elected leader! New state: " + state)
+
+      if (state == RecoveryState.RECOVERING) {
+        beginRecovery(storedApps, storedWorkers)
+        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+      }
+    }
+
+    case RevokedLeadership => {
+      logError("Leadership has been revoked -- master shutting down.")
+      System.exit(0)
+    }
+
+    case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.megabytesToString(memory)))
-      if (idToWorker.contains(id)) {
+      if (state == RecoveryState.STANDBY) {
+        // ignore, don't send response
+      } else if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+        val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+        registerWorker(worker)
         context.watch(sender) // This doesn't work with remote actors but helps for testing
-        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+        persistenceEngine.addWorker(worker)
+        sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
         schedule()
       }
     }
 
     case RegisterApplication(description) => {
-      logInfo("Registering app " + description.name)
-      val app = addApplication(description, sender)
-      logInfo("Registered app " + description.name + " with ID " + app.id)
-      waitingApps += app
-      context.watch(sender) // This doesn't work with remote actors but helps for testing
-      sender ! RegisteredApplication(app.id)
-      schedule()
+      if (state == RecoveryState.STANDBY) {
+        // ignore, don't send response
+      } else {
+        logInfo("Registering app " + description.name)
+        val app = createApplication(description, sender)
+        registerApplication(app)
+        logInfo("Registered app " + description.name + " with ID " + app.id)
+        context.watch(sender) // This doesn't work with remote actors but helps for testing
+        persistenceEngine.addApplication(app)
+        sender ! RegisteredApplication(app.id, masterUrl)
+        schedule()
+      }
     }
 
     case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
@@ -173,27 +242,63 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       }
     }
 
+    case MasterChangeAcknowledged(appId) => {
+      idToApp.get(appId) match {
+        case Some(app) =>
+          logInfo("Application has been re-registered: " + appId)
+          app.state = ApplicationState.WAITING
+        case None =>
+          logWarning("Master change ack from unknown app: " + appId)
+      }
+
+      if (canCompleteRecovery) { completeRecovery() }
+    }
+
+    case WorkerSchedulerStateResponse(workerId, executors) => {
+      idToWorker.get(workerId) match {
+        case Some(worker) =>
+          logInfo("Worker has been re-registered: " + workerId)
+          worker.state = WorkerState.ALIVE
+
+          val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
+          for (exec <- validExecutors) {
+            val app = idToApp.get(exec.appId).get
+            val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
+            worker.addExecutor(execInfo)
+            execInfo.copyState(exec)
+          }
+        case None =>
+          logWarning("Scheduler state from unknown worker: " + workerId)
+      }
+
+      if (canCompleteRecovery) { completeRecovery() }
+    }
+
     case Terminated(actor) => {
       // The disconnected actor could've been either a worker or an app; remove whichever of
       // those we have an entry for in the corresponding actor hashmap
       actorToWorker.get(actor).foreach(removeWorker)
       actorToApp.get(actor).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
    }
 
     case DisassociatedEvent(_, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case AssociationErrorEvent(_, _, address, _) => {
       // The disconnected client could've been either a worker or an app; remove whichever it was
       addressToWorker.get(address).foreach(removeWorker)
       addressToApp.get(address).foreach(finishApplication)
+      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+        state)
     }
 
     case CheckForWorkerTimeOut => {
@@ -205,6 +310,50 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
+  def canCompleteRecovery =
+    workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
+      apps.count(_.state == ApplicationState.UNKNOWN) == 0
+
+  def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+    for (app <- storedApps) {
+      logInfo("Trying to recover app: " + app.id)
+      try {
+        registerApplication(app)
+        app.state = ApplicationState.UNKNOWN
+        app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+      } catch {
+        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
+      }
+    }
+
+    for (worker <- storedWorkers) {
+      logInfo("Trying to recover worker: " + worker.id)
+      try {
+        registerWorker(worker)
+        worker.state = WorkerState.UNKNOWN
+        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+      } catch {
+        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
+      }
+    }
+  }
+
+  def completeRecovery() {
+    // Ensure "only-once" recovery semantics using a short synchronization period.
+    synchronized {
+      if (state != RecoveryState.RECOVERING) { return }
+      state = RecoveryState.COMPLETING_RECOVERY
+    }
+
+    // Kill off any workers and apps that didn't respond to us.
+    workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
+    apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
+
+    state = RecoveryState.ALIVE
+    schedule()
+    logInfo("Recovery complete - resuming operations!")
+  }
+
   /**
    * Can an app use the given worker? True if the worker has enough memory and we haven't already
    * launched an executor for the app on it (right now the standalone backend doesn't like having
@@ -219,6 +368,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    * every time a new app joins or resource availability changes.
    */
   def schedule() {
+    if (state != RecoveryState.ALIVE) { return }
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
     if (spreadOutApps) {
@@ -266,14 +416,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
-    worker.actor ! LaunchExecutor(
+    worker.actor ! LaunchExecutor(masterUrl,
       exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
     exec.application.driver ! ExecutorAdded(
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }
 
-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
-    publicAddress: String): WorkerInfo = {
+  def registerWorker(worker: WorkerInfo): Unit = {
     // There may be one or more refs to dead workers on this same node (w/ different ID's),
     // remove them.
     workers.filter { w =>
@@ -281,12 +430,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }.foreach { w =>
       workers -= w
     }
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+
+    val workerAddress = worker.actor.path.address
+    if (addressToWorker.contains(workerAddress)) {
+      logInfo("Attempted to re-register worker at same address: " + workerAddress)
+      return
+    }
+
     workers += worker
     idToWorker(worker.id) = worker
-    actorToWorker(sender) = worker
-    addressToWorker(sender.path.address) = worker
-    worker
+    actorToWorker(worker.actor) = worker
+    addressToWorker(workerAddress) = worker
   }
 
   def removeWorker(worker: WorkerInfo) {
@@ -301,25 +455,36 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         exec.id, ExecutorState.LOST, Some("worker lost"), None)
       exec.application.removeExecutor(exec)
     }
+    persistenceEngine.removeWorker(worker)
   }
 
-  def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+  def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
     val now = System.currentTimeMillis()
     val date = new Date(now)
-    val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+  }
+
+  def registerApplication(app: ApplicationInfo): Unit = {
+    val appAddress = app.driver.path.address
+    if (addressToWorker.contains(appAddress)) {
+      logInfo("Attempted to re-register application at same address: " + appAddress)
+      return
+    }
+
     applicationMetricsSystem.registerSource(app.appSource)
     apps += app
     idToApp(app.id) = app
-    actorToApp(driver) = app
-    addressToApp(driver.path.address) = app
+    actorToApp(app.driver) = app
+    addressToApp(appAddress) = app
     if (firstApp == None) {
       firstApp = Some(app)
    }
+    // TODO: What is firstApp?? Can we remove it?
     val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
-    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+    if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
      logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
     }
-    app
+    waitingApps += app
  }
 
   def finishApplication(app: ApplicationInfo) {
@@ -344,13 +509,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
       waitingApps -= app
       for (exec <- app.executors.values) {
         exec.worker.removeExecutor(exec)
-        exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+        exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
         exec.state = ExecutorState.KILLED
       }
       app.markFinished(state)
       if (state != ApplicationState.FINISHED) {
         app.driver ! ApplicationRemoved(state.toString)
       }
+      persistenceEngine.removeApplication(app)
       schedule()
     }
  }
@@ -404,8 +570,8 @@ private[spark] object Master {
   def startSystemAndActor(host: String, port: Int, webUiPort: Int): (ActorSystem, Int, Int) = {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
     val actor = actorSystem.actorOf(Props(classOf[Master], host, boundPort, webUiPort), name = actorName)
-    val timeoutDuration = Duration.create(
-      System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
+    val timeoutDuration : FiniteDuration = Duration.create(
+      System.getProperty("spark.akka.askTimeout", "10").toLong, TimeUnit.SECONDS)
     implicit val timeout = Timeout(timeoutDuration)
     val respFuture = actor ? RequestWebUIPort // ask pattern
     val resp = Await.result(respFuture, timeoutDuration).asInstanceOf[WebUIPortResponse]
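
The new persistence and election wiring is driven entirely by system properties read in preStart(). A minimal sketch of how a deployment might turn on FILESYSTEM recovery before starting the master; the property names come from the patch above, while the object name and directory path are invented for illustration:

// Sketch: enabling FILESYSTEM recovery before the Master starts.
// Property names are taken from the patch; the path is only an example.
object RecoveryConfigSketch {
  def main(args: Array[String]) {
    System.setProperty("spark.deploy.recoveryMode", "FILESYSTEM")
    System.setProperty("spark.deploy.recoveryDirectory", "/var/spark/recovery")
    // preStart() will now build a FileSystemPersistenceEngine; "ZOOKEEPER"
    // selects ZooKeeperPersistenceEngine plus ZooKeeperLeaderElectionAgent,
    // and any other value falls back to BlackHolePersistenceEngine with the
    // single-master MonarchyLeaderAgent.
  }
}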
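completeRecovery() can be reached from the scheduled WORKER_TIMEOUT callback as well as from every handler that consults canCompleteRecovery, so the patch serializes the RECOVERING -> COMPLETING_RECOVERY transition in a synchronized block and performs the cleanup outside it. A standalone sketch of that check-and-set idiom, with all names invented:

// Standalone sketch of the "only-once" transition idiom used by
// completeRecovery(); class and method names here are invented.
class OnceGuard {
  private[this] var fired = false

  // Runs `body` at most once even under concurrent calls: the flag flip
  // happens inside the lock, the (possibly slow) body outside it, mirroring
  // how the patch removes unresponsive workers/apps after the state change.
  def runOnce(body: => Unit) {
    synchronized {
      if (fired) return
      fired = true
    }
    body
  }
}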
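One detail worth flagging in the new registerApplication: the duplicate-address guard tests addressToWorker, although the method goes on to populate addressToApp, so addressToApp looks like the intended map. A simplified sketch under that assumption, using stand-in types and invented names:

import scala.collection.mutable.HashMap

// Simplified stand-in for the Master's bookkeeping; only the map consulted
// by the guard differs from the patch (addressToApp instead of addressToWorker).
object AppRegistrationSketch {
  val addressToApp = new HashMap[String, String] // driver address -> app id

  def registerApplication(appAddress: String, appId: String): Boolean = {
    if (addressToApp.contains(appAddress)) {
      println("Attempted to re-register application at same address: " + appAddress)
      return false
    }
    addressToApp(appAddress) = appId
    true
  }
}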
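startSystemAndActor now builds the ask timeout as an explicit FiniteDuration via TimeUnit.SECONDS. A self-contained illustration of the same classic-Akka ask pattern; the actor, messages, and system name are invented:

// Minimal illustration of the ask pattern as used in startSystemAndActor
// (classic Akka 2.x). Actor and message names are invented.
import java.util.concurrent.TimeUnit
import scala.concurrent.Await
import scala.concurrent.duration.{Duration, FiniteDuration}
import akka.actor.{Actor, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout

case object Ping
case object Pong

class Ponger extends Actor {
  def receive = { case Ping => sender ! Pong }
}

object AskExample extends App {
  val system = ActorSystem("ask-example")
  val ponger = system.actorOf(Props[Ponger], name = "ponger")

  // Same construction as the patch: a FiniteDuration built via TimeUnit,
  // wrapped in an implicit Timeout picked up by the `?` operator.
  val timeoutDuration: FiniteDuration = Duration.create(10, TimeUnit.SECONDS)
  implicit val timeout = Timeout(timeoutDuration)

  val respFuture = ponger ? Ping // ask pattern: returns a Future[Any]
  val resp = Await.result(respFuture, timeoutDuration)
  println(resp)
  system.shutdown()
}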