aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/client/Client.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala29
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala2
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala51
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala48
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala5
8 files changed, 122 insertions, 52 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 31d1909279..979e65ac6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,6 +21,7 @@ import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.MasterState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils
@@ -111,7 +112,8 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+ status: MasterState) {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
@@ -133,4 +135,7 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
+ // Actor System to Worker
+
+ case object SendHeartbeat
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 87a703427c..f87b885286 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -71,7 +71,8 @@ private[spark] object JsonProtocol {
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+ ("status" -> obj.status.toString)
}
def writeWorkerState(obj: WorkerStateResponse) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index aa2a10a8ad..198d5cee7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -46,7 +46,8 @@ private[spark] class Client(
listener: ClientListener)
extends Logging {
- val REGISTRATION_TIMEOUT = 60 * 1000
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
var actor: ActorRef = null
var appId: String = null
@@ -61,7 +62,7 @@ private[spark] class Client(
override def preStart() {
try {
- connectToMaster()
+ registerWithMaster()
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
@@ -70,19 +71,31 @@ private[spark] class Client(
}
}
- def connectToMaster() {
+ def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
}
+ }
- context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) {
- if (!registered) {
- logError("All masters are unresponsive! Giving up.")
- markDead()
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ } else {
+ tryRegisterAllMasters()
+ }
}
- }
+ retryTimer // start timer
}
def changeMaster(url: String) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e13a8cba4a..093ce09b1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -123,6 +123,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message) // calls postStop()!
logError("Master actor restarted due to exception", reason)
}
@@ -279,7 +280,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+ state)
}
case CheckForWorkerTimeOut => {
@@ -297,14 +299,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
- registerApplication(app)
- app.state = ApplicationState.UNKNOWN
- app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ logInfo("Trying to recover app: " + app.id)
+ try {
+ registerApplication(app)
+ app.state = ApplicationState.UNKNOWN
+ app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
+ }
}
+
for (worker <- storedWorkers) {
- registerWorker(worker)
- worker.state = WorkerState.UNKNOWN
- worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ logInfo("Trying to recover worker: " + worker.id)
+ try {
+ registerWorker(worker)
+ worker.state = WorkerState.UNKNOWN
+ worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
+ }
}
}
@@ -409,7 +422,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers += worker
idToWorker(worker.id) = worker
- actorToWorker(sender) = worker
+ actorToWorker(worker.actor) = worker
addressToWorker(workerAddress) = worker
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 08fe5334cf..74a9f8cd82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,6 +38,8 @@ private[master] object MasterMessages {
case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+ case object CompleteRecovery
+
case object RequestWebUIPort
case class WebUIPortResponse(webUIBoundPort: Int)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index f8e86d633f..065635af85 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -44,10 +44,18 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
}
override def zkSessionCreated() {
- zk.mkdirRecursive(WORKING_DIR)
- myLeaderFile =
- zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
- self ! CheckLeader
+ synchronized {
+ zk.mkdirRecursive(WORKING_DIR)
+ myLeaderFile =
+ zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
+ self ! CheckLeader
+ }
+ }
+
+ override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
+ logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
+ Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+ super.preRestart(reason, message)
}
override def zkDown() {
@@ -75,7 +83,7 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
/** Uses ZK leader election. Navigates several ZK potholes along the way. */
def checkLeader() {
val masters = zk.getChildren(WORKING_DIR).toList
- val leader = masters.sorted.get(0)
+ val leader = masters.sorted.head
val leaderFile = WORKING_DIR + "/" + leader
// Setup a watch for the current leader.
@@ -92,20 +100,25 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
return
}
- val isLeader = myLeaderFile == leaderFile
- if (!isLeader && leaderUrl == masterUrl) {
- // We found a different master file pointing to this process.
- // This can happen in the following two cases:
- // (1) The master process was restarted on the same node.
- // (2) The ZK server died between creating the node and returning the name of the node.
- // For this case, we will end up creating a second file, and MUST explicitly delete the
- // first one, since our ZK session is still open.
- // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
- // leader changes.
- logWarning("Cleaning up old ZK master election file that points to this master.")
- zk.delete(leaderFile)
- } else {
- updateLeadershipStatus(isLeader)
+ // Synchronization used to ensure no interleaving between the creation of a new session and the
+ // checking of a leader, which could cause us to delete our real leader file erroneously.
+ synchronized {
+ val isLeader = myLeaderFile == leaderFile
+ if (!isLeader && leaderUrl == masterUrl) {
+ // We found a different master file pointing to this process.
+ // This can happen in the following two cases:
+ // (1) The master process was restarted on the same node.
+ // (2) The ZK server died between creating the node and returning the name of the node.
+ // For this case, we will end up creating a second file, and MUST explicitly delete the
+ // first one, since our ZK session is still open.
+ // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
+ // leader changes.
+ assert(leaderFile < myLeaderFile)
+ logWarning("Cleaning up old ZK master election file that points to this master.")
+ zk.delete(leaderFile)
+ } else {
+ updateLeadershipStatus(isLeader)
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 73fb0c8bd8..25ba75619a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -59,10 +59,12 @@ private[spark] class Worker(
// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0
+ val masterLock: Object = new Object()
var master: ActorRef = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
- var registered = false
+ @volatile var registered = false
+ @volatile var connected = false
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
@@ -102,6 +104,7 @@ private[spark] class Worker(
}
override def preStart() {
+ assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
@@ -117,11 +120,14 @@ private[spark] class Worker(
}
def changeMaster(url: String, uiUrl: String) {
- activeMasterUrl = url
- activeMasterWebUiUrl = uiUrl
- master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ masterLock.synchronized {
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
+ master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ connected = true
+ }
}
def tryRegisterAllMasters() {
@@ -157,8 +163,11 @@ private[spark] class Worker(
logInfo("Successfully registered with master " + masterUrl)
registered = true
changeMaster(masterUrl, masterWebUiUrl)
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
- master ! Heartbeat(workerId)
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+
+ case SendHeartbeat =>
+ masterLock.synchronized {
+ if (connected) { master ! Heartbeat(workerId) }
}
case MasterChanged(masterUrl, masterWebUiUrl) =>
@@ -171,8 +180,10 @@ private[spark] class Worker(
sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
case RegisterWorkerFailed(message) =>
- logError("Worker registration failed: " + message)
- System.exit(1)
+ if (!registered) {
+ logError("Worker registration failed: " + message)
+ System.exit(1)
+ }
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
if (masterUrl != activeMasterUrl) {
@@ -185,11 +196,15 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ }
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
@@ -216,7 +231,13 @@ private[spark] class Worker(
}
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ masterDisconnected()
+
+ case RemoteClientDisconnected(transport, address) if address == master.path.address =>
+ masterDisconnected()
+
+ case RemoteClientShutdown(transport, address) if address == master.path.address =>
masterDisconnected()
case RequestWorkerState => {
@@ -228,6 +249,7 @@ private[spark] class Worker(
def masterDisconnected() {
logError("Connection to master failed! Waiting for master to reconnect...")
+ connected = false
}
def generateWorkerId(): String = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 4346571c4d..c59e1f4de6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
@@ -53,7 +53,8 @@ class JsonProtocolSuite extends FunSuite {
val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
- val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+ val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+ MasterState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}