8 files changed, 122 insertions, 52 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 31d1909279..979e65ac6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,6 +21,7 @@ import scala.collection.immutable.List
 
 import org.apache.spark.deploy.ExecutorState.ExecutorState
 import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.MasterState.MasterState
 import org.apache.spark.deploy.worker.ExecutorRunner
 import org.apache.spark.util.Utils
 
@@ -111,7 +112,8 @@ private[deploy] object DeployMessages {
   // Master to MasterWebUI
 
   case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
-    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+    activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+    status: MasterState) {
 
     Utils.checkHost(host, "Required hostname")
     assert (port > 0)
@@ -133,4 +135,7 @@ private[deploy] object DeployMessages {
     assert (port > 0)
   }
 
+  // Actor System to Worker
+
+  case object SendHeartbeat
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 87a703427c..f87b885286 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -71,7 +71,8 @@ private[spark] object JsonProtocol {
     ("memory" -> obj.workers.map(_.memory).sum) ~
     ("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
     ("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
-    ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+    ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+    ("status" -> obj.status.toString)
   }
 
   def writeWorkerState(obj: WorkerStateResponse) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index aa2a10a8ad..198d5cee7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -46,7 +46,8 @@ private[spark] class Client(
     listener: ClientListener)
   extends Logging {
 
-  val REGISTRATION_TIMEOUT = 60 * 1000
+  val REGISTRATION_TIMEOUT = 20.seconds
+  val REGISTRATION_RETRIES = 3
 
   var actor: ActorRef = null
   var appId: String = null
@@ -61,7 +62,7 @@ private[spark] class Client(
 
     override def preStart() {
       try {
-        connectToMaster()
+        registerWithMaster()
       } catch {
         case e: Exception =>
           logError("Failed to connect to master", e)
@@ -70,19 +71,31 @@ private[spark] class Client(
       }
     }
 
-    def connectToMaster() {
+    def tryRegisterAllMasters() {
       for (masterUrl <- masterUrls) {
         logInfo("Connecting to master " + masterUrl + "...")
         val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
         actor ! RegisterApplication(appDescription)
       }
+    }
 
-      context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) {
-        if (!registered) {
-          logError("All masters are unresponsive! Giving up.")
-          markDead()
+    def registerWithMaster() {
+      tryRegisterAllMasters()
+
+      var retries = 0
+      lazy val retryTimer: Cancellable =
+        context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+          retries += 1
+          if (registered) {
+            retryTimer.cancel()
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            markDead()
+          } else {
+            tryRegisterAllMasters()
+          }
         }
-      }
+      retryTimer // start timer
     }
 
     def changeMaster(url: String) {
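Note on the Client change above: registerWithMaster() replaces a single fire-and-forget timeout with up to REGISTRATION_RETRIES rounds of re-registration, and the lazy val is what lets the scheduled closure cancel its own Cancellable handle once it is done. Below is a minimal, self-contained sketch of that self-cancelling-timer pattern using java.util.concurrent in place of the Akka scheduler; registered, tryRegisterAllMasters(), and markDead() are stand-ins for the Client's members, not code from this patch.

import java.util.concurrent.{Executors, ScheduledFuture, TimeUnit}

object RetryRegistrationSketch {
  @volatile var registered = false
  val REGISTRATION_TIMEOUT_SECS = 20L
  val REGISTRATION_RETRIES = 3

  def tryRegisterAllMasters() { println("(re)sending RegisterApplication to every master URL") }
  def markDead() { println("All masters are unresponsive! Giving up.") }

  def registerWithMaster() {
    tryRegisterAllMasters()
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    var retries = 0
    // As in the patch, the lazy val gives the periodic task a handle to
    // itself so it can cancel once registration succeeds or retries run out.
    lazy val retryTimer: ScheduledFuture[_] = scheduler.scheduleAtFixedRate(new Runnable {
      def run() {
        retries += 1
        if (registered) {
          retryTimer.cancel(false)
          scheduler.shutdown()
        } else if (retries >= REGISTRATION_RETRIES) {
          markDead()
          retryTimer.cancel(false)
          scheduler.shutdown()
        } else {
          tryRegisterAllMasters()
        }
      }
    }, REGISTRATION_TIMEOUT_SECS, REGISTRATION_TIMEOUT_SECS, TimeUnit.SECONDS)
    retryTimer // force the lazy val so the timer actually starts
  }
}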
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e13a8cba4a..093ce09b1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -123,6 +123,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }
 
   override def preRestart(reason: Throwable, message: Option[Any]) {
+    super.preRestart(reason, message)  // calls postStop()!
     logError("Master actor restarted due to exception", reason)
   }
 
@@ -279,7 +280,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case RequestMasterState => {
-      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+      sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+        state)
     }
 
     case CheckForWorkerTimeOut => {
@@ -297,14 +299,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }
 
   def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
     for (app <- storedApps) {
-      registerApplication(app)
-      app.state = ApplicationState.UNKNOWN
-      app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+      logInfo("Trying to recover app: " + app.id)
+      try {
+        registerApplication(app)
+        app.state = ApplicationState.UNKNOWN
+        app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+      } catch {
+        case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
+      }
     }
+
     for (worker <- storedWorkers) {
-      registerWorker(worker)
-      worker.state = WorkerState.UNKNOWN
-      worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+      logInfo("Trying to recover worker: " + worker.id)
+      try {
+        registerWorker(worker)
+        worker.state = WorkerState.UNKNOWN
+        worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+      } catch {
+        case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
+      }
     }
   }
@@ -409,7 +422,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
 
     workers += worker
     idToWorker(worker.id) = worker
-    actorToWorker(sender) = worker
+    actorToWorker(worker.actor) = worker
     addressToWorker(workerAddress) = worker
   }
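The easy-to-miss fix in Master.preRestart above is the super call: Akka's default Actor.preRestart stops the actor's children and then invokes postStop(), so an override that only logs silently skips all of that cleanup on every restart. A hedged sketch of the general pattern (CleanOnRestart is an illustrative name, not a class in this patch):

import akka.actor.Actor

class CleanOnRestart extends Actor {
  def receive = {
    case msg => // handle messages here
  }

  // Normal-shutdown cleanup; Akka also reaches this via the default preRestart.
  override def postStop() {
    // release timers, connections, file handles, ...
  }

  override def preRestart(reason: Throwable, message: Option[Any]) {
    super.preRestart(reason, message) // disposes children, then runs postStop()
    // restart-specific logging goes here, as in Master above
  }
}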
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 08fe5334cf..74a9f8cd82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,6 +38,8 @@ private[master] object MasterMessages {
 
   case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
 
+  case object CompleteRecovery
+
   case object RequestWebUIPort
 
   case class WebUIPortResponse(webUIBoundPort: Int)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index f8e86d633f..065635af85 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -44,10 +44,18 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
   }
 
   override def zkSessionCreated() {
-    zk.mkdirRecursive(WORKING_DIR)
-    myLeaderFile =
-      zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
-    self ! CheckLeader
+    synchronized {
+      zk.mkdirRecursive(WORKING_DIR)
+      myLeaderFile =
+        zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
+      self ! CheckLeader
+    }
+  }
+
+  override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
+    logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
+    Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+    super.preRestart(reason, message)
   }
 
   override def zkDown() {
@@ -75,7 +83,7 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
   /** Uses ZK leader election. Navigates several ZK potholes along the way. */
   def checkLeader() {
     val masters = zk.getChildren(WORKING_DIR).toList
-    val leader = masters.sorted.get(0)
+    val leader = masters.sorted.head
     val leaderFile = WORKING_DIR + "/" + leader
 
     // Setup a watch for the current leader.
@@ -92,20 +100,25 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
       return
     }
 
-    val isLeader = myLeaderFile == leaderFile
-    if (!isLeader && leaderUrl == masterUrl) {
-      // We found a different master file pointing to this process.
-      // This can happen in the following two cases:
-      // (1) The master process was restarted on the same node.
-      // (2) The ZK server died between creating the node and returning the name of the node.
-      //     For this case, we will end up creating a second file, and MUST explicitly delete the
-      //     first one, since our ZK session is still open.
-      //     Note that this deletion will cause a NodeDeleted event to be fired so we check again for
-      //     leader changes.
-      logWarning("Cleaning up old ZK master election file that points to this master.")
-      zk.delete(leaderFile)
-    } else {
-      updateLeadershipStatus(isLeader)
+    // Synchronization used to ensure no interleaving between the creation of a new session and the
+    // checking of a leader, which could cause us to delete our real leader file erroneously.
+    synchronized {
+      val isLeader = myLeaderFile == leaderFile
+      if (!isLeader && leaderUrl == masterUrl) {
+        // We found a different master file pointing to this process.
+        // This can happen in the following two cases:
+        // (1) The master process was restarted on the same node.
+        // (2) The ZK server died between creating the node and returning the name of the node.
+        //     For this case, we will end up creating a second file, and MUST explicitly delete the
+        //     first one, since our ZK session is still open.
+        //     Note that this deletion will cause a NodeDeleted event to be fired so we check again for
+        //     leader changes.
+        assert(leaderFile < myLeaderFile)
+        logWarning("Cleaning up old ZK master election file that points to this master.")
+        zk.delete(leaderFile)
+      } else {
+        updateLeadershipStatus(isLeader)
+      }
     }
   }
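Two notes on checkLeader() above. First, masters.sorted.get(0) was Java List style; .head is the Scala equivalent. Second, the election rule (and the new assert(leaderFile < myLeaderFile)) relies on CreateMode.EPHEMERAL_SEQUENTIAL: ZooKeeper appends a zero-padded, monotonically increasing counter to each candidate's node name, so lexicographic order matches creation order and the lowest-numbered node wins. A tiny illustration with made-up node names:

// ZooKeeper names EPHEMERAL_SEQUENTIAL children like "master_0000000005",
// so sorting them lexicographically is sorting them by creation order.
val children = List("master_0000000007", "master_0000000005", "master_0000000006")
val leader = children.sorted.head                   // "master_0000000005"

// A candidate holding a later node is not the leader, which is exactly the
// myLeaderFile == leaderFile comparison in checkLeader().
val myLeaderFile = "/spark/leader_election/master_0000000006"
val isLeader = myLeaderFile.endsWith(leader)        // false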
- logWarning("Cleaning up old ZK master election file that points to this master.") - zk.delete(leaderFile) - } else { - updateLeadershipStatus(isLeader) + // Synchronization used to ensure no interleaving between the creation of a new session and the + // checking of a leader, which could cause us to delete our real leader file erroneously. + synchronized { + val isLeader = myLeaderFile == leaderFile + if (!isLeader && leaderUrl == masterUrl) { + // We found a different master file pointing to this process. + // This can happen in the following two cases: + // (1) The master process was restarted on the same node. + // (2) The ZK server died between creating the node and returning the name of the node. + // For this case, we will end up creating a second file, and MUST explicitly delete the + // first one, since our ZK session is still open. + // Note that this deletion will cause a NodeDeleted event to be fired so we check again for + // leader changes. + assert(leaderFile < myLeaderFile) + logWarning("Cleaning up old ZK master election file that points to this master.") + zk.delete(leaderFile) + } else { + updateLeadershipStatus(isLeader) + } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 73fb0c8bd8..25ba75619a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -59,10 +59,12 @@ private[spark] class Worker( // Index into masterUrls that we're currently trying to register with. var masterIndex = 0 + val masterLock: Object = new Object() var master: ActorRef = null var activeMasterUrl: String = "" var activeMasterWebUiUrl : String = "" - var registered = false + @volatile var registered = false + @volatile var connected = false val workerId = generateWorkerId() var sparkHome: File = null var workDir: File = null @@ -102,6 +104,7 @@ private[spark] class Worker( } override def preStart() { + assert(!registered) logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format( host, port, cores, Utils.megabytesToString(memory))) sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse(".")) @@ -117,11 +120,14 @@ private[spark] class Worker( } def changeMaster(url: String, uiUrl: String) { - activeMasterUrl = url - activeMasterWebUiUrl = uiUrl - master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) - context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) - context.watch(master) // Doesn't work with remote actors, but useful for testing + masterLock.synchronized { + activeMasterUrl = url + activeMasterWebUiUrl = uiUrl + master = context.actorFor(Master.toAkkaUrl(activeMasterUrl)) + context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) + context.watch(master) // Doesn't work with remote actors, but useful for testing + connected = true + } } def tryRegisterAllMasters() { @@ -157,8 +163,11 @@ private[spark] class Worker( logInfo("Successfully registered with master " + masterUrl) registered = true changeMaster(masterUrl, masterWebUiUrl) - context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) { - master ! Heartbeat(workerId) + context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat) + + case SendHeartbeat => + masterLock.synchronized { + if (connected) { master ! 
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 4346571c4d..c59e1f4de6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
 import org.scalatest.FunSuite
 
 import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
 import org.apache.spark.deploy.worker.ExecutorRunner
 
 class JsonProtocolSuite extends FunSuite {
@@ -53,7 +53,8 @@ class JsonProtocolSuite extends FunSuite {
     val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
     val activeApps = Array[ApplicationInfo](createAppInfo())
     val completedApps = Array[ApplicationInfo]()
-    val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+    val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+      MasterState.ALIVE)
     val output = JsonProtocol.writeMasterState(stateResponse)
     assertValidJson(output)
   }
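The updated test still only asserts that the output parses as JSON. A possible follow-up assertion, not part of this diff, would pin down the new field using the lift-json API the suite already imports:

// Hypothetical extra check for the test above: the "status" field
// should serialize as the enum's string form.
import net.liftweb.json.JsonAST.JString

val output = JsonProtocol.writeMasterState(stateResponse)
assert((output \ "status") === JString(MasterState.ALIVE.toString))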