aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-10-10 17:16:42 -0700
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-10-10 17:16:42 -0700
commitc71499b7795564e1d16495c59273ecc027070fc5 (patch)
tree3476cb0d4836bbb25308bb8f65e6a1fbdeea2b1a /core
parentcd08f73483658b872701ec1f74ce84933a45c6f0 (diff)
parent66c20635fa1fe18604bb4042ce31152180cb541d (diff)
downloadspark-c71499b7795564e1d16495c59273ecc027070fc5.tar.gz
spark-c71499b7795564e1d16495c59273ecc027070fc5.tar.bz2
spark-c71499b7795564e1d16495c59273ecc027070fc5.zip
Merge pull request #19 from aarondav/master-zk
Standalone Scheduler fault tolerance using ZooKeeper This patch implements full distributed fault tolerance for standalone scheduler Masters. There is only one master Leader at a time, which is actively serving scheduling requests. If this Leader crashes, another master will eventually be elected, reconstruct the state from the first Master, and continue serving scheduling requests. Leader election is performed using the ZooKeeper leader election pattern. We try to minimize the use of ZooKeeper and the assumptions about ZooKeeper's behavior, so there is a layer of retries and session monitoring on top of the ZooKeeper client. Master failover follows directly from the single-node Master recovery via the file system (patch d5a96fe), save that the Master state is stored in ZooKeeper instead. Configuration: By default, no recovery mechanism is enabled (spark.deploy.recoveryMode = NONE). By setting spark.deploy.recoveryMode to ZOOKEEPER and setting spark.deploy.zookeeper.url to an appropriate ZooKeeper URL, ZooKeeper recovery mode is enabled. By setting spark.deploy.recoveryMode to FILESYSTEM and setting spark.deploy.recoveryDirectory to an appropriate directory accessible by the Master, we will keep the behavior of from d5a96fe. Additionally, places where a Master could be specificied by a spark:// url can now take comma-delimited lists to specify backup masters. Note that this is only used for registration of NEW Workers and application Clients. Once a Worker or Client has registered with the Master Leader, it is "in the system" and will never need to register again.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/SparkContext.scala9
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala34
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala420
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala84
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala53
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala90
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala45
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala228
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala46
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala53
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala26
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala203
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala42
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala136
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala85
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala13
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala175
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala7
30 files changed, 1673 insertions, 169 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ff45e76105..f8e980fa5e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -156,7 +156,7 @@ class SparkContext(
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
- val SPARK_REGEX = """(spark://.*)""".r
+ val SPARK_REGEX = """spark://(.*)""".r
//Regular expression for connection to Mesos cluster
val MESOS_REGEX = """(mesos://.*)""".r
@@ -172,7 +172,8 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+ val masterUrls = sparkUrl.split(",").map("spark://" + _)
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
scheduler
@@ -188,8 +189,8 @@ class SparkContext(
val scheduler = new ClusterScheduler(this)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
- val sparkUrl = localCluster.start()
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+ val masterUrls = localCluster.start()
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 1cfff5e565..275331724a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,12 +21,14 @@ import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils
private[deploy] sealed trait DeployMessage extends Serializable
+/** Contains messages sent between Scheduler actor nodes. */
private[deploy] object DeployMessages {
// Worker to Master
@@ -52,17 +54,20 @@ private[deploy] object DeployMessages {
exitStatus: Option[Int])
extends DeployMessage
+ case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription])
+
case class Heartbeat(workerId: String) extends DeployMessage
// Master to Worker
- case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+ case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
- case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+ case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
case class LaunchExecutor(
+ masterUrl: String,
appId: String,
execId: Int,
appDesc: ApplicationDescription,
@@ -76,9 +81,11 @@ private[deploy] object DeployMessages {
case class RegisterApplication(appDescription: ApplicationDescription)
extends DeployMessage
+ case class MasterChangeAcknowledged(appId: String)
+
// Master to Client
- case class RegisteredApplication(appId: String) extends DeployMessage
+ case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -94,6 +101,10 @@ private[deploy] object DeployMessages {
case object StopClient
+ // Master to Worker & Client
+
+ case class MasterChanged(masterUrl: String, masterWebUiUrl: String)
+
// MasterWebUI To Master
case object RequestMasterState
@@ -101,7 +112,8 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+ status: MasterState) {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
@@ -123,12 +135,7 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
- // Actor System to Master
-
- case object CheckForWorkerTimeOut
-
- case object RequestWebUIPort
-
- case class WebUIPortResponse(webUIBoundPort: Int)
+ // Actor System to Worker
+ case object SendHeartbeat
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
new file mode 100644
index 0000000000..2abf0b69dd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+/**
+ * Used to send state on-the-wire about Executors from Worker to Master.
+ * This state is sufficient for the Master to reconstruct its internal data structures during
+ * failover.
+ */
+private[spark] class ExecutorDescription(
+ val appId: String,
+ val execId: Int,
+ val cores: Int,
+ val state: ExecutorState.Value)
+ extends Serializable {
+
+ override def toString: String =
+ "ExecutorState(appId=%s, execId=%d, cores=%d, state=%s)".format(appId, execId, cores, state)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
new file mode 100644
index 0000000000..668032a3a2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala
@@ -0,0 +1,420 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.spark.deploy
+
+import java.io._
+import java.net.URL
+import java.util.concurrent.TimeoutException
+
+import scala.concurrent.{Await, future, promise}
+import scala.concurrent.duration._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.collection.mutable.ListBuffer
+import scala.sys.process._
+
+import net.liftweb.json.JsonParser
+
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.deploy.master.RecoveryState
+
+/**
+ * This suite tests the fault tolerance of the Spark standalone scheduler, mainly the Master.
+ * In order to mimic a real distributed cluster more closely, Docker is used.
+ * Execute using
+ * ./spark-class org.apache.spark.deploy.FaultToleranceTest
+ *
+ * Make sure that that the environment includes the following properties in SPARK_DAEMON_JAVA_OPTS:
+ * - spark.deploy.recoveryMode=ZOOKEEPER
+ * - spark.deploy.zookeeper.url=172.17.42.1:2181
+ * Note that 172.17.42.1 is the default docker ip for the host and 2181 is the default ZK port.
+ *
+ * Unfortunately, due to the Docker dependency this suite cannot be run automatically without a
+ * working installation of Docker. In addition to having Docker, the following are assumed:
+ * - Docker can run without sudo (see http://docs.docker.io/en/latest/use/basics/)
+ * - The docker images tagged spark-test-master and spark-test-worker are built from the
+ * docker/ directory. Run 'docker/spark-test/build' to generate these.
+ */
+private[spark] object FaultToleranceTest extends App with Logging {
+ val masters = ListBuffer[TestMasterInfo]()
+ val workers = ListBuffer[TestWorkerInfo]()
+ var sc: SparkContext = _
+
+ var numPassed = 0
+ var numFailed = 0
+
+ val sparkHome = System.getenv("SPARK_HOME")
+ assertTrue(sparkHome != null, "Run with a valid SPARK_HOME")
+
+ val containerSparkHome = "/opt/spark"
+ val dockerMountDir = "%s:%s".format(sparkHome, containerSparkHome)
+
+ System.setProperty("spark.driver.host", "172.17.42.1") // default docker host ip
+
+ def afterEach() {
+ if (sc != null) {
+ sc.stop()
+ sc = null
+ }
+ terminateCluster()
+ }
+
+ test("sanity-basic") {
+ addMasters(1)
+ addWorkers(1)
+ createClient()
+ assertValidClusterState()
+ }
+
+ test("sanity-many-masters") {
+ addMasters(3)
+ addWorkers(3)
+ createClient()
+ assertValidClusterState()
+ }
+
+ test("single-master-halt") {
+ addMasters(3)
+ addWorkers(2)
+ createClient()
+ assertValidClusterState()
+
+ killLeader()
+ delay(30 seconds)
+ assertValidClusterState()
+ createClient()
+ assertValidClusterState()
+ }
+
+ test("single-master-restart") {
+ addMasters(1)
+ addWorkers(2)
+ createClient()
+ assertValidClusterState()
+
+ killLeader()
+ addMasters(1)
+ delay(30 seconds)
+ assertValidClusterState()
+
+ killLeader()
+ addMasters(1)
+ delay(30 seconds)
+ assertValidClusterState()
+ }
+
+ test("cluster-failure") {
+ addMasters(2)
+ addWorkers(2)
+ createClient()
+ assertValidClusterState()
+
+ terminateCluster()
+ addMasters(2)
+ addWorkers(2)
+ assertValidClusterState()
+ }
+
+ test("all-but-standby-failure") {
+ addMasters(2)
+ addWorkers(2)
+ createClient()
+ assertValidClusterState()
+
+ killLeader()
+ workers.foreach(_.kill())
+ workers.clear()
+ delay(30 seconds)
+ addWorkers(2)
+ assertValidClusterState()
+ }
+
+ test("rolling-outage") {
+ addMasters(1)
+ delay()
+ addMasters(1)
+ delay()
+ addMasters(1)
+ addWorkers(2)
+ createClient()
+ assertValidClusterState()
+ assertTrue(getLeader == masters.head)
+
+ (1 to 3).foreach { _ =>
+ killLeader()
+ delay(30 seconds)
+ assertValidClusterState()
+ assertTrue(getLeader == masters.head)
+ addMasters(1)
+ }
+ }
+
+ def test(name: String)(fn: => Unit) {
+ try {
+ fn
+ numPassed += 1
+ logInfo("Passed: " + name)
+ } catch {
+ case e: Exception =>
+ numFailed += 1
+ logError("FAILED: " + name, e)
+ }
+ afterEach()
+ }
+
+ def addMasters(num: Int) {
+ (1 to num).foreach { _ => masters += SparkDocker.startMaster(dockerMountDir) }
+ }
+
+ def addWorkers(num: Int) {
+ val masterUrls = getMasterUrls(masters)
+ (1 to num).foreach { _ => workers += SparkDocker.startWorker(dockerMountDir, masterUrls) }
+ }
+
+ /** Creates a SparkContext, which constructs a Client to interact with our cluster. */
+ def createClient() = {
+ if (sc != null) { sc.stop() }
+ // Counter-hack: Because of a hack in SparkEnv#createFromSystemProperties() that changes this
+ // property, we need to reset it.
+ System.setProperty("spark.driver.port", "0")
+ sc = new SparkContext(getMasterUrls(masters), "fault-tolerance", containerSparkHome)
+ }
+
+ def getMasterUrls(masters: Seq[TestMasterInfo]): String = {
+ "spark://" + masters.map(master => master.ip + ":7077").mkString(",")
+ }
+
+ def getLeader: TestMasterInfo = {
+ val leaders = masters.filter(_.state == RecoveryState.ALIVE)
+ assertTrue(leaders.size == 1)
+ leaders(0)
+ }
+
+ def killLeader(): Unit = {
+ masters.foreach(_.readState())
+ val leader = getLeader
+ masters -= leader
+ leader.kill()
+ }
+
+ def delay(secs: Duration = 5.seconds) = Thread.sleep(secs.toMillis)
+
+ def terminateCluster() {
+ masters.foreach(_.kill())
+ workers.foreach(_.kill())
+ masters.clear()
+ workers.clear()
+ }
+
+ /** This includes Client retry logic, so it may take a while if the cluster is recovering. */
+ def assertUsable() = {
+ val f = future {
+ try {
+ val res = sc.parallelize(0 until 10).collect()
+ assertTrue(res.toList == (0 until 10))
+ true
+ } catch {
+ case e: Exception =>
+ logError("assertUsable() had exception", e)
+ e.printStackTrace()
+ false
+ }
+ }
+
+ // Avoid waiting indefinitely (e.g., we could register but get no executors).
+ assertTrue(Await.result(f, 120 seconds))
+ }
+
+ /**
+ * Asserts that the cluster is usable and that the expected masters and workers
+ * are all alive in a proper configuration (e.g., only one leader).
+ */
+ def assertValidClusterState() = {
+ assertUsable()
+ var numAlive = 0
+ var numStandby = 0
+ var numLiveApps = 0
+ var liveWorkerIPs: Seq[String] = List()
+
+ def stateValid(): Boolean = {
+ (workers.map(_.ip) -- liveWorkerIPs).isEmpty &&
+ numAlive == 1 && numStandby == masters.size - 1 && numLiveApps >= 1
+ }
+
+ val f = future {
+ try {
+ while (!stateValid()) {
+ Thread.sleep(1000)
+
+ numAlive = 0
+ numStandby = 0
+ numLiveApps = 0
+
+ masters.foreach(_.readState())
+
+ for (master <- masters) {
+ master.state match {
+ case RecoveryState.ALIVE =>
+ numAlive += 1
+ liveWorkerIPs = master.liveWorkerIPs
+ case RecoveryState.STANDBY =>
+ numStandby += 1
+ case _ => // ignore
+ }
+
+ numLiveApps += master.numLiveApps
+ }
+ }
+ true
+ } catch {
+ case e: Exception =>
+ logError("assertValidClusterState() had exception", e)
+ false
+ }
+ }
+
+ try {
+ assertTrue(Await.result(f, 120 seconds))
+ } catch {
+ case e: TimeoutException =>
+ logError("Master states: " + masters.map(_.state))
+ logError("Num apps: " + numLiveApps)
+ logError("IPs expected: " + workers.map(_.ip) + " / found: " + liveWorkerIPs)
+ throw new RuntimeException("Failed to get into acceptable cluster state after 2 min.", e)
+ }
+ }
+
+ def assertTrue(bool: Boolean, message: String = "") {
+ if (!bool) {
+ throw new IllegalStateException("Assertion failed: " + message)
+ }
+ }
+
+ logInfo("Ran %s tests, %s passed and %s failed".format(numPassed+numFailed, numPassed, numFailed))
+}
+
+private[spark] class TestMasterInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+ extends Logging {
+
+ implicit val formats = net.liftweb.json.DefaultFormats
+ var state: RecoveryState.Value = _
+ var liveWorkerIPs: List[String] = _
+ var numLiveApps = 0
+
+ logDebug("Created master: " + this)
+
+ def readState() {
+ try {
+ val masterStream = new InputStreamReader(new URL("http://%s:8080/json".format(ip)).openStream)
+ val json = JsonParser.parse(masterStream, closeAutomatically = true)
+
+ val workers = json \ "workers"
+ val liveWorkers = workers.children.filter(w => (w \ "state").extract[String] == "ALIVE")
+ liveWorkerIPs = liveWorkers.map(w => (w \ "host").extract[String])
+
+ numLiveApps = (json \ "activeapps").children.size
+
+ val status = json \\ "status"
+ val stateString = status.extract[String]
+ state = RecoveryState.values.filter(state => state.toString == stateString).head
+ } catch {
+ case e: Exception =>
+ // ignore, no state update
+ logWarning("Exception", e)
+ }
+ }
+
+ def kill() { Docker.kill(dockerId) }
+
+ override def toString: String =
+ "[ip=%s, id=%s, logFile=%s, state=%s]".
+ format(ip, dockerId.id, logFile.getAbsolutePath, state)
+}
+
+private[spark] class TestWorkerInfo(val ip: String, val dockerId: DockerId, val logFile: File)
+ extends Logging {
+
+ implicit val formats = net.liftweb.json.DefaultFormats
+
+ logDebug("Created worker: " + this)
+
+ def kill() { Docker.kill(dockerId) }
+
+ override def toString: String =
+ "[ip=%s, id=%s, logFile=%s]".format(ip, dockerId, logFile.getAbsolutePath)
+}
+
+private[spark] object SparkDocker {
+ def startMaster(mountDir: String): TestMasterInfo = {
+ val cmd = Docker.makeRunCmd("spark-test-master", mountDir = mountDir)
+ val (ip, id, outFile) = startNode(cmd)
+ new TestMasterInfo(ip, id, outFile)
+ }
+
+ def startWorker(mountDir: String, masters: String): TestWorkerInfo = {
+ val cmd = Docker.makeRunCmd("spark-test-worker", args = masters, mountDir = mountDir)
+ val (ip, id, outFile) = startNode(cmd)
+ new TestWorkerInfo(ip, id, outFile)
+ }
+
+ private def startNode(dockerCmd: ProcessBuilder) : (String, DockerId, File) = {
+ val ipPromise = promise[String]()
+ val outFile = File.createTempFile("fault-tolerance-test", "")
+ outFile.deleteOnExit()
+ val outStream: FileWriter = new FileWriter(outFile)
+ def findIpAndLog(line: String): Unit = {
+ if (line.startsWith("CONTAINER_IP=")) {
+ val ip = line.split("=")(1)
+ ipPromise.success(ip)
+ }
+
+ outStream.write(line + "\n")
+ outStream.flush()
+ }
+
+ dockerCmd.run(ProcessLogger(findIpAndLog _))
+ val ip = Await.result(ipPromise.future, 30 seconds)
+ val dockerId = Docker.getLastProcessId
+ (ip, dockerId, outFile)
+ }
+}
+
+private[spark] class DockerId(val id: String) {
+ override def toString = id
+}
+
+private[spark] object Docker extends Logging {
+ def makeRunCmd(imageTag: String, args: String = "", mountDir: String = ""): ProcessBuilder = {
+ val mountCmd = if (mountDir != "") { " -v " + mountDir } else ""
+
+ val cmd = "docker run %s %s %s".format(mountCmd, imageTag, args)
+ logDebug("Run command: " + cmd)
+ cmd
+ }
+
+ def kill(dockerId: DockerId) : Unit = {
+ "docker kill %s".format(dockerId.id).!
+ }
+
+ def getLastProcessId: DockerId = {
+ var id: String = null
+ "docker ps -l -q".!(ProcessLogger(line => id = line))
+ new DockerId(id)
+ }
+} \ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 04d01c169d..e607b8c6f4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -72,7 +72,8 @@ private[spark] object JsonProtocol {
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+ ("status" -> obj.status.toString)
}
def writeWorkerState(obj: WorkerStateResponse) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 10161c8204..308a2bfa22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -39,22 +39,23 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
- def start(): String = {
+ def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
+ val masters = Array(masterUrl)
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
- memoryPerWorker, masterUrl, null, Some(workerNum))
+ memoryPerWorker, masters, null, Some(workerNum))
workerActorSystems += workerSystem
}
- return masterUrl
+ return masters
}
def stop() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index a342dd724a..77422f61ec 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,6 +23,7 @@ import akka.actor._
import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Duration
+import akka.util.duration._
import akka.remote.RemoteClientDisconnected
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
@@ -37,41 +38,81 @@ import org.apache.spark.deploy.master.Master
/**
* The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
* and a listener for cluster events, and calls back the listener when various events occur.
+ *
+ * @param masterUrls Each url should look like spark://host:port.
*/
private[spark] class Client(
actorSystem: ActorSystem,
- masterUrl: String,
+ masterUrls: Array[String],
appDescription: ApplicationDescription,
listener: ClientListener)
extends Logging {
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
+
var actor: ActorRef = null
var appId: String = null
+ var registered = false
+ var activeMasterUrl: String = null
class ClientActor extends Actor with Logging {
var master: ActorRef = null
var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
+ var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
- logInfo("Connecting to master " + masterUrl)
try {
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- masterAddress = master.path.address
- master ! RegisterApplication(appDescription)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ registerWithMaster()
} catch {
case e: Exception =>
- logError("Failed to connect to master", e)
+ logWarning("Failed to connect to master", e)
markDisconnected()
context.stop(self)
}
}
+ def tryRegisterAllMasters() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
+ val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterApplication(appDescription)
+ }
+ }
+
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ } else {
+ tryRegisterAllMasters()
+ }
+ }
+ retryTimer // start timer
+ }
+
+ def changeMaster(url: String) {
+ activeMasterUrl = url
+ master = context.actorFor(Master.toAkkaUrl(url))
+ masterAddress = master.path.address
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ }
+
override def receive = {
- case RegisteredApplication(appId_) =>
+ case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
+ registered = true
+ changeMaster(masterUrl)
listener.connected(appId)
case ApplicationRemoved(message) =>
@@ -92,23 +133,27 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
+ context.unwatch(master)
+ changeMaster(masterUrl)
+ alreadyDisconnected = false
+ sender ! MasterChangeAcknowledged(appId)
+
case Terminated(actor_) if actor_ == master =>
- logError("Connection to master failed; stopping client")
+ logWarning("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case RemoteClientDisconnected(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logWarning("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case RemoteClientShutdown(transport, address) if address == masterAddress =>
- logError("Connection to master failed; stopping client")
+ logWarning("Connection to master failed; waiting for master to reconnect...")
markDisconnected()
- context.stop(self)
case StopClient =>
- markDisconnected()
+ markDead()
sender ! true
context.stop(self)
}
@@ -122,6 +167,13 @@ private[spark] class Client(
alreadyDisconnected = true
}
}
+
+ def markDead() {
+ if (!alreadyDead) {
+ listener.dead()
+ alreadyDead = true
+ }
+ }
}
def start() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
index 4605368c11..be7a11bd15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
@@ -27,8 +27,12 @@ package org.apache.spark.deploy.client
private[spark] trait ClientListener {
def connected(appId: String): Unit
+ /** Disconnection may be a temporary state, as we fail over to a new Master. */
def disconnected(): Unit
+ /** Dead means that we couldn't find any Masters to connect to, and have given up. */
+ def dead(): Unit
+
def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index d5e9a0e095..5b62d3ba6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -33,6 +33,11 @@ private[spark] object TestClient {
System.exit(0)
}
+ def dead() {
+ logInfo("Could not connect to master")
+ System.exit(0)
+ }
+
def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
@@ -44,7 +49,7 @@ private[spark] object TestClient {
val desc = new ApplicationDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, url, desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bd5327627a..5150b7c7de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -29,23 +29,46 @@ private[spark] class ApplicationInfo(
val submitDate: Date,
val driver: ActorRef,
val appUiUrl: String)
-{
- var state = ApplicationState.WAITING
- var executors = new mutable.HashMap[Int, ExecutorInfo]
- var coresGranted = 0
- var endTime = -1L
- val appSource = new ApplicationSource(this)
-
- private var nextExecutorId = 0
-
- def newExecutorId(): Int = {
- val id = nextExecutorId
- nextExecutorId += 1
- id
+ extends Serializable {
+
+ @transient var state: ApplicationState.Value = _
+ @transient var executors: mutable.HashMap[Int, ExecutorInfo] = _
+ @transient var coresGranted: Int = _
+ @transient var endTime: Long = _
+ @transient var appSource: ApplicationSource = _
+
+ @transient private var nextExecutorId: Int = _
+
+ init()
+
+ private def readObject(in: java.io.ObjectInputStream) : Unit = {
+ in.defaultReadObject()
+ init()
+ }
+
+ private def init() {
+ state = ApplicationState.WAITING
+ executors = new mutable.HashMap[Int, ExecutorInfo]
+ coresGranted = 0
+ endTime = -1L
+ appSource = new ApplicationSource(this)
+ nextExecutorId = 0
+ }
+
+ private def newExecutorId(useID: Option[Int] = None): Int = {
+ useID match {
+ case Some(id) =>
+ nextExecutorId = math.max(nextExecutorId, id + 1)
+ id
+ case None =>
+ val id = nextExecutorId
+ nextExecutorId += 1
+ id
+ }
}
- def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
- val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+ def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): ExecutorInfo = {
+ val exec = new ExecutorInfo(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
coresGranted += cores
exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
index 7e804223cf..fedf879eff 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationState.scala
@@ -18,11 +18,11 @@
package org.apache.spark.deploy.master
private[spark] object ApplicationState
- extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+ extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED", "UNKNOWN") {
type ApplicationState = Value
- val WAITING, RUNNING, FINISHED, FAILED = Value
+ val WAITING, RUNNING, FINISHED, FAILED, UNKNOWN = Value
val MAX_NUM_RETRY = 10
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
index cf384a985e..76db61dd61 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
private[spark] class ExecutorInfo(
val id: Int,
@@ -28,5 +28,10 @@ private[spark] class ExecutorInfo(
var state = ExecutorState.LAUNCHING
+ /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */
+ def copyState(execDesc: ExecutorDescription) {
+ state = execDesc.state
+ }
+
def fullId: String = application.id + "/" + id
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
new file mode 100644
index 0000000000..c0849ef324
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import java.io._
+
+import scala.Serializable
+
+import akka.serialization.Serialization
+import org.apache.spark.Logging
+
+/**
+ * Stores data in a single on-disk directory with one file per application and worker.
+ * Files are deleted when applications and workers are removed.
+ *
+ * @param dir Directory to store files. Created if non-existent (but not recursively).
+ * @param serialization Used to serialize our objects.
+ */
+private[spark] class FileSystemPersistenceEngine(
+ val dir: String,
+ val serialization: Serialization)
+ extends PersistenceEngine with Logging {
+
+ new File(dir).mkdir()
+
+ override def addApplication(app: ApplicationInfo) {
+ val appFile = new File(dir + File.separator + "app_" + app.id)
+ serializeIntoFile(appFile, app)
+ }
+
+ override def removeApplication(app: ApplicationInfo) {
+ new File(dir + File.separator + "app_" + app.id).delete()
+ }
+
+ override def addWorker(worker: WorkerInfo) {
+ val workerFile = new File(dir + File.separator + "worker_" + worker.id)
+ serializeIntoFile(workerFile, worker)
+ }
+
+ override def removeWorker(worker: WorkerInfo) {
+ new File(dir + File.separator + "worker_" + worker.id).delete()
+ }
+
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
+ val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
+ val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
+ val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+ (apps, workers)
+ }
+
+ private def serializeIntoFile(file: File, value: Serializable) {
+ val created = file.createNewFile()
+ if (!created) { throw new IllegalStateException("Could not create file: " + file) }
+
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+
+ val out = new FileOutputStream(file)
+ out.write(serialized)
+ out.close()
+ }
+
+ def deserializeFromFile[T <: Serializable](file: File)(implicit m: Manifest[T]): T = {
+ val fileData = new Array[Byte](file.length().asInstanceOf[Int])
+ val dis = new DataInputStream(new FileInputStream(file))
+ dis.readFully(fileData)
+ dis.close()
+
+ val clazz = m.erasure.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
+ serializer.fromBinary(fileData).asInstanceOf[T]
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
new file mode 100644
index 0000000000..f25a1ad3bf
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import akka.actor.{Actor, ActorRef}
+
+import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+
+/**
+ * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
+ * is the only Master serving requests.
+ * In addition to the API provided, the LeaderElectionAgent will use of the following messages
+ * to inform the Master of leader changes:
+ * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
+ * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ */
+private[spark] trait LeaderElectionAgent extends Actor {
+ val masterActor: ActorRef
+}
+
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
+ override def preStart() {
+ masterActor ! ElectedLeader
+ }
+
+ override def receive = {
+ case _ =>
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index bde59905bc..cd916672ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -27,24 +27,26 @@ import akka.actor.Terminated
import akka.dispatch.Await
import akka.pattern.ask
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientDisconnected, RemoteClientShutdown}
+import akka.serialization.SerializationExtension
import akka.util.duration._
-import akka.util.Timeout
+import akka.util.{Duration, Timeout}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{Utils, AkkaUtils}
-import akka.util.{Duration, Timeout}
-
+import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
-
+ val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
+
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
val idToWorker = new HashMap[String, WorkerInfo]
@@ -74,51 +76,116 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (envVar != null) envVar else host
}
+ val masterUrl = "spark://" + host + ":" + port
+ var masterWebUiUrl: String = _
+
+ var state = RecoveryState.STANDBY
+
+ var persistenceEngine: PersistenceEngine = _
+
+ var leaderElectionAgent: ActorRef = _
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
- logInfo("Starting Spark master at spark://" + host + ":" + port)
+ logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
+ masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
+
+ persistenceEngine = RECOVERY_MODE match {
+ case "ZOOKEEPER" =>
+ logInfo("Persisting recovery state to ZooKeeper")
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ case "FILESYSTEM" =>
+ logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
+ new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
+ case _ =>
+ new BlackHolePersistenceEngine()
+ }
+
+ leaderElectionAgent = context.actorOf(Props(
+ RECOVERY_MODE match {
+ case "ZOOKEEPER" =>
+ new ZooKeeperLeaderElectionAgent(self, masterUrl)
+ case _ =>
+ new MonarchyLeaderAgent(self)
+ }))
+ }
+
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message) // calls postStop()!
+ logError("Master actor restarted due to exception", reason)
}
override def postStop() {
webUi.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
+ persistenceEngine.close()
+ context.stop(leaderElectionAgent)
}
override def receive = {
- case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
+ case ElectedLeader => {
+ val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+ state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+ RecoveryState.ALIVE
+ else
+ RecoveryState.RECOVERING
+
+ logInfo("I have been elected leader! New state: " + state)
+
+ if (state == RecoveryState.RECOVERING) {
+ beginRecovery(storedApps, storedWorkers)
+ context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+ }
+ }
+
+ case RevokedLeadership => {
+ logError("Leadership has been revoked -- master shutting down.")
+ System.exit(0)
+ }
+
+ case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
- if (idToWorker.contains(id)) {
+ if (state == RecoveryState.STANDBY) {
+ // ignore, don't send response
+ } else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
+ val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+ registerWorker(worker)
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+ persistenceEngine.addWorker(worker)
+ sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
}
}
case RegisterApplication(description) => {
- logInfo("Registering app " + description.name)
- val app = addApplication(description, sender)
- logInfo("Registered app " + description.name + " with ID " + app.id)
- waitingApps += app
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredApplication(app.id)
- schedule()
+ if (state == RecoveryState.STANDBY) {
+ // ignore, don't send response
+ } else {
+ logInfo("Registering app " + description.name)
+ val app = createApplication(description, sender)
+ registerApplication(app)
+ logInfo("Registered app " + description.name + " with ID " + app.id)
+ context.watch(sender) // This doesn't work with remote actors but helps for testing
+ persistenceEngine.addApplication(app)
+ sender ! RegisteredApplication(app.id, masterUrl)
+ schedule()
+ }
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
@@ -158,27 +225,63 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ case MasterChangeAcknowledged(appId) => {
+ idToApp.get(appId) match {
+ case Some(app) =>
+ logInfo("Application has been re-registered: " + appId)
+ app.state = ApplicationState.WAITING
+ case None =>
+ logWarning("Master change ack from unknown app: " + appId)
+ }
+
+ if (canCompleteRecovery) { completeRecovery() }
+ }
+
+ case WorkerSchedulerStateResponse(workerId, executors) => {
+ idToWorker.get(workerId) match {
+ case Some(worker) =>
+ logInfo("Worker has been re-registered: " + workerId)
+ worker.state = WorkerState.ALIVE
+
+ val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
+ for (exec <- validExecutors) {
+ val app = idToApp.get(exec.appId).get
+ val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
+ worker.addExecutor(execInfo)
+ execInfo.copyState(exec)
+ }
+ case None =>
+ logWarning("Scheduler state from unknown worker: " + workerId)
+ }
+
+ if (canCompleteRecovery) { completeRecovery() }
+ }
+
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+ state)
}
case CheckForWorkerTimeOut => {
@@ -190,6 +293,50 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
+ def canCompleteRecovery =
+ workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
+ apps.count(_.state == ApplicationState.UNKNOWN) == 0
+
+ def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+ for (app <- storedApps) {
+ logInfo("Trying to recover app: " + app.id)
+ try {
+ registerApplication(app)
+ app.state = ApplicationState.UNKNOWN
+ app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
+ }
+ }
+
+ for (worker <- storedWorkers) {
+ logInfo("Trying to recover worker: " + worker.id)
+ try {
+ registerWorker(worker)
+ worker.state = WorkerState.UNKNOWN
+ worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
+ }
+ }
+ }
+
+ def completeRecovery() {
+ // Ensure "only-once" recovery semantics using a short synchronization period.
+ synchronized {
+ if (state != RecoveryState.RECOVERING) { return }
+ state = RecoveryState.COMPLETING_RECOVERY
+ }
+
+ // Kill off any workers and apps that didn't respond to us.
+ workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
+ apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
+
+ state = RecoveryState.ALIVE
+ schedule()
+ logInfo("Recovery complete - resuming operations!")
+ }
+
/**
* Can an app use the given worker? True if the worker has enough memory and we haven't already
* launched an executor for the app on it (right now the standalone backend doesn't like having
@@ -204,6 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
* every time a new app joins or resource availability changes.
*/
def schedule() {
+ if (state != RecoveryState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
@@ -251,14 +399,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(
+ worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
- def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int,
- publicAddress: String): WorkerInfo = {
+ def registerWorker(worker: WorkerInfo): Unit = {
// There may be one or more refs to dead workers on this same node (w/ different ID's),
// remove them.
workers.filter { w =>
@@ -266,12 +413,17 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}.foreach { w =>
workers -= w
}
- val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
+
+ val workerAddress = worker.actor.path.address
+ if (addressToWorker.contains(workerAddress)) {
+ logInfo("Attempted to re-register worker at same address: " + workerAddress)
+ return
+ }
+
workers += worker
idToWorker(worker.id) = worker
- actorToWorker(sender) = worker
- addressToWorker(sender.path.address) = worker
- worker
+ actorToWorker(worker.actor) = worker
+ addressToWorker(workerAddress) = worker
}
def removeWorker(worker: WorkerInfo) {
@@ -286,25 +438,36 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
exec.id, ExecutorState.LOST, Some("worker lost"), None)
exec.application.removeExecutor(exec)
}
+ persistenceEngine.removeWorker(worker)
}
- def addApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
+ def createApplication(desc: ApplicationDescription, driver: ActorRef): ApplicationInfo = {
val now = System.currentTimeMillis()
val date = new Date(now)
- val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+ }
+
+ def registerApplication(app: ApplicationInfo): Unit = {
+ val appAddress = app.driver.path.address
+ if (addressToWorker.contains(appAddress)) {
+ logInfo("Attempted to re-register application at same address: " + appAddress)
+ return
+ }
+
applicationMetricsSystem.registerSource(app.appSource)
apps += app
idToApp(app.id) = app
- actorToApp(driver) = app
- addressToApp(driver.path.address) = app
+ actorToApp(app.driver) = app
+ addressToApp(appAddress) = app
if (firstApp == None) {
firstApp = Some(app)
}
+ // TODO: What is firstApp?? Can we remove it?
val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
- if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= desc.memoryPerSlave)) {
+ if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
}
- app
+ waitingApps += app
}
def finishApplication(app: ApplicationInfo) {
@@ -329,13 +492,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+ exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
exec.state = ExecutorState.KILLED
}
app.markFinished(state)
if (state != ApplicationState.FINISHED) {
app.driver ! ApplicationRemoved(state.toString)
}
+ persistenceEngine.removeApplication(app)
schedule()
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
new file mode 100644
index 0000000000..74a9f8cd82
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+sealed trait MasterMessages extends Serializable
+
+/** Contains messages seen only by the Master and its associated entities. */
+private[master] object MasterMessages {
+
+ // LeaderElectionAgent to Master
+
+ case object ElectedLeader
+
+ case object RevokedLeadership
+
+ // Actor System to LeaderElectionAgent
+
+ case object CheckLeader
+
+ // Actor System to Master
+
+ case object CheckForWorkerTimeOut
+
+ case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+
+ case object CompleteRecovery
+
+ case object RequestWebUIPort
+
+ case class WebUIPortResponse(webUIBoundPort: Int)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
new file mode 100644
index 0000000000..94b986caf2
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+/**
+ * Allows Master to persist any state that is necessary in order to recover from a failure.
+ * The following semantics are required:
+ * - addApplication and addWorker are called before completing registration of a new app/worker.
+ * - removeApplication and removeWorker are called at any time.
+ * Given these two requirements, we will have all apps and workers persisted, but
+ * we might not have yet deleted apps or workers that finished (so their liveness must be verified
+ * during recovery).
+ */
+private[spark] trait PersistenceEngine {
+ def addApplication(app: ApplicationInfo)
+
+ def removeApplication(app: ApplicationInfo)
+
+ def addWorker(worker: WorkerInfo)
+
+ def removeWorker(worker: WorkerInfo)
+
+ /**
+ * Returns the persisted data sorted by their respective ids (which implies that they're
+ * sorted by time of creation).
+ */
+ def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+
+ def close() {}
+}
+
+private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
+ override def addApplication(app: ApplicationInfo) {}
+ override def removeApplication(app: ApplicationInfo) {}
+ override def addWorker(worker: WorkerInfo) {}
+ override def removeWorker(worker: WorkerInfo) {}
+ override def readPersistedData() = (Nil, Nil)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
new file mode 100644
index 0000000000..b91be821f0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+private[spark] object RecoveryState
+ extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
+
+ type MasterState = Value
+
+ val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
new file mode 100644
index 0000000000..81e15c534f
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ops._
+
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+/**
+ * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
+ * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be
+ * created. If ZooKeeper remains down after several retries, the given
+ * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be
+ * informed via zkDown().
+ *
+ * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
+ * times or a semantic exception is thrown (e.g.., "node already exists").
+ */
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
+ val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+
+ val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
+ val ZK_TIMEOUT_MILLIS = 30000
+ val RETRY_WAIT_MILLIS = 5000
+ val ZK_CHECK_PERIOD_MILLIS = 10000
+ val MAX_RECONNECT_ATTEMPTS = 3
+
+ private var zk: ZooKeeper = _
+
+ private val watcher = new ZooKeeperWatcher()
+ private var reconnectAttempts = 0
+ private var closed = false
+
+ /** Connect to ZooKeeper to start the session. Must be called before anything else. */
+ def connect() {
+ connectToZooKeeper()
+
+ new Thread() {
+ override def run() = sessionMonitorThread()
+ }.start()
+ }
+
+ def sessionMonitorThread(): Unit = {
+ while (!closed) {
+ Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
+ if (zk.getState != ZooKeeper.States.CONNECTED) {
+ reconnectAttempts += 1
+ val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts
+ if (attemptsLeft <= 0) {
+ logError("Could not connect to ZooKeeper: system failure")
+ zkWatcher.zkDown()
+ close()
+ } else {
+ logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...")
+ connectToZooKeeper()
+ }
+ }
+ }
+ }
+
+ def close() {
+ if (!closed && zk != null) { zk.close() }
+ closed = true
+ }
+
+ private def connectToZooKeeper() {
+ if (zk != null) zk.close()
+ zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher)
+ }
+
+ /**
+ * Attempts to maintain a live ZooKeeper exception despite (very) transient failures.
+ * Mainly useful for handling the natural ZooKeeper session expiration.
+ */
+ private class ZooKeeperWatcher extends Watcher {
+ def process(event: WatchedEvent) {
+ if (closed) { return }
+
+ event.getState match {
+ case KeeperState.SyncConnected =>
+ reconnectAttempts = 0
+ zkWatcher.zkSessionCreated()
+ case KeeperState.Expired =>
+ connectToZooKeeper()
+ case KeeperState.Disconnected =>
+ logWarning("ZooKeeper disconnected, will retry...")
+ }
+ }
+ }
+
+ def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = {
+ retry {
+ zk.create(path, bytes, ZK_ACL, createMode)
+ }
+ }
+
+ def exists(path: String, watcher: Watcher = null): Stat = {
+ retry {
+ zk.exists(path, watcher)
+ }
+ }
+
+ def getChildren(path: String, watcher: Watcher = null): List[String] = {
+ retry {
+ zk.getChildren(path, watcher).toList
+ }
+ }
+
+ def getData(path: String): Array[Byte] = {
+ retry {
+ zk.getData(path, false, null)
+ }
+ }
+
+ def delete(path: String, version: Int = -1): Unit = {
+ retry {
+ zk.delete(path, version)
+ }
+ }
+
+ /**
+ * Creates the given directory (non-recursively) if it doesn't exist.
+ * All znodes are created in PERSISTENT mode with no data.
+ */
+ def mkdir(path: String) {
+ if (exists(path) == null) {
+ try {
+ create(path, "".getBytes, CreateMode.PERSISTENT)
+ } catch {
+ case e: Exception =>
+ // If the exception caused the directory not to be created, bubble it up,
+ // otherwise ignore it.
+ if (exists(path) == null) { throw e }
+ }
+ }
+ }
+
+ /**
+ * Recursively creates all directories up to the given one.
+ * All znodes are created in PERSISTENT mode with no data.
+ */
+ def mkdirRecursive(path: String) {
+ var fullDir = ""
+ for (dentry <- path.split("/").tail) {
+ fullDir += "/" + dentry
+ mkdir(fullDir)
+ }
+ }
+
+ /**
+ * Retries the given function up to 3 times. The assumption is that failure is transient,
+ * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist),
+ * in which case the exception will be thrown without retries.
+ *
+ * @param fn Block to execute, possibly multiple times.
+ */
+ def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
+ try {
+ fn
+ } catch {
+ case e: KeeperException.NoNodeException => throw e
+ case e: KeeperException.NodeExistsException => throw e
+ case e if n > 0 =>
+ logError("ZooKeeper exception, " + n + " more retries...", e)
+ Thread.sleep(RETRY_WAIT_MILLIS)
+ retry(fn, n-1)
+ }
+ }
+}
+
+trait SparkZooKeeperWatcher {
+ /**
+ * Called whenever a ZK session is created --
+ * this will occur when we create our first session as well as each time
+ * the session expires or errors out.
+ */
+ def zkSessionCreated()
+
+ /**
+ * Called if ZK appears to be completely down (i.e., not just a transient error).
+ * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead.
+ */
+ def zkDown()
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 6219f11f2a..e05f587b58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -22,28 +22,44 @@ import scala.collection.mutable
import org.apache.spark.util.Utils
private[spark] class WorkerInfo(
- val id: String,
- val host: String,
- val port: Int,
- val cores: Int,
- val memory: Int,
- val actor: ActorRef,
- val webUiPort: Int,
- val publicAddress: String) {
+ val id: String,
+ val host: String,
+ val port: Int,
+ val cores: Int,
+ val memory: Int,
+ val actor: ActorRef,
+ val webUiPort: Int,
+ val publicAddress: String)
+ extends Serializable {
Utils.checkHost(host, "Expected hostname")
assert (port > 0)
- var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
- var state: WorkerState.Value = WorkerState.ALIVE
- var coresUsed = 0
- var memoryUsed = 0
+ @transient var executors: mutable.HashMap[String, ExecutorInfo] = _ // fullId => info
+ @transient var state: WorkerState.Value = _
+ @transient var coresUsed: Int = _
+ @transient var memoryUsed: Int = _
- var lastHeartbeat = System.currentTimeMillis()
+ @transient var lastHeartbeat: Long = _
+
+ init()
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
+ private def readObject(in: java.io.ObjectInputStream) : Unit = {
+ in.defaultReadObject()
+ init()
+ }
+
+ private def init() {
+ executors = new mutable.HashMap
+ state = WorkerState.ALIVE
+ coresUsed = 0
+ memoryUsed = 0
+ lastHeartbeat = System.currentTimeMillis()
+ }
+
def hostPort: String = {
assert (port > 0)
host + ":" + port
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
index b5ee6dca79..c8d34f25e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerState.scala
@@ -17,8 +17,10 @@
package org.apache.spark.deploy.master
-private[spark] object WorkerState extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED") {
+private[spark] object WorkerState
+ extends Enumeration("ALIVE", "DEAD", "DECOMMISSIONED", "UNKNOWN") {
+
type WorkerState = Value
- val ALIVE, DEAD, DECOMMISSIONED = Value
+ val ALIVE, DEAD, DECOMMISSIONED, UNKNOWN = Value
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
new file mode 100644
index 0000000000..7809013e83
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import akka.actor.ActorRef
+import org.apache.zookeeper._
+import org.apache.zookeeper.Watcher.Event.EventType
+
+import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.spark.Logging
+
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+ extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
+
+ val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+
+ private val watcher = new ZooKeeperWatcher()
+ private val zk = new SparkZooKeeperSession(this)
+ private var status = LeadershipStatus.NOT_LEADER
+ private var myLeaderFile: String = _
+ private var leaderUrl: String = _
+
+ override def preStart() {
+ logInfo("Starting ZooKeeper LeaderElection agent")
+ zk.connect()
+ }
+
+ override def zkSessionCreated() {
+ synchronized {
+ zk.mkdirRecursive(WORKING_DIR)
+ myLeaderFile =
+ zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
+ self ! CheckLeader
+ }
+ }
+
+ override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
+ logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
+ Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+ super.preRestart(reason, message)
+ }
+
+ override def zkDown() {
+ logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
+ System.exit(1)
+ }
+
+ override def postStop() {
+ zk.close()
+ }
+
+ override def receive = {
+ case CheckLeader => checkLeader()
+ }
+
+ private class ZooKeeperWatcher extends Watcher {
+ def process(event: WatchedEvent) {
+ if (event.getType == EventType.NodeDeleted) {
+ logInfo("Leader file disappeared, a master is down!")
+ self ! CheckLeader
+ }
+ }
+ }
+
+ /** Uses ZK leader election. Navigates several ZK potholes along the way. */
+ def checkLeader() {
+ val masters = zk.getChildren(WORKING_DIR).toList
+ val leader = masters.sorted.head
+ val leaderFile = WORKING_DIR + "/" + leader
+
+ // Setup a watch for the current leader.
+ zk.exists(leaderFile, watcher)
+
+ try {
+ leaderUrl = new String(zk.getData(leaderFile))
+ } catch {
+ // A NoNodeException may be thrown if old leader died since the start of this method call.
+ // This is fine -- just check again, since we're guaranteed to see the new values.
+ case e: KeeperException.NoNodeException =>
+ logInfo("Leader disappeared while reading it -- finding next leader")
+ checkLeader()
+ return
+ }
+
+ // Synchronization used to ensure no interleaving between the creation of a new session and the
+ // checking of a leader, which could cause us to delete our real leader file erroneously.
+ synchronized {
+ val isLeader = myLeaderFile == leaderFile
+ if (!isLeader && leaderUrl == masterUrl) {
+ // We found a different master file pointing to this process.
+ // This can happen in the following two cases:
+ // (1) The master process was restarted on the same node.
+ // (2) The ZK server died between creating the node and returning the name of the node.
+ // For this case, we will end up creating a second file, and MUST explicitly delete the
+ // first one, since our ZK session is still open.
+ // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
+ // leader changes.
+ assert(leaderFile < myLeaderFile)
+ logWarning("Cleaning up old ZK master election file that points to this master.")
+ zk.delete(leaderFile)
+ } else {
+ updateLeadershipStatus(isLeader)
+ }
+ }
+ }
+
+ def updateLeadershipStatus(isLeader: Boolean) {
+ if (isLeader && status == LeadershipStatus.NOT_LEADER) {
+ status = LeadershipStatus.LEADER
+ masterActor ! ElectedLeader
+ } else if (!isLeader && status == LeadershipStatus.LEADER) {
+ status = LeadershipStatus.NOT_LEADER
+ masterActor ! RevokedLeadership
+ }
+ }
+
+ private object LeadershipStatus extends Enumeration {
+ type LeadershipStatus = Value
+ val LEADER, NOT_LEADER = Value
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
new file mode 100644
index 0000000000..a0233a7271
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.master
+
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+
+import akka.serialization.Serialization
+
+class ZooKeeperPersistenceEngine(serialization: Serialization)
+ extends PersistenceEngine
+ with SparkZooKeeperWatcher
+ with Logging
+{
+ val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+
+ val zk = new SparkZooKeeperSession(this)
+
+ zk.connect()
+
+ override def zkSessionCreated() {
+ zk.mkdirRecursive(WORKING_DIR)
+ }
+
+ override def zkDown() {
+ logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
+ }
+
+ override def addApplication(app: ApplicationInfo) {
+ serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
+ }
+
+ override def removeApplication(app: ApplicationInfo) {
+ zk.delete(WORKING_DIR + "/app_" + app.id)
+ }
+
+ override def addWorker(worker: WorkerInfo) {
+ serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+ }
+
+ override def removeWorker(worker: WorkerInfo) {
+ zk.delete(WORKING_DIR + "/worker_" + worker.id)
+ }
+
+ override def close() {
+ zk.close()
+ }
+
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+ val appFiles = sortedFiles.filter(_.startsWith("app_"))
+ val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
+ val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+ (apps, workers)
+ }
+
+ private def serializeIntoFile(path: String, value: Serializable) {
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+ zk.create(path, serialized, CreateMode.PERSISTENT)
+ }
+
+ def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
+ val fileData = zk.getData("/spark/master_status/" + filename)
+ val clazz = m.erasure.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
+ serializer.fromBinary(fileData).asInstanceOf[T]
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index e3dc30eefc..8fabc95665 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -43,7 +43,8 @@ private[spark] class ExecutorRunner(
val workerId: String,
val host: String,
val sparkHome: File,
- val workDir: File)
+ val workDir: File,
+ var state: ExecutorState.Value)
extends Logging {
val fullId = appId + "/" + execId
@@ -83,7 +84,8 @@ private[spark] class ExecutorRunner(
process.destroy()
process.waitFor()
}
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.KILLED, None, None)
+ state = ExecutorState.KILLED
+ worker ! ExecutorStateChanged(appId, execId, state, None, None)
Runtime.getRuntime.removeShutdownHook(shutdownHook)
}
}
@@ -180,9 +182,9 @@ private[spark] class ExecutorRunner(
// long-lived processes only. However, in the future, we might restart the executor a few
// times on the same machine.
val exitCode = process.waitFor()
+ state = ExecutorState.FAILED
val message = "Command exited with code " + exitCode
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message),
- Some(exitCode))
+ worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
case interrupted: InterruptedException =>
logInfo("Runner thread for executor " + fullId + " interrupted")
@@ -192,8 +194,9 @@ private[spark] class ExecutorRunner(
if (process != null) {
process.destroy()
}
+ state = ExecutorState.FAILED
val message = e.getClass + ": " + e.getMessage
- worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
+ worker ! ExecutorStateChanged(appId, execId, state, Some(message), None)
}
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 09530beb3b..216d9d44ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,26 +23,28 @@ import java.io.File
import scala.collection.mutable.HashMap
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor._
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
-import org.apache.spark.{Logging}
-import org.apache.spark.deploy.ExecutorState
+import org.apache.spark.Logging
+import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
-
+/**
+ * @param masterUrls Each url should look like spark://host:port.
+ */
private[spark] class Worker(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
- masterUrl: String,
+ masterUrls: Array[String],
workDirPath: String = null)
extends Actor with Logging {
@@ -54,8 +56,18 @@ private[spark] class Worker(
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
+
+ // Index into masterUrls that we're currently trying to register with.
+ var masterIndex = 0
+
+ val masterLock: Object = new Object()
var master: ActorRef = null
- var masterWebUiUrl : String = ""
+ var activeMasterUrl: String = ""
+ var activeMasterWebUiUrl : String = ""
+ @volatile var registered = false
+ @volatile var connected = false
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
@@ -95,6 +107,7 @@ private[spark] class Worker(
}
override def preStart() {
+ assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
@@ -103,44 +116,98 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
- connectToMaster()
+ registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
}
- def connectToMaster() {
- logInfo("Connecting to master " + masterUrl)
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ def changeMaster(url: String, uiUrl: String) {
+ masterLock.synchronized {
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
+ master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ connected = true
+ }
+ }
+
+ def tryRegisterAllMasters() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
+ val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+ publicAddress)
+ }
+ }
+
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ System.exit(1)
+ } else {
+ tryRegisterAllMasters()
+ }
+ }
+ retryTimer // start timer
}
override def receive = {
- case RegisteredWorker(url) =>
- masterWebUiUrl = url
- logInfo("Successfully registered with master")
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
- master ! Heartbeat(workerId)
+ case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+ logInfo("Successfully registered with master " + masterUrl)
+ registered = true
+ changeMaster(masterUrl, masterWebUiUrl)
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+
+ case SendHeartbeat =>
+ masterLock.synchronized {
+ if (connected) { master ! Heartbeat(workerId) }
}
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
+ context.unwatch(master)
+ changeMaster(masterUrl, masterWebUiUrl)
+
+ val execs = executors.values.
+ map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
+ sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
+
case RegisterWorkerFailed(message) =>
- logError("Worker registration failed: " + message)
- System.exit(1)
-
- case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(
- appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)
+ if (!registered) {
+ logError("Worker registration failed: " + message)
+ System.exit(1)
+ }
+
+ case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+ } else {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
+ }
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ }
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
@@ -153,32 +220,39 @@ private[spark] class Worker(
memoryUsed -= executor.memory
}
- case KillExecutor(appId, execId) =>
- val fullId = appId + "/" + execId
- executors.get(fullId) match {
- case Some(executor) =>
- logInfo("Asked to kill executor " + fullId)
- executor.kill()
- case None =>
- logInfo("Asked to kill unknown executor " + fullId)
+ case KillExecutor(masterUrl, appId, execId) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+ } else {
+ val fullId = appId + "/" + execId
+ executors.get(fullId) match {
+ case Some(executor) =>
+ logInfo("Asked to kill executor " + fullId)
+ executor.kill()
+ case None =>
+ logInfo("Asked to kill unknown executor " + fullId)
+ }
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ masterDisconnected()
+
+ case RemoteClientDisconnected(transport, address) if address == master.path.address =>
+ masterDisconnected()
+
+ case RemoteClientShutdown(transport, address) if address == master.path.address =>
masterDisconnected()
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, masterUrl, cores, memory,
- coresUsed, memoryUsed, masterWebUiUrl)
+ finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+ coresUsed, memoryUsed, activeMasterWebUiUrl)
}
}
def masterDisconnected() {
- // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
- // (Note that if reconnecting we would also need to assign IDs differently.)
- logError("Connection to master failed! Shutting down.")
- executors.values.foreach(_.kill())
- System.exit(1)
+ logError("Connection to master failed! Waiting for master to reconnect...")
+ connected = false
}
def generateWorkerId(): String = {
@@ -196,17 +270,18 @@ private[spark] object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
- args.memory, args.master, args.workDir)
+ args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
- masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+ masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+ : (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrl, workDir)), name = "Worker")
+ masterUrls, workDir)), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 0ae89a864f..3ed528e6b3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
var webUiPort = 8081
var cores = inferDefaultCores()
var memory = inferDefaultMemory()
- var master: String = null
+ var masters: Array[String] = null
var workDir: String = null
// Check for settings in environment variables
@@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) {
printUsageAndExit(0)
case value :: tail =>
- if (master != null) { // Two positional arguments were given
+ if (masters != null) { // Two positional arguments were given
printUsageAndExit(1)
}
- master = value
+ masters = value.stripPrefix("spark://").split(",").map("spark://" + _)
parse(tail)
case Nil =>
- if (master == null) { // No positional argument was given
+ if (masters == null) { // No positional argument was given
printUsageAndExit(1)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 95d6007f3b..800f1cafcc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
- val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+ val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 9c49768c0c..cb88159b8d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
- master: String,
+ masters: Array[String],
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
with ClientListener
@@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend(
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
- client = new Client(sc.env.actorSystem, master, appDesc, this)
+ client = new Client(sc.env.actorSystem, masters, appDesc, this)
client.start()
}
@@ -71,8 +71,14 @@ private[spark] class SparkDeploySchedulerBackend(
override def disconnected() {
if (!stopping) {
- logError("Disconnected from Spark cluster!")
- scheduler.error("Disconnected from Spark cluster")
+ logWarning("Disconnected from Spark cluster! Waiting for reconnection...")
+ }
+ }
+
+ override def dead() {
+ if (!stopping) {
+ logError("Spark cluster looks dead, giving up.")
+ scheduler.error("Spark cluster looks down")
}
}
diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
index 453394dfda..fcd1b518d0 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala
@@ -35,7 +35,7 @@ private[spark] object UIWorkloadGenerator {
def main(args: Array[String]) {
if (args.length < 2) {
- println("usage: ./spark-class spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
+ println("usage: ./spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
System.exit(1)
}
val master = args(0)
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 05f8545c7b..0b38e239f9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
@@ -53,7 +53,8 @@ class JsonProtocolSuite extends FunSuite {
val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
- val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+ val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+ RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}
@@ -79,7 +80,7 @@ class JsonProtocolSuite extends FunSuite {
}
def createExecutorRunner() : ExecutorRunner = {
new ExecutorRunner("appId", 123, createAppDesc(), 4, 1234, null, "workerId", "host",
- new File("sparkHome"), new File("workDir"))
+ new File("sparkHome"), new File("workDir"), ExecutorState.RUNNING)
}
def assertValidJson(json: JValue) {