-rw-r--r--  core/src/main/scala/org/apache/spark/SparkContext.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala | 5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 58
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala | 18
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala | 14
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala | 28
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 145
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala | 4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala | 183
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala | 24
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 109
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala | 64
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 125
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala | 8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 11
-rw-r--r--  pom.xml | 11
-rw-r--r--  project/SparkBuild.scala | 1
25 files changed, 720 insertions, 170 deletions
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 912ce752fb..5318847276 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -169,7 +169,8 @@ class SparkContext(
case SPARK_REGEX(sparkUrl) =>
val scheduler = new ClusterScheduler(this)
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+ val masterUrls = sparkUrl.split(",")
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
scheduler
@@ -185,8 +186,8 @@ class SparkContext(
val scheduler = new ClusterScheduler(this)
val localCluster = new LocalSparkCluster(
numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt)
- val sparkUrl = localCluster.start()
- val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, appName)
+ val masterUrls = localCluster.start()
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, masterUrls, appName)
scheduler.initialize(backend)
backend.shutdownCallback = (backend: SparkDeploySchedulerBackend) => {
localCluster.stop()
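
Illustration (not part of the patch): the split on "," means a single spark:// master setting can now name every master in an HA group. A minimal driver sketch in Scala, assuming the comma-separated URL form that this split implies (the exact accepted format may differ); hosts and ports are placeholders.

import org.apache.spark.SparkContext

object MultiMasterApp {
  def main(args: Array[String]) {
    // Both masters are listed; whichever one is currently ALIVE accepts the
    // application registration, and the other stays on standby.
    val sc = new SparkContext("spark://host1:7077,host2:7077", "MultiMasterApp")
    println(sc.parallelize(1 to 1000).count())
    sc.stop()
  }
}
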
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 0d0745a480..31d1909279 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -27,6 +27,7 @@ import org.apache.spark.util.Utils
private[deploy] sealed trait DeployMessage extends Serializable
+/** Contains messages sent between Scheduler actor nodes. */
private[deploy] object DeployMessages {
// Worker to Master
@@ -58,13 +59,14 @@ private[deploy] object DeployMessages {
// Master to Worker
- case class RegisteredWorker(masterWebUiUrl: String) extends DeployMessage
+ case class RegisteredWorker(masterUrl: String, masterWebUiUrl: String) extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
- case class KillExecutor(appId: String, execId: Int) extends DeployMessage
+ case class KillExecutor(masterUrl: String, appId: String, execId: Int) extends DeployMessage
case class LaunchExecutor(
+ masterUrl: String,
appId: String,
execId: Int,
appDesc: ApplicationDescription,
@@ -82,7 +84,7 @@ private[deploy] object DeployMessages {
// Master to Client
- case class RegisteredApplication(appId: String) extends DeployMessage
+ case class RegisteredApplication(appId: String, masterUrl: String) extends DeployMessage
// TODO(matei): replace hostPort with host
case class ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) {
@@ -131,16 +133,4 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
- // Actor System to Master
-
- case object CheckForWorkerTimeOut
-
- case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
-
- case object EndRecoveryProcess
-
- case object RequestWebUIPort
-
- case class WebUIPortResponse(webUIBoundPort: Int)
-
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
index 716ee483d5..2abf0b69dd 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorDescription.scala
@@ -17,6 +17,11 @@
package org.apache.spark.deploy
+/**
+ * Used to send state on-the-wire about Executors from Worker to Master.
+ * This state is sufficient for the Master to reconstruct its internal data structures during
+ * failover.
+ */
private[spark] class ExecutorDescription(
val appId: String,
val execId: Int,
diff --git a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
index 10161c8204..308a2bfa22 100644
--- a/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala
@@ -39,22 +39,23 @@ class LocalSparkCluster(numWorkers: Int, coresPerWorker: Int, memoryPerWorker: I
private val masterActorSystems = ArrayBuffer[ActorSystem]()
private val workerActorSystems = ArrayBuffer[ActorSystem]()
- def start(): String = {
+ def start(): Array[String] = {
logInfo("Starting a local Spark cluster with " + numWorkers + " workers.")
/* Start the Master */
val (masterSystem, masterPort, _) = Master.startSystemAndActor(localHostname, 0, 0)
masterActorSystems += masterSystem
val masterUrl = "spark://" + localHostname + ":" + masterPort
+ val masters = Array(masterUrl)
/* Start the Workers */
for (workerNum <- 1 to numWorkers) {
val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
- memoryPerWorker, masterUrl, null, Some(workerNum))
+ memoryPerWorker, masters, null, Some(workerNum))
workerActorSystems += workerSystem
}
- return masterUrl
+ return masters
}
def stop() {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 28548a2ca9..aa2a10a8ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -23,6 +23,7 @@ import akka.actor._
import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Duration
+import akka.util.duration._
import akka.remote.RemoteClientDisconnected
import akka.remote.RemoteClientLifeCycleEvent
import akka.remote.RemoteClientShutdown
@@ -40,27 +41,27 @@ import org.apache.spark.deploy.master.Master
*/
private[spark] class Client(
actorSystem: ActorSystem,
- masterUrl: String,
+ masterUrls: Array[String],
appDescription: ApplicationDescription,
listener: ClientListener)
extends Logging {
+ val REGISTRATION_TIMEOUT = 60 * 1000
+
var actor: ActorRef = null
var appId: String = null
+ var registered = false
+ var activeMasterUrl: String = null
class ClientActor extends Actor with Logging {
var master: ActorRef = null
var masterAddress: Address = null
var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
+ var alreadyDead = false // To avoid calling listener.dead() multiple times
override def preStart() {
- logInfo("Connecting to master " + masterUrl)
try {
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- masterAddress = master.path.address
- master ! RegisterApplication(appDescription)
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ connectToMaster()
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
@@ -69,9 +70,34 @@ private[spark] class Client(
}
}
+ def connectToMaster() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
+ val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterApplication(appDescription)
+ }
+
+ context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) {
+ if (!registered) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ }
+ }
+ }
+
+ def changeMaster(url: String) {
+ activeMasterUrl = url
+ master = context.actorFor(Master.toAkkaUrl(url))
+ masterAddress = master.path.address
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ }
+
override def receive = {
- case RegisteredApplication(appId_) =>
+ case RegisteredApplication(appId_, masterUrl) =>
appId = appId_
+ registered = true
+ changeMaster(masterUrl)
listener.connected(appId)
case ApplicationRemoved(message) =>
@@ -92,13 +118,12 @@ private[spark] class Client(
listener.executorRemoved(fullId, message.getOrElse(""), exitStatus)
}
- case MasterChanged(materUrl, masterWebUiUrl) =>
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
logInfo("Master has changed, new master is at " + masterUrl)
context.unwatch(master)
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- masterAddress = master.path.address
+ changeMaster(masterUrl)
+ alreadyDisconnected = false
sender ! MasterChangeAcknowledged(appId)
- context.watch(master)
case Terminated(actor_) if actor_ == master =>
logError("Connection to master failed; waiting for master to reconnect...")
@@ -113,7 +138,7 @@ private[spark] class Client(
markDisconnected()
case StopClient =>
- markDisconnected()
+ markDead()
sender ! true
context.stop(self)
}
@@ -127,6 +152,13 @@ private[spark] class Client(
alreadyDisconnected = true
}
}
+
+ def markDead() {
+ if (!alreadyDead) {
+ listener.dead()
+ alreadyDead = true
+ }
+ }
}
def start() {
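
Illustration (not part of the patch): callers of Client now see two distinct failure callbacks, disconnected() while a failover to a standby master may still happen, and the new dead() once no master answered within REGISTRATION_TIMEOUT. A minimal ClientListener sketch; the class name is made up for this example, and the package declaration is only there because the trait is private[spark].

package org.apache.spark.deploy.client

private[spark] class LoggingClientListener extends ClientListener {
  def connected(appId: String) {
    println("Registered application " + appId)
  }

  // Possibly temporary: the Client keeps following MasterChanged messages.
  def disconnected() {
    println("Lost the active master; waiting for failover...")
  }

  // Terminal: no master replied in time and the Client has given up.
  def dead() {
    println("All masters unresponsive; shutting down.")
  }

  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}

  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {}
}
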
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
index 4605368c11..be7a11bd15 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/ClientListener.scala
@@ -27,8 +27,12 @@ package org.apache.spark.deploy.client
private[spark] trait ClientListener {
def connected(appId: String): Unit
+ /** Disconnection may be a temporary state, as we fail over to a new Master. */
def disconnected(): Unit
+ /** Dead means that we couldn't find any Masters to connect to, and have given up. */
+ def dead(): Unit
+
def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index d5e9a0e095..5b62d3ba6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -33,6 +33,11 @@ private[spark] object TestClient {
System.exit(0)
}
+ def dead() {
+ logInfo("Could not connect to master")
+ System.exit(0)
+ }
+
def executorAdded(id: String, workerId: String, hostPort: String, cores: Int, memory: Int) {}
def executorRemoved(id: String, message: String, exitStatus: Option[Int]) {}
@@ -44,7 +49,7 @@ private[spark] object TestClient {
val desc = new ApplicationDescription(
"TestClient", 1, 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()), "dummy-spark-home", "ignored")
val listener = new TestListener
- val client = new Client(actorSystem, url, desc, listener)
+ val client = new Client(actorSystem, Array(url), desc, listener)
client.start()
actorSystem.awaitTermination()
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index e437a0e7ae..8291e29ec3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -23,12 +23,12 @@ import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class ApplicationInfo(
- val startTime: Long,
- val id: String,
- val desc: ApplicationDescription,
- val submitDate: Date,
- val driver: ActorRef,
- val appUiUrl: String)
+ val startTime: Long,
+ val id: String,
+ val desc: ApplicationDescription,
+ val submitDate: Date,
+ val driver: ActorRef,
+ val appUiUrl: String)
extends Serializable {
@transient var state: ApplicationState.Value = _
@@ -39,14 +39,14 @@ private[spark] class ApplicationInfo(
@transient private var nextExecutorId: Int = _
- init()
+ init
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
- init()
+ init
}
- private def init() {
+ private def init = {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
coresGranted = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
index d235234c13..76db61dd61 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ExecutorInfo.scala
@@ -28,7 +28,7 @@ private[spark] class ExecutorInfo(
var state = ExecutorState.LAUNCHING
- /** Copy all state variables from the given on-the-wire ExecutorDescription. */
+ /** Copy all state (non-val) variables from the given on-the-wire ExecutorDescription. */
def copyState(execDesc: ExecutorDescription) {
state = execDesc.state
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
index 2fc13821bd..c0849ef324 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/FileSystemPersistenceEngine.scala
@@ -32,8 +32,8 @@ import org.apache.spark.Logging
* @param serialization Used to serialize our objects.
*/
private[spark] class FileSystemPersistenceEngine(
- val dir: String,
- val serialization: Serialization)
+ val dir: String,
+ val serialization: Serialization)
extends PersistenceEngine with Logging {
new File(dir).mkdir()
@@ -57,11 +57,11 @@ private[spark] class FileSystemPersistenceEngine(
}
override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
- val sortedFiles = new File(dir).listFiles().sortBy(_.getName())
- val appFiles = sortedFiles.filter(_.getName().startsWith("app_"))
- val apps = appFiles.map(deserializeFromFile[ApplicationInfo](_))
- val workerFiles = sortedFiles.filter(_.getName().startsWith("worker_"))
- val workers = workerFiles.map(deserializeFromFile[WorkerInfo](_))
+ val sortedFiles = new File(dir).listFiles().sortBy(_.getName)
+ val appFiles = sortedFiles.filter(_.getName.startsWith("app_"))
+ val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val workerFiles = sortedFiles.filter(_.getName.startsWith("worker_"))
+ val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
(apps, workers)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
new file mode 100644
index 0000000000..c44a23f8c6
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -0,0 +1,28 @@
+package org.apache.spark.deploy.master
+
+import akka.actor.{Actor, ActorRef}
+
+import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
+
+/**
+ * A LeaderElectionAgent keeps track of whether the current Master is the leader, meaning it
+ * is the only Master serving requests.
+ * In addition to the API provided, the LeaderElectionAgent will use the following messages
+ * to inform the Master of leader changes:
+ * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
+ * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
+ */
+trait LeaderElectionAgent extends Actor {
+ val masterActor: ActorRef
+}
+
+/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
+class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
+ override def preStart() {
+ masterActor ! ElectedLeader
+ }
+
+ override def receive = {
+ case _ =>
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index c6e039eed4..e13a8cba4a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -34,18 +34,18 @@ import akka.util.{Duration, Timeout}
import org.apache.spark.{Logging, SparkException}
import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.deploy.master.MasterState.MasterState
+import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
-
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For application IDs
val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
val REAPER_ITERATIONS = System.getProperty("spark.dead.worker.persistence", "15").toInt
val RECOVERY_DIR = System.getProperty("spark.deploy.recoveryDirectory", "")
+ val RECOVERY_MODE = System.getProperty("spark.deploy.recoveryMode", "NONE")
var nextAppNumber = 0
val workers = new HashSet[WorkerInfo]
@@ -76,75 +76,115 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (envVar != null) envVar else host
}
- var state: MasterState = _
+ val masterUrl = "spark://" + host + ":" + port
+ var masterWebUiUrl: String = _
+
+ var state = MasterState.STANDBY
var persistenceEngine: PersistenceEngine = _
+ var leaderElectionAgent: ActorRef = _
+
// As a temporary workaround before better ways of configuring memory, we allow users to set
// a flag that will perform round-robin scheduling across the nodes (spreading out each app
// among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
val spreadOutApps = System.getProperty("spark.deploy.spreadOut", "true").toBoolean
override def preStart() {
- logInfo("Starting Spark master at spark://" + host + ":" + port)
+ logInfo("Starting Spark master at " + masterUrl)
// Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start()
+ masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis, self, CheckForWorkerTimeOut)
masterMetricsSystem.registerSource(masterSource)
masterMetricsSystem.start()
applicationMetricsSystem.start()
- persistenceEngine =
- if (RECOVERY_DIR.isEmpty()) {
- new BlackHolePersistenceEngine()
- } else {
+ persistenceEngine = RECOVERY_MODE match {
+ case "ZOOKEEPER" =>
+ logInfo("Persisting recovery state to ZooKeeper")
+ new ZooKeeperPersistenceEngine(SerializationExtension(context.system))
+ case "FILESYSTEM" =>
logInfo("Persisting recovery state to directory: " + RECOVERY_DIR)
new FileSystemPersistenceEngine(RECOVERY_DIR, SerializationExtension(context.system))
- }
+ case _ =>
+ new BlackHolePersistenceEngine()
+ }
- val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
- state =
- if (storedApps.isEmpty && storedWorkers.isEmpty) {
- MasterState.ALIVE
- } else {
- self ! BeginRecovery(storedApps, storedWorkers)
- MasterState.RECOVERING
- }
+ leaderElectionAgent = context.actorOf(Props(
+ RECOVERY_MODE match {
+ case "ZOOKEEPER" =>
+ new ZooKeeperLeaderElectionAgent(self, masterUrl)
+ case _ =>
+ new MonarchyLeaderAgent(self)
+ }))
+ }
+
+ override def preRestart(reason: Throwable, message: Option[Any]) {
+ logError("Master actor restarted due to exception", reason)
}
override def postStop() {
webUi.stop()
masterMetricsSystem.stop()
applicationMetricsSystem.stop()
+ persistenceEngine.close()
+ context.stop(leaderElectionAgent)
}
override def receive = {
+ case ElectedLeader => {
+ val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
+ state = if (storedApps.isEmpty && storedWorkers.isEmpty)
+ MasterState.ALIVE
+ else
+ MasterState.RECOVERING
+
+ logInfo("I have been elected leader! New state: " + state)
+
+ if (state == MasterState.RECOVERING) {
+ beginRecovery(storedApps, storedWorkers)
+ context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+ }
+ }
+
+ case RevokedLeadership => {
+ logError("Leadership has been revoked -- master shutting down.")
+ System.exit(0)
+ }
+
case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
- if (idToWorker.contains(id)) {
+ if (state == MasterState.STANDBY) {
+ // ignore, don't send response
+ } else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort, publicAddress)
registerWorker(worker)
context.watch(sender) // This doesn't work with remote actors but helps for testing
persistenceEngine.addWorker(worker)
- sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
+ sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
schedule()
}
}
case RegisterApplication(description) => {
- logInfo("Registering app " + description.name)
- val app = createApplication(description, sender)
- registerApplication(app)
- logInfo("Registered app " + description.name + " with ID " + app.id)
- context.watch(sender) // This doesn't work with remote actors but helps for testing
- persistenceEngine.addApplication(app)
- sender ! RegisteredApplication(app.id)
- schedule()
+ if (state == MasterState.STANDBY) {
+ // ignore, don't send response
+ } else {
+ logInfo("Registering app " + description.name)
+ val app = createApplication(description, sender)
+ registerApplication(app)
+ logInfo("Registered app " + description.name + " with ID " + app.id)
+ context.watch(sender) // This doesn't work with remote actors but helps for testing
+ persistenceEngine.addApplication(app)
+ sender ! RegisteredApplication(app.id, masterUrl)
+ schedule()
+ }
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
@@ -184,27 +224,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
- case BeginRecovery(storedApps, storedWorkers) => {
- context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self, EndRecoveryProcess)
-
- val masterUrl = "spark://" + host + ":" + port
- val masterWebUiUrl = "http://" + masterPublicAddress + ":" + webUi.boundPort.get
- for (app <- storedApps) {
- registerApplication(app)
- app.state = ApplicationState.UNKNOWN
- app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
- }
- for (worker <- storedWorkers) {
- registerWorker(worker)
- worker.state = WorkerState.UNKNOWN
- worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
- }
- }
-
case MasterChangeAcknowledged(appId) => {
- val appOption = idToApp.get(appId)
- appOption match {
+ idToApp.get(appId) match {
case Some(app) =>
+ logInfo("Application has been re-registered: " + appId)
app.state = ApplicationState.WAITING
case None =>
logWarning("Master change ack from unknown app: " + appId)
@@ -216,9 +239,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case WorkerSchedulerStateResponse(workerId, executors) => {
idToWorker.get(workerId) match {
case Some(worker) =>
+ logInfo("Worker has been re-registered: " + workerId)
worker.state = WorkerState.ALIVE
- val validExecutors = executors.filter(exec => idToApp.get(exec.appId) != None)
+ val validExecutors = executors.filter(exec => idToApp.get(exec.appId).isDefined)
for (exec <- validExecutors) {
val app = idToApp.get(exec.appId).get
val execInfo = app.addExecutor(worker, exec.cores, Some(exec.execId))
@@ -232,10 +256,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (canCompleteRecovery) { completeRecovery() }
}
- case EndRecoveryProcess => {
- completeRecovery()
- }
-
case Terminated(actor) => {
// The disconnected actor could've been either a worker or an app; remove whichever of
// those we have an entry for in the corresponding actor hashmap
@@ -275,15 +295,29 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers.count(_.state == WorkerState.UNKNOWN) == 0 &&
apps.count(_.state == ApplicationState.UNKNOWN) == 0
+ def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
+ for (app <- storedApps) {
+ registerApplication(app)
+ app.state = ApplicationState.UNKNOWN
+ app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ }
+ for (worker <- storedWorkers) {
+ registerWorker(worker)
+ worker.state = WorkerState.UNKNOWN
+ worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ }
+ }
+
def completeRecovery() {
+ // Ensure "only-once" recovery semantics using a short synchronization period.
synchronized {
if (state != MasterState.RECOVERING) { return }
state = MasterState.COMPLETING_RECOVERY
}
// Kill off any workers and apps that didn't respond to us.
- workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker(_))
- apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication(_))
+ workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
+ apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
state = MasterState.ALIVE
schedule()
@@ -352,7 +386,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(
+ worker.actor ! LaunchExecutor(masterUrl,
exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
exec.application.driver ! ExecutorAdded(
exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
@@ -415,6 +449,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
if (firstApp == None) {
firstApp = Some(app)
}
+ // TODO: What is firstApp?? Can we remove it?
val workersAlive = workers.filter(_.state == WorkerState.ALIVE).toArray
if (workersAlive.size > 0 && !workersAlive.exists(_.memoryFree >= app.desc.memoryPerSlave)) {
logWarning("Could not find any workers with enough memory for " + firstApp.get.id)
@@ -444,7 +479,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
waitingApps -= app
for (exec <- app.executors.values) {
exec.worker.removeExecutor(exec)
- exec.worker.actor ! KillExecutor(exec.application.id, exec.id)
+ exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
exec.state = ExecutorState.KILLED
}
app.markFinished(state)
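
Illustration (not part of the patch): recovery behaviour is now selected by spark.deploy.recoveryMode rather than by the mere presence of spark.deploy.recoveryDirectory. A sketch of the system properties each Master JVM would read under this patch for ZooKeeper-backed HA (typically passed as -D flags; shown via System.setProperty for brevity); the ZooKeeper hosts are placeholders.

// Property names are the ones read by Master and SparkZooKeeperSession in this patch.
System.setProperty("spark.deploy.recoveryMode", "ZOOKEEPER")
System.setProperty("spark.deploy.zookeeper.url", "zk1:2181,zk2:2181,zk3:2181")
System.setProperty("spark.deploy.zookeeper.dir", "/spark")  // optional, this is the default

// Single-master alternative: recover state from local disk after a restart.
// System.setProperty("spark.deploy.recoveryMode", "FILESYSTEM")
// System.setProperty("spark.deploy.recoveryDirectory", "/var/spark/recovery")
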
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
new file mode 100644
index 0000000000..6e31d40b43
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -0,0 +1,29 @@
+package org.apache.spark.deploy.master
+
+import org.apache.spark.util.Utils
+
+sealed trait MasterMessages extends Serializable
+
+/** Contains messages seen only by the Master and its associated entities. */
+private[master] object MasterMessages {
+
+ // LeaderElectionAgent to Master
+
+ case object ElectedLeader
+
+ case object RevokedLeadership
+
+ // Actor System to LeaderElectionAgent
+
+ case object CheckLeader
+
+ // Actor System to Master
+
+ case object CheckForWorkerTimeOut
+
+ case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+
+ case object RequestWebUIPort
+
+ case class WebUIPortResponse(webUIBoundPort: Int)
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
index 9ea5e9752e..eec3df3b7a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
@@ -18,9 +18,9 @@
package org.apache.spark.deploy.master
private[spark] object MasterState
- extends Enumeration("ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
+ extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
type MasterState = Value
- val ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
+ val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 07d23c6bf3..8c4878bd30 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -36,9 +36,11 @@ trait PersistenceEngine {
/**
* Returns the persisted data sorted by their respective ids (which implies that they're
- * sorted by time order of creation).
+ * sorted by time of creation).
*/
def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo])
+
+ def close() {}
}
class BlackHolePersistenceEngine extends PersistenceEngine {
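
Illustration (not part of the patch): close() gives engines a shutdown hook with a no-op default. A hypothetical in-memory engine, e.g. for tests, showing the full contract; the class is invented for this sketch.

package org.apache.spark.deploy.master

import scala.collection.mutable

private[spark] class InMemoryPersistenceEngine extends PersistenceEngine {
  // LinkedHashMap so that insertion order stands in for creation order.
  private val apps = new mutable.LinkedHashMap[String, ApplicationInfo]
  private val workers = new mutable.LinkedHashMap[String, WorkerInfo]

  override def addApplication(app: ApplicationInfo) { apps(app.id) = app }
  override def removeApplication(app: ApplicationInfo) { apps -= app.id }
  override def addWorker(worker: WorkerInfo) { workers(worker.id) = worker }
  override def removeWorker(worker: WorkerInfo) { workers -= worker.id }

  override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) =
    (apps.values.toSeq, workers.values.toSeq)

  // close() keeps the no-op default from the trait.
}
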
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
new file mode 100644
index 0000000000..5492a3a988
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -0,0 +1,183 @@
+package org.apache.spark.deploy.master
+
+import scala.collection.JavaConversions._
+import scala.concurrent.ops._
+
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+import org.apache.zookeeper.data.Stat
+import org.apache.zookeeper.Watcher.Event.KeeperState
+
+/**
+ * Provides a Scala-side interface to the standard ZooKeeper client, with the addition of retry
+ * logic. If the ZooKeeper session expires or otherwise dies, a new ZooKeeper session will be
+ * created. If ZooKeeper remains down after several retries, the given
+ * [[org.apache.spark.deploy.master.SparkZooKeeperWatcher SparkZooKeeperWatcher]] will be
+ * informed via zkDown().
+ *
+ * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
+ * times or a semantic exception is thrown (e.g., "node already exists").
+ */
+class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
+ val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
+
+ val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
+ val ZK_TIMEOUT_MILLIS = 30000
+ val RETRY_WAIT_MILLIS = 5000
+ val ZK_CHECK_PERIOD_MILLIS = 10000
+ val MAX_RECONNECT_ATTEMPTS = 3
+
+ private var zk: ZooKeeper = _
+
+ private val watcher = new ZooKeeperWatcher()
+ private var reconnectAttempts = 0
+ private var closed = false
+
+ /** Connect to ZooKeeper to start the session. Must be called before anything else. */
+ def connect() {
+ connectToZooKeeper()
+ spawn(sessionMonitorThread)
+ }
+
+ def sessionMonitorThread = {
+ while (!closed) {
+ Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
+ if (zk.getState != ZooKeeper.States.CONNECTED) {
+ reconnectAttempts += 1
+ val attemptsLeft = MAX_RECONNECT_ATTEMPTS - reconnectAttempts
+ if (attemptsLeft <= 0) {
+ logError("Could not connect to ZooKeeper: system failure")
+ zkWatcher.zkDown()
+ close()
+ } else {
+ logWarning("ZooKeeper connection failed, retrying " + attemptsLeft + " more times...")
+ connectToZooKeeper()
+ }
+ }
+ }
+ }
+
+ def close() {
+ if (!closed && zk != null) { zk.close() }
+ closed = true
+ }
+
+ private def connectToZooKeeper() {
+ if (zk != null) zk.close()
+ zk = new ZooKeeper(ZK_URL, ZK_TIMEOUT_MILLIS, watcher)
+ }
+
+ /**
+ * Attempts to maintain a live ZooKeeper connection despite (very) transient failures.
+ * Mainly useful for handling the natural ZooKeeper session expiration.
+ */
+ private class ZooKeeperWatcher extends Watcher {
+ def process(event: WatchedEvent) {
+ if (closed) { return }
+
+ event.getState match {
+ case KeeperState.SyncConnected =>
+ reconnectAttempts = 0
+ zkWatcher.zkSessionCreated()
+ case KeeperState.Expired =>
+ connectToZooKeeper()
+ case KeeperState.Disconnected =>
+ logWarning("ZooKeeper disconnected, will retry...")
+ }
+ }
+ }
+
+ def create(path: String, bytes: Array[Byte], createMode: CreateMode): String = {
+ retry {
+ zk.create(path, bytes, ZK_ACL, createMode)
+ }
+ }
+
+ def exists(path: String, watcher: Watcher = null): Stat = {
+ retry {
+ zk.exists(path, watcher)
+ }
+ }
+
+ def getChildren(path: String, watcher: Watcher = null): List[String] = {
+ retry {
+ zk.getChildren(path, watcher).toList
+ }
+ }
+
+ def getData(path: String): Array[Byte] = {
+ retry {
+ zk.getData(path, false, null)
+ }
+ }
+
+ def delete(path: String, version: Int = -1): Unit = {
+ retry {
+ zk.delete(path, version)
+ }
+ }
+
+ /**
+ * Creates the given directory (non-recursively) if it doesn't exist.
+ * All znodes are created in PERSISTENT mode with no data.
+ */
+ def mkdir(path: String) {
+ if (exists(path) == null) {
+ try {
+ create(path, "".getBytes, CreateMode.PERSISTENT)
+ } catch {
+ case e: Exception =>
+ // If the exception caused the directory not to be created, bubble it up,
+ // otherwise ignore it.
+ if (exists(path) == null) { throw e }
+ }
+ }
+ }
+
+ /**
+ * Recursively creates all directories up to the given one.
+ * All znodes are created in PERSISTENT mode with no data.
+ */
+ def mkdirRecursive(path: String) {
+ var fullDir = ""
+ for (dentry <- path.split("/").tail) {
+ fullDir += "/" + dentry
+ mkdir(fullDir)
+ }
+ }
+
+ /**
+ * Retries the given function up to 3 times. The assumption is that failure is transient,
+ * UNLESS it is a semantic exception (i.e., trying to get data from a node that doesn't exist),
+ * in which case the exception will be thrown without retries.
+ *
+ * @param fn Block to execute, possibly multiple times.
+ */
+ def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = {
+ try {
+ fn
+ } catch {
+ case e: KeeperException.NoNodeException => throw e
+ case e: KeeperException.NodeExistsException => throw e
+ case e if n > 0 =>
+ logError("ZooKeeper exception, " + n + " more retries...", e)
+ Thread.sleep(RETRY_WAIT_MILLIS)
+ retry(fn)(n-1)
+ }
+ }
+}
+
+trait SparkZooKeeperWatcher {
+ /**
+ * Called whenever a ZK session is created --
+ * this will occur when we create our first session as well as each time
+ * the session expires or errors out.
+ */
+ def zkSessionCreated()
+
+ /**
+ * Called if ZK appears to be completely down (i.e., not just a transient error).
+ * We will no longer attempt to reconnect to ZK, and the SparkZooKeeperSession is considered dead.
+ */
+ def zkDown()
+}
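
Illustration (not part of the patch): every call made through this wrapper is retried up to MAX_RECONNECT_ATTEMPTS times unless the failure is semantic (NoNode / NodeExists), and the watcher is re-notified after each session re-creation. A usage sketch, assuming spark.deploy.zookeeper.url points at a reachable ensemble; the object name and znode paths are placeholders.

import org.apache.spark.deploy.master.{SparkZooKeeperSession, SparkZooKeeperWatcher}
import org.apache.zookeeper.CreateMode

object ZkSessionSketch extends SparkZooKeeperWatcher {
  val zk = new SparkZooKeeperSession(this)

  // Fired on the first connection and again after every session expiry,
  // so ephemeral state should be (re-)created here.
  override def zkSessionCreated() {
    zk.mkdirRecursive("/spark/example")
    zk.create("/spark/example/node_", "hello".getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
  }

  // Fired only after MAX_RECONNECT_ATTEMPTS failed reconnects; the session is dead.
  override def zkDown() {
    println("ZooKeeper looks down; giving up.")
  }

  def main(args: Array[String]) {
    zk.connect()
  }
}
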
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 2ab7bb233c..26090c6a9c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -22,14 +22,14 @@ import scala.collection.mutable
import org.apache.spark.util.Utils
private[spark] class WorkerInfo(
- val id: String,
- val host: String,
- val port: Int,
- val cores: Int,
- val memory: Int,
- val actor: ActorRef,
- val webUiPort: Int,
- val publicAddress: String)
+ val id: String,
+ val host: String,
+ val port: Int,
+ val cores: Int,
+ val memory: Int,
+ val actor: ActorRef,
+ val webUiPort: Int,
+ val publicAddress: String)
extends Serializable {
Utils.checkHost(host, "Expected hostname")
@@ -42,18 +42,18 @@ private[spark] class WorkerInfo(
@transient var lastHeartbeat: Long = _
- init()
+ init
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
- init()
+ init
}
- private def init() {
- executors = new mutable.HashMap[String, ExecutorInfo]
+ private def init = {
+ executors = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
memoryUsed = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
new file mode 100644
index 0000000000..4ca59e5b24
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -0,0 +1,109 @@
+package org.apache.spark.deploy.master
+
+import scala.collection.JavaConversions._
+
+import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership}
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+import org.apache.zookeeper.Watcher.Event.EventType
+
+import akka.actor.{Cancellable, ActorRef}
+import akka.util.duration._
+
+class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+ extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
+
+ val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
+
+ private val watcher = new ZooKeeperWatcher()
+ private val zk = new SparkZooKeeperSession(this)
+ private var status = LeadershipStatus.NOT_LEADER
+ private var myLeaderFile: String = _
+ private var leaderUrl: String = _
+
+ override def preStart() {
+ logInfo("Starting ZooKeeper LeaderElection agent")
+ zk.connect()
+ }
+
+ override def zkSessionCreated() {
+ zk.mkdirRecursive(WORKING_DIR)
+ myLeaderFile =
+ zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
+ self ! CheckLeader
+ }
+
+ override def zkDown() {
+ logError("ZooKeeper down! LeaderElectionAgent shutting down Master.")
+ System.exit(1)
+ }
+
+ override def postStop() {
+ zk.close()
+ }
+
+ override def receive = {
+ case CheckLeader => checkLeader()
+ }
+
+ private class ZooKeeperWatcher extends Watcher {
+ def process(event: WatchedEvent) {
+ if (event.getType == EventType.NodeDeleted) {
+ logInfo("Leader file disappeared, a master is down!")
+ self ! CheckLeader
+ }
+ }
+ }
+
+ /** Uses ZK leader election. Navigates several ZK potholes along the way. */
+ def checkLeader() {
+ val masters = zk.getChildren(WORKING_DIR).toList
+ val leader = masters.sorted.get(0)
+ val leaderFile = WORKING_DIR + "/" + leader
+
+ // Setup a watch for the current leader.
+ zk.exists(leaderFile, watcher)
+
+ try {
+ leaderUrl = new String(zk.getData(leaderFile))
+ } catch {
+ // A NoNodeException may be thrown if the old leader died since the start of this method call.
+ // This is fine -- just check again, since we're guaranteed to see the new values.
+ case e: KeeperException.NoNodeException =>
+ logInfo("Leader disappeared while reading it -- finding next leader")
+ checkLeader()
+ return
+ }
+
+ val isLeader = myLeaderFile == leaderFile
+ if (!isLeader && leaderUrl == masterUrl) {
+ // We found a different master file pointing to this process.
+ // This can happen in the following two cases:
+ // (1) The master process was restarted on the same node.
+ // (2) The ZK server died between creating the node and returning the name of the node.
+ // For this case, we will end up creating a second file, and MUST explicitly delete the
+ // first one, since our ZK session is still open.
+ // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
+ // leader changes.
+ logWarning("Cleaning up old ZK master election file that points to this master.")
+ zk.delete(leaderFile)
+ } else {
+ updateLeadershipStatus(isLeader)
+ }
+ }
+
+ def updateLeadershipStatus(isLeader: Boolean) {
+ if (isLeader && status == LeadershipStatus.NOT_LEADER) {
+ status = LeadershipStatus.LEADER
+ masterActor ! ElectedLeader
+ } else if (!isLeader && status == LeadershipStatus.LEADER) {
+ status = LeadershipStatus.NOT_LEADER
+ masterActor ! RevokedLeadership
+ }
+ }
+
+ private object LeadershipStatus extends Enumeration {
+ type LeadershipStatus = Value
+ val LEADER, NOT_LEADER = Value
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
new file mode 100644
index 0000000000..f45b62cbdd
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala
@@ -0,0 +1,64 @@
+package org.apache.spark.deploy.master
+
+import org.apache.spark.Logging
+import org.apache.zookeeper._
+
+import akka.serialization.Serialization
+
+class ZooKeeperPersistenceEngine(serialization: Serialization) extends PersistenceEngine with SparkZooKeeperWatcher with Logging {
+ val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
+
+ val zk = new SparkZooKeeperSession(this)
+
+ zk.connect()
+
+ override def zkSessionCreated() {
+ zk.mkdirRecursive(WORKING_DIR)
+ }
+
+ override def zkDown() {
+ logError("PersistenceEngine disconnected from ZooKeeper -- ZK looks down.")
+ }
+
+ override def addApplication(app: ApplicationInfo) {
+ serializeIntoFile(WORKING_DIR + "/app_" + app.id, app)
+ }
+
+ override def removeApplication(app: ApplicationInfo) {
+ zk.delete(WORKING_DIR + "/app_" + app.id)
+ }
+
+ override def addWorker(worker: WorkerInfo) {
+ serializeIntoFile(WORKING_DIR + "/worker_" + worker.id, worker)
+ }
+
+ override def removeWorker(worker: WorkerInfo) {
+ zk.delete(WORKING_DIR + "/worker_" + worker.id)
+ }
+
+ override def close() {
+ zk.close()
+ }
+
+ override def readPersistedData(): (Seq[ApplicationInfo], Seq[WorkerInfo]) = {
+ val sortedFiles = zk.getChildren(WORKING_DIR).toList.sorted
+ val appFiles = sortedFiles.filter(_.startsWith("app_"))
+ val apps = appFiles.map(deserializeFromFile[ApplicationInfo])
+ val workerFiles = sortedFiles.filter(_.startsWith("worker_"))
+ val workers = workerFiles.map(deserializeFromFile[WorkerInfo])
+ (apps, workers)
+ }
+
+ private def serializeIntoFile(path: String, value: Serializable) {
+ val serializer = serialization.findSerializerFor(value)
+ val serialized = serializer.toBinary(value)
+ zk.create(path, serialized, CreateMode.PERSISTENT)
+ }
+
+ def deserializeFromFile[T <: Serializable](filename: String)(implicit m: Manifest[T]): T = {
+ val fileData = zk.getData("/spark/master_status/" + filename)
+ val clazz = m.erasure.asInstanceOf[Class[T]]
+ val serializer = serialization.serializerFor(clazz)
+ serializer.fromBinary(fileData).asInstanceOf[T]
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 46455aa5ae..73fb0c8bd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -23,11 +23,11 @@ import java.io.File
import scala.collection.mutable.HashMap
-import akka.actor.{ActorRef, Props, Actor, ActorSystem, Terminated}
+import akka.actor._
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import akka.util.duration._
-import org.apache.spark.{SparkEnv, Logging}
+import org.apache.spark.Logging
import org.apache.spark.deploy.{ExecutorDescription, ExecutorState}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -35,14 +35,13 @@ import org.apache.spark.deploy.worker.ui.WorkerWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{Utils, AkkaUtils}
-
private[spark] class Worker(
host: String,
port: Int,
webUiPort: Int,
cores: Int,
memory: Int,
- var masterUrl: String,
+ masterUrls: Array[String],
workDirPath: String = null)
extends Actor with Logging {
@@ -54,8 +53,16 @@ private[spark] class Worker(
// Send a heartbeat every (heartbeat timeout) / 4 milliseconds
val HEARTBEAT_MILLIS = System.getProperty("spark.worker.timeout", "60").toLong * 1000 / 4
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
+
+ // Index into masterUrls that we're currently trying to register with.
+ var masterIndex = 0
+
var master: ActorRef = null
- var masterWebUiUrl : String = ""
+ var activeMasterUrl: String = ""
+ var activeMasterWebUiUrl : String = ""
+ var registered = false
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
@@ -103,35 +110,62 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start()
- connectToMaster()
+ registerWithMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
}
- def connectToMaster() {
- logInfo("Connecting to master " + masterUrl)
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
+ def changeMaster(url: String, uiUrl: String) {
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
+ master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
context.watch(master) // Doesn't work with remote actors, but useful for testing
}
+ def tryRegisterAllMasters() {
+ for (masterUrl <- masterUrls) {
+ logInfo("Connecting to master " + masterUrl + "...")
+ val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
+ actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get,
+ publicAddress)
+ }
+ }
+
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ System.exit(1)
+ } else {
+ tryRegisterAllMasters()
+ }
+ }
+ retryTimer // start timer
+ }
+
override def receive = {
- case RegisteredWorker(url) =>
- masterWebUiUrl = url
- logInfo("Successfully registered with master")
+ case RegisteredWorker(masterUrl, masterWebUiUrl) =>
+ logInfo("Successfully registered with master " + masterUrl)
+ registered = true
+ changeMaster(masterUrl, masterWebUiUrl)
context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
master ! Heartbeat(workerId)
}
- case MasterChanged(url, uiUrl) =>
- logInfo("Master has changed, new master is at " + url)
- masterUrl = url
- masterWebUiUrl = uiUrl
+ case MasterChanged(masterUrl, masterWebUiUrl) =>
+ logInfo("Master has changed, new master is at " + masterUrl)
context.unwatch(master)
- master = context.actorFor(Master.toAkkaUrl(masterUrl))
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ changeMaster(masterUrl, masterWebUiUrl)
+
val execs = executors.values.
map(e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state))
sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
@@ -140,15 +174,19 @@ private[spark] class Worker(
logError("Worker registration failed: " + message)
System.exit(1)
- case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
- logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
- val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
- self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
- executors(appId + "/" + execId) = manager
- manager.start()
- coresUsed += cores_
- memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
+ } else {
+ logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+ val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+ self, workerId, host, new File(execSparkHome_), workDir, ExecutorState.RUNNING)
+ executors(appId + "/" + execId) = manager
+ manager.start()
+ coresUsed += cores_
+ memoryUsed += memory_
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
@@ -164,14 +202,18 @@ private[spark] class Worker(
memoryUsed -= executor.memory
}
- case KillExecutor(appId, execId) =>
- val fullId = appId + "/" + execId
- executors.get(fullId) match {
- case Some(executor) =>
- logInfo("Asked to kill executor " + fullId)
- executor.kill()
- case None =>
- logInfo("Asked to kill unknown executor " + fullId)
+ case KillExecutor(masterUrl, appId, execId) =>
+ if (masterUrl != activeMasterUrl) {
+ logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor " + execId)
+ } else {
+ val fullId = appId + "/" + execId
+ executors.get(fullId) match {
+ case Some(executor) =>
+ logInfo("Asked to kill executor " + fullId)
+ executor.kill()
+ case None =>
+ logInfo("Asked to kill unknown executor " + fullId)
+ }
}
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
@@ -179,8 +221,8 @@ private[spark] class Worker(
case RequestWorkerState => {
sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
- finishedExecutors.values.toList, masterUrl, cores, memory,
- coresUsed, memoryUsed, masterWebUiUrl)
+ finishedExecutors.values.toList, activeMasterUrl, cores, memory,
+ coresUsed, memoryUsed, activeMasterWebUiUrl)
}
}
@@ -203,17 +245,18 @@ private[spark] object Worker {
def main(argStrings: Array[String]) {
val args = new WorkerArguments(argStrings)
val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
- args.memory, args.master, args.workDir)
+ args.memory, args.masters, args.workDir)
actorSystem.awaitTermination()
}
def startSystemAndActor(host: String, port: Int, webUiPort: Int, cores: Int, memory: Int,
- masterUrl: String, workDir: String, workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+ masterUrls: Array[String], workDir: String, workerNumber: Option[Int] = None)
+ : (ActorSystem, Int) = {
// The LocalSparkCluster runs multiple local sparkWorkerX actor systems
val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port)
val actor = actorSystem.actorOf(Props(new Worker(host, boundPort, webUiPort, cores, memory,
- masterUrl, workDir)), name = "Worker")
+ masterUrls, workDir)), name = "Worker")
(actorSystem, boundPort)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
index 0ae89a864f..16d8686490 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/WorkerArguments.scala
@@ -29,7 +29,7 @@ private[spark] class WorkerArguments(args: Array[String]) {
var webUiPort = 8081
var cores = inferDefaultCores()
var memory = inferDefaultMemory()
- var master: String = null
+ var masters: Array[String] = null
var workDir: String = null
// Check for settings in environment variables
@@ -86,14 +86,14 @@ private[spark] class WorkerArguments(args: Array[String]) {
printUsageAndExit(0)
case value :: tail =>
- if (master != null) { // Two positional arguments were given
+ if (masters != null) { // Two positional arguments were given
printUsageAndExit(1)
}
- master = value
+ masters = value.split(",")
parse(tail)
case Nil =>
- if (master == null) { // No positional argument was given
+ if (masters == null) { // No positional argument was given
printUsageAndExit(1)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
index 95d6007f3b..800f1cafcc 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/WorkerWebUI.scala
@@ -105,7 +105,7 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
val logText = <node>{Utils.offsetBytes(path, startByte, endByte)}</node>
- val linkToMaster = <p><a href={worker.masterWebUiUrl}>Back to Master</a></p>
+ val linkToMaster = <p><a href={worker.activeMasterWebUiUrl}>Back to Master</a></p>
val range = <span>Bytes {startByte.toString} - {endByte.toString} of {logLength}</span>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index c173cdf449..8a3017e964 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
private[spark] class SparkDeploySchedulerBackend(
scheduler: ClusterScheduler,
sc: SparkContext,
- master: String,
+ masters: Array[String],
appName: String)
extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
with ClientListener
@@ -52,7 +52,7 @@ private[spark] class SparkDeploySchedulerBackend(
val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome,
"http://" + sc.ui.appUIAddress)
- client = new Client(sc.env.actorSystem, master, appDesc, this)
+ client = new Client(sc.env.actorSystem, masters, appDesc, this)
client.start()
}
@@ -75,6 +75,13 @@ private[spark] class SparkDeploySchedulerBackend(
}
}
+ override def dead() {
+ if (!stopping) {
+ logError("Spark cluster looks dead, giving up.")
+ scheduler.error("Spark cluster looks down")
+ }
+ }
+
override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format(
fullId, hostPort, cores, Utils.megabytesToString(memory)))
diff --git a/pom.xml b/pom.xml
index d74d45adf1..f8ea2b8920 100644
--- a/pom.xml
+++ b/pom.xml
@@ -345,6 +345,17 @@
<scope>test</scope>
</dependency>
<dependency>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>zookeeper</artifactId>
+ <version>3.4.5</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 99cdadb9e7..156f501a04 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -211,6 +211,7 @@ object SparkBuild extends Build {
"net.java.dev.jets3t" % "jets3t" % "0.7.1",
"org.apache.avro" % "avro" % "1.7.4",
"org.apache.avro" % "avro-ipc" % "1.7.4" excludeAll(excludeNetty),
+ "org.apache.zookeeper" % "zookeeper" % "3.4.5" excludeAll(excludeNetty),
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",