aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorAaron Davidson <aaron@databricks.com>2013-09-17 09:40:06 -0700
committerAaron Davidson <aaron@databricks.com>2013-09-26 14:59:35 -0700
commitd5a96feccb15dd290b282af9e2f94479c8e4554e (patch)
tree55146010e613178553ff6fd1bc35e5d4d53addcf /core
parent13eced723f222095ea4b52c4f6cb078cae66342e (diff)
downloadspark-d5a96feccb15dd290b282af9e2f94479c8e4554e.tar.gz
spark-d5a96feccb15dd290b282af9e2f94479c8e4554e.tar.bz2
spark-d5a96feccb15dd290b282af9e2f94479c8e4554e.zip
Standalone Scheduler fault recovery
Implements a basic form of Standalone Scheduler fault recovery. In particular, this allows faults to be manually recovered from by means of restarting the Master process on the same machine. This is the majority of the code necessary for general fault tolerance, which will first elect a leader and then recover the Master state. In order to enable fault recovery, the Master will persist a small amount of state related to the registration of Workers and Applications to disk. If the Master is started and sees that this state is still around, it will enter Recovery mode, during which time it will not schedule any new Executors on Workers (but it does accept the registration of new Clients and Workers). At this point, the Master attempts to reconnect to all Workers and Client applications that were registered at the time of failure. After confirming either the existence or nonexistence of all such nodes (within a certain timeout), the Master will exit Recovery mode and resume normal scheduling.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala17
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala65
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala90
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala154
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala50
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala28
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala3
15 files changed, 458 insertions, 75 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 1cfff5e565..0d0745a480 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -52,6 +52,8 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage
+ case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+
case class Heartbeat(workerId: String) extends DeployMessage
// Master to Worker
@@ -76,6 +78,8 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
+ case class MasterChangeAcknowledged(appId: String)
+
// Master to Client
case class RegisteredApplication(appId: String) extends DeployMessage
@@ -94,6 +98,10 @@ private[deploy] object DeployMessages {
case object StopClient
+ // Master to Worker & Client
+
+ case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
+
// MasterWebUI To Master
case object RequestMasterState
@@ -127,6 +135,10 @@ private[deploy] object DeployMessages {
case object CheckForWorkerTimeOut
+ case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+
+ case object EndRecoveryProcess
+
case object RequestWebUIPort
case class WebUIPortResponse(webUIBoundPort: Int)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
new file mode 100644
index 0000000000..716ee483d5
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+private[spark] class ExecutorDescription(
+ val appId: String,
+ val execId: Int,
+ val cores: Int,
+ val state: ExecutorState.Value)
+ extends Serializable {
+
+ override def toString: String =
+ "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index a342dd724a..28548a2ca9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -92,20 +92,25 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
+ case MasterChanged(materUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
+ context.unwatch(master)
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
+ masterAddress = master.path.address
+ sender ! MasterChangeAcknowledged(appId)
+ context.watch(master)
+
case Terminated(actor_) if actor_ == master =>
- logError("Connection to master failed; stopping client")
+ logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case RemoteClientDisconnected(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case RemoteClientShutdown(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logError("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case StopClient =>
markDisconnected()
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bd5327627a..e437a0e7ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -23,29 +23,52 @@ import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class ApplicationInfo(
- val startTime: Long,
- val id: String,
- val desc: ApplicationDescription,
- val submitDate: Date,
- val driver: ActorRef,
- val appUiUrl: String)
-{
- var state = ApplicationState.WAITING
- var executors = new mutable.HashMap[Int, ExecutorInfo]
- var coresGranted = 0
- var endTime = -1L
- val appSource = new ApplicationSource(this)
-
- private var nextExecutorId = 0
-
- def newExecutorId(): Int = {
- val id = nextExecutorId
- nextExecutorId += 1
- id
+ val startTime: Long,
+ val id: String,
+ val desc: ApplicationDescription,
+ val submitDate: Date,
+ val driver: ActorRef,
+ val appUiUrl: String)
+ extends Serializable {
+
+ @transient var state: ApplicationState.Value = _
+ @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+ @transient var coresGranted: Int = _
+ @transient var endTime: Long = _
+ @transient var appSource: ApplicationSource = _
+
+ @transient private var nextExecutorId: Int = _
+
+ init()
+
+ private def readObject(in: java.io.ObjectInputStream) : Unit = {
+ in.defaultReadObject()
+ init()
+ }
+
+ private def init() {
+ state = ApplicationState.WAITING
+ executors = new mutable.HashMap[Int, ExecutorInfo]
+ coresGranted = 0
+ endTime = -1L
+ appSource = new ApplicationSource(this)
+ nextExecutorId = 0
+ }
+
+ private def newExecutorId(useID: Option[Int] = None): Int = {
+ useID match {
+ case Some(id) =>
+ nextExecutorId = math.max(nextExecutorId, id + 1)
+ id
+ case None =>
+ val id = nextExecutorId
+ nextExecutorId += 1
+ id
+ }
}
- def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
- val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+ def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
+ val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index 7e804223cf..fedf879eff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -18,11 +18,11 @@
package org.apache.spark.deploy.master
private[spark] object ApplicationState
- extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+ extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
type ApplicationState = Value
- val WAITING, RUNNING, FINISHED, FAILED = Value
+ val WAITING, RUNNING, FINISHED, FAILED, UNKNOWN = Value
val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
index cf384a985e..d235234c13 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
private[spark] class ExecutorInfo(
val id: Int,
@@ -28,5 +28,10 @@ private[spark] class ExecutorInfo(
var state = ExecutorState.LAUNCHING
+ /** Copy all state variables from the given on-the-wire ExecutorDescription. */
+ def copyState(execDesc: ExecutorDescription) {
+ state = execDesc.state
+ }
+
def fullId: String = application.id + "/" + id
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
new file mode 100644
index 0000000000..2fc13821bd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.io._
+
+import scala.Serializable
+
+import akka.serialization.Serialization
+import org.apache.spark.Logging
+
+/**
+ * Stores data in a single on-disk directory with one file per application and worker.
+ * Files are deleted when applications and workers are removed.
+ *
+ * @param dir Directory to store files. Created if non-existent (but not recursively).
+ * @param serialization Used to serialize our objects.
+ */
+private[spark] class FileSystemPersistenceEngine(
+ val dir: String,
+ val serialization: Serialization)
+ extends PersistenceEngine with Logging {
+
+ new File(dir).mkdir()
+
+ override def addApplication(app: ApplicationInfo) {
+ val appFile = new File(dir + File.separator + "app_" + app.id)
+ serializeIntoFile(appFile, app)
+ }
+
+ override def removeApplication(app: ApplicationInfo) {
+ new File(dir + File.separator + "app_" + app.id).delete()
+ }
+
+ override def addWorker(worker: WorkerInfo) {
+ val workerFile = new File(dir + File.separator + "worker_" + worker.id)
+ serializeIntoFile(workerFile, worker)
+ }
+
+ override def removeWorker(worker: WorkerInfo) {
+ new File(dir + File.separator + "worker_" + worker.id).delete()
+ }
+
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ val sortedFiles = new File(dir).listFiles().sortBy(_.getName())
+ val appFiles = sortedFiles.filter(_.getName().startsWith("app_"))
+ val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_))
+ val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_"))
+ val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_))
+ (apps, workers)
+ }
+
+ private def serializeIntoFile(file: File, value: Serializable) {
+ val created = file.createNewFile()
+ if (!created) { throw new IllegalStateException("Could not create file: " + file) }
+
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+
+ val out = new FileOutputStream(file)
+ out.write(serialized)
+ out.close()
+ }
+
+ def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
+ val fileData = new Array[Byte](file.length().asInstanceOf[Int])
+ val dis = new DataInputStream(new FileInputStream(file))
+ dis.readFully(fileData)
+ dis.close()
+
+ val clazz = m.erasure.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
+ serializer.fromBinary(fileData).asInstanceOf[T]
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index bde59905bc..c6e039eed4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -27,16 +27,17 @@ import akka.actor.Terminated
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.serialization.SerializationExtension
import akka.util.duration._
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.MasterState.MasterState
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.util.{Duration, Timeout}
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
@@ -44,7 +45,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
-
+ val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
+
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@@ -74,6 +76,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (envVar != null) envVar else host
}
+ var state: MasterState = _
+
+ var persistenceEngine: PersistenceEngine = _
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -89,6 +95,23 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
+
+ persistenceEngine =
+ if (RECOVERY_DIR.isEmpty()) {
+ new BlackHolePersistenceEngine()
+ } else {
+ logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+ new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+ }
+
+ val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+ state =
+ if (storedApps.isEmpty && storedWorkers.isEmpty) {
+ MasterState.ALIVE
+ } else {
+ self ! BeginRecovery(storedApps, storedWorkers)
+ MasterState.RECOVERING
+ }
}
override def postStop() {
@@ -98,14 +121,16 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+ case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ registerWorker(worker)
context.watch(sender) // This doesn't work with remote actors but helps for testing
+ persistenceEngine.addWorker(worker)
sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
schedule()
}
@@ -113,10 +138,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RegisterApplication(description) => {
logInfo("Registering app " + description.name)
- val app = addApplication(description, sender)
+ val app = createApplication(description, sender)
+ registerApplication(app)
logInfo("Registered app " + description.name + " with ID " + app.id)
- waitingApps += app
context.watch(sender) // This doesn't work with remote actors but helps for testing
+ persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id)
schedule()
}
@@ -158,23 +184,78 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ case BeginRecovery(storedApps, storedWorkers) => {
+ context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess)
+
+ val masterUrl = "spark://" + host + ":" + port
+ val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
+ for (app <- storedApps) {
+ registerApplication(app)
+ app.state = ApplicationState.UNKNOWN
+ app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ }
+ for (worker <- storedWorkers) {
+ registerWorker(worker)
+ worker.state = WorkerState.UNKNOWN
+ worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ }
+ }
+
+ case MasterChangeAcknowledged(appId) => {
+ val appOption = idToApp.get(appId)
+ appOption match {
+ case Some(app) =>
+ app.state = ApplicationState.WAITING
+ case None =>
+ logWarning("Master change ack from unknown app: " + appId)
+ }
+
+ if (canCompleteRecovery) { completeRecovery() }
+ }
+
+ case WorkerSchedulerStateResponse(workerId, executors) => {
+ idToWorker.get(workerId) match {
+ case Some(worker) =>
+ worker.state = WorkerState.ALIVE
+
+ val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None)
+ for (exec <- validExecutors) {
+ val app = idToApp.get(exec.appId).get
+ val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
+ worker.addExecutor(execInfo)
+ execInfo.copyState(exec)
+ }
+ case None =>
+ logWarning("Scheduler state from unknown worker: " + workerId)
+ }
+
+ if (canCompleteRecovery) { completeRecovery() }
+ }
+
+ case EndRecoveryProcess => {
+ completeRecovery()
+ }
+
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
+ if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
+ if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
+ if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
@@ -190,6 +271,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ def canCompleteRecovery =
+ workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
+ apps.count(_.state == ApplicationState.UNKNOWN) == 0
+
+ def completeRecovery() {
+ synchronized {
+ if (state != MasterState.RECOVERING) { return }
+ state = MasterState.COMPLETING_RECOVERY
+ }
+
+ // Kill off any workers and apps that didn't respond to us.
+ workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_))
+ apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_))
+
+ state = MasterState.ALIVE
+ schedule()
+ logInfo("Recovery complete - resuming operations!")
+ }
+
/**
* Can an app use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the app on it (right now the standalone backend doesn't like having
@@ -204,6 +304,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
* every time a new app joins or resource availability changes.
*/
def schedule() {
+ if (state != MasterState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
@@ -257,8 +358,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
- publicAddress: String): WorkerInfo = {
+ def registerWorker(worker: WorkerInfo): Unit = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
@@ -266,12 +366,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}.foreach { w =>
workers -= w
}
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+
+ val workerAddress = worker.actor.path.address
+ if (addressToWorker.contains(workerAddress)) {
+ logInfo("Attempted to re-register worker at same address: " + workerAddress)
+ return
+ }
+
workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
- addressToWorker(sender.path.address) = worker
- worker
+ addressToWorker(workerAddress) = worker
}
def removeWorker(worker: WorkerInfo) {
@@ -286,25 +391,35 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
+ persistenceEngine.removeWorker(worker)
}
- def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+ def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ }
+
+ def registerApplication(app: ApplicationInfo): Unit = {
+ val appAddress = app.driver.path.address
+ if (addressToWorker.contains(appAddress)) {
+ logInfo("Attempted to re-register application at same address: " + appAddress)
+ return
+ }
+
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
- actorToApp(driver) = app
- addressToApp(driver.path.address) = app
+ actorToApp(app.driver) = app
+ addressToApp(appAddress) = app
if (firstApp == None) {
firstApp = Some(app)
}
val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
- if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
- app
+ waitingApps += app
}
def finishApplication(app: ApplicationInfo) {
@@ -336,6 +451,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (state != ApplicationState.FINISHED) {
app.driver ! ApplicationRemoved(state.toString)
}
+ persistenceEngine.removeApplication(app)
schedule()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
new file mode 100644
index 0000000000..9ea5e9752e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object MasterState
+ extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
+
+ type MasterState = Value
+
+ val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
new file mode 100644
index 0000000000..07d23c6bf3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+/**
+ * Allows Master to persist any state that is necessary in order to recover from a failure.
+ * The following semantics are required:
+ * - addApplication and addWorker are called before completing registration of a new app/worker.
+ * - removeApplication and removeWorker are called at any time.
+ * Given these two requirements, we will have all apps and workers persisted, but
+ * we might not have yet deleted apps or workers that finished.
+ */
+trait PersistenceEngine {
+ def addApplication(app: ApplicationInfo)
+
+ def removeApplication(app: ApplicationInfo)
+
+ def addWorker(worker: WorkerInfo)
+
+ def removeWorker(worker: WorkerInfo)
+
+ /**
+ * Returns the persisted data sorted by their respective ids (which implies that they're
+ * sorted by time order of creation).
+ */
+ def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+}
+
+class BlackHolePersistenceEngine extends PersistenceEngine {
+ override def addApplication(app: ApplicationInfo) {}
+ override def removeApplication(app: ApplicationInfo) {}
+ override def addWorker(worker: WorkerInfo) {}
+ override def removeWorker(worker: WorkerInfo) {}
+ override def readPersistedData() = (Nil, Nil)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 6219f11f2a..2ab7bb233c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -29,21 +29,37 @@ private[spark] class WorkerInfo(
val memory: Int,
val actor: ActorRef,
val webUiPort: Int,
- val publicAddress: String) {
+ val publicAddress: String)
+ extends Serializable {
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
- var state: WorkerState.Value = WorkerState.ALIVE
- var coresUsed = 0
- var memoryUsed = 0
+ @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+ @transient var state: WorkerState.Value = _
+ @transient var coresUsed: Int = _
+ @transient var memoryUsed: Int = _
- var lastHeartbeat = System.currentTimeMillis()
+ @transient var lastHeartbeat: Long = _
+
+ init()
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
+ private def readObject(in: java.io.ObjectInputStream) : Unit = {
+ in.defaultReadObject()
+ init()
+ }
+
+ private def init() {
+ executors = new mutable.HashMap[String, ExecutorInfo]
+ state = WorkerState.ALIVE
+ coresUsed = 0
+ memoryUsed = 0
+ lastHeartbeat = System.currentTimeMillis()
+ }
+
def hostPort: String = {
assert (port > 0)
host + ":" + port
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index b5ee6dca79..c8d34f25e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,8 +17,10 @@
package org.apache.spark.deploy.master
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+private[spark] object WorkerState
+ extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
+
type WorkerState = Value
- val ALIVE, DEAD, DECOMMISSIONED = Value
+ val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index e3dc30eefc..8fabc95665 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,7 +43,8 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
- val workDir: File)
+ val workDir: File,
+ var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
@@ -83,7 +84,8 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+ state = ExecutorState.KILLED
+ worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -180,9 +182,9 @@ private[spark] class ExecutorRunner(
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
val exitCode = process.waitFor()
+ state = ExecutorState.FAILED
val message = "Command exited with code " + exitCode
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
- Some(exitCode))
+ worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -192,8 +194,9 @@ private[spark] class ExecutorRunner(
if (process != null) {
process.destroy()
}
+ state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+ worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 09530beb3b..46455aa5ae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -27,8 +27,8 @@ import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
@@ -42,7 +42,7 @@ private[spark] class Worker(
webUiPort: Int,
cores: Int,
memory: Int,
- masterUrl: String,
+ var masterUrl: String,
workDirPath: String = null)
extends Actor with Logging {
@@ -125,19 +125,30 @@ private[spark] class Worker(
master ! Heartbeat(workerId)
}
+ case MasterChanged(url, uiUrl) =>
+ logInfo("Master has changed, new master is at " + url)
+ masterUrl = url
+ masterWebUiUrl = uiUrl
+ context.unwatch(master)
+ master = context.actorFor(Master.toAkkaUrl(masterUrl))
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ val execs = executors.values.
+ map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+ sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+
case RegisterWorkerFailed(message) =>
logError("Worker registration failed: " + message)
System.exit(1)
case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(
- appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
@@ -174,11 +185,7 @@ private[spark] class Worker(
}
def masterDisconnected() {
- // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
- // (Note that if reconnecting we would also need to assign IDs differently.)
- logError("Connection to master failed! Shutting down.")
- executors.values.foreach(_.kill())
- System.exit(1)
+ logError("Connection to master failed! Waiting for master to reconnect...")
}
def generateWorkerId(): String = {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0c..c173cdf449 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -71,8 +71,7 @@ private[spark] class SparkDeploySchedulerBackend(
override def disconnected() {
if (!stopping) {
- logError("Disconnected from Spark cluster!")
- scheduler.error("Disconnected from Spark cluster")
+ logError("Disconnected from Spark cluster! Waiting for reconnection...")
}
}