author     Aaron Davidson <aaron@databricks.com>  2013-10-04 19:48:47 -0700
committer  Aaron Davidson <aaron@databricks.com>  2013-10-04 19:54:33 -0700
commit     db6f1549406be22f0b7c8ab4425af30602e52283
tree       0177c1b2481ec680e7d35d5db772e22df2a7e593
parent     42d72308fb772bf5dc579c9da174e6057ee86171
Fix race conditions during recovery
One major change was to use messages instead of raw functions as the parameters of Akka scheduled timers. Because messages are processed one at a time through the actor's mailbox, unlike raw functions, the behavior is easier to reason about and an exception thrown inside a timer no longer causes race conditions. Another change is to avoid relying on global pointers that might change without a lock.
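
For illustration only (this sketch is not part of the patch), the message-based timer pattern looks roughly like the following, assuming Akka's classic actor API; the HeartbeatingWorker name, the 15-second interval, and the "connected" stand-in message are hypothetical:

import akka.actor.{Actor, ActorRef}
import scala.concurrent.duration._

// The timer delivers a message (as with DeployMessages.SendHeartbeat below) instead of
// running a closure, so all state access stays on the actor's message-processing
// thread and an exception in one tick cannot race with the actor's other handlers.
case object SendHeartbeat

class HeartbeatingWorker(master: ActorRef) extends Actor {
  private var connected = false   // only touched from receive, so no extra locking needed

  import context.dispatcher       // ExecutionContext required by the scheduler

  override def preStart(): Unit = {
    // Old, racy style: schedule(0.millis, 15.seconds) { master ! Heartbeat(workerId) }
    // New style: the scheduled message goes through the mailbox like any other message.
    context.system.scheduler.schedule(0.millis, 15.seconds, self, SendHeartbeat)
  }

  def receive = {
    case SendHeartbeat =>
      if (connected) {
        // master ! Heartbeat(workerId)   // what the real Worker sends
      }
    case "connected" => connected = true  // hypothetical stand-in for RegisteredWorker
  }
}

The second change follows the same discipline: in the diff below, the Worker only reads or reassigns its master reference inside masterLock.synchronized blocks, so the pointer cannot change underneath a sender.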
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala | 7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala | 3
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/client/Client.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 29
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala | 2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala | 51
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 48
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala | 5
8 files changed, 122 insertions, 52 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 31d1909279..979e65ac6c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,6 +21,7 @@ import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
+import org.apache.spark.deploy.master.MasterState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils
@@ -111,7 +112,8 @@ private[deploy] object DeployMessages {
// Master to MasterWebUI
case class MasterStateResponse(host: String, port: Int, workers: Array[WorkerInfo],
- activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo]) {
+ activeApps: Array[ApplicationInfo], completedApps: Array[ApplicationInfo],
+ status: MasterState) {
Utils.checkHost(host, "Required hostname")
assert (port > 0)
@@ -133,4 +135,7 @@ private[deploy] object DeployMessages {
assert (port > 0)
}
+ // Actor System to Worker
+
+ case object SendHeartbeat
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index 87a703427c..f87b885286 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -71,7 +71,8 @@ private[spark] object JsonProtocol {
("memory" -> obj.workers.map(_.memory).sum) ~
("memoryused" -> obj.workers.map(_.memoryUsed).sum) ~
("activeapps" -> obj.activeApps.toList.map(writeApplicationInfo)) ~
- ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo))
+ ("completedapps" -> obj.completedApps.toList.map(writeApplicationInfo)) ~
+ ("status" -> obj.status.toString)
}
def writeWorkerState(obj: WorkerStateResponse) = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index aa2a10a8ad..198d5cee7b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -46,7 +46,8 @@ private[spark] class Client(
listener: ClientListener)
extends Logging {
- val REGISTRATION_TIMEOUT = 60 * 1000
+ val REGISTRATION_TIMEOUT = 20.seconds
+ val REGISTRATION_RETRIES = 3
var actor: ActorRef = null
var appId: String = null
@@ -61,7 +62,7 @@ private[spark] class Client(
override def preStart() {
try {
- connectToMaster()
+ registerWithMaster()
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
@@ -70,19 +71,31 @@ private[spark] class Client(
}
}
- def connectToMaster() {
+ def tryRegisterAllMasters() {
for (masterUrl <- masterUrls) {
logInfo("Connecting to master " + masterUrl + "...")
val actor = context.actorFor(Master.toAkkaUrl(masterUrl))
actor ! RegisterApplication(appDescription)
}
+ }
- context.system.scheduler.scheduleOnce(REGISTRATION_TIMEOUT millis) {
- if (!registered) {
- logError("All masters are unresponsive! Giving up.")
- markDead()
+ def registerWithMaster() {
+ tryRegisterAllMasters()
+
+ var retries = 0
+ lazy val retryTimer: Cancellable =
+ context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+ retries += 1
+ if (registered) {
+ retryTimer.cancel()
+ } else if (retries >= REGISTRATION_RETRIES) {
+ logError("All masters are unresponsive! Giving up.")
+ markDead()
+ } else {
+ tryRegisterAllMasters()
+ }
}
- }
+ retryTimer // start timer
}
def changeMaster(url: String) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e13a8cba4a..093ce09b1d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -123,6 +123,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
override def preRestart(reason: Throwable, message: Option[Any]) {
+ super.preRestart(reason, message) // calls postStop()!
logError("Master actor restarted due to exception", reason)
}
@@ -279,7 +280,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RequestMasterState => {
- sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray)
+ sender ! MasterStateResponse(host, port, workers.toArray, apps.toArray, completedApps.toArray,
+ state)
}
case CheckForWorkerTimeOut => {
@@ -297,14 +299,25 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def beginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo]) {
for (app <- storedApps) {
- registerApplication(app)
- app.state = ApplicationState.UNKNOWN
- app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ logInfo("Trying to recover app: " + app.id)
+ try {
+ registerApplication(app)
+ app.state = ApplicationState.UNKNOWN
+ app.driver ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("App " + app.id + " had exception on reconnect")
+ }
}
+
for (worker <- storedWorkers) {
- registerWorker(worker)
- worker.state = WorkerState.UNKNOWN
- worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ logInfo("Trying to recover worker: " + worker.id)
+ try {
+ registerWorker(worker)
+ worker.state = WorkerState.UNKNOWN
+ worker.actor ! MasterChanged(masterUrl, masterWebUiUrl)
+ } catch {
+ case e: Exception => logInfo("Worker " + worker.id + " had exception on reconnect")
+ }
}
}
@@ -409,7 +422,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
workers += worker
idToWorker(worker.id) = worker
- actorToWorker(sender) = worker
+ actorToWorker(worker.actor) = worker
addressToWorker(workerAddress) = worker
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
index 08fe5334cf..74a9f8cd82 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/MasterMessages.scala
@@ -38,6 +38,8 @@ private[master] object MasterMessages {
case class BeginRecovery(storedApps: Seq[ApplicationInfo], storedWorkers: Seq[WorkerInfo])
+ case object CompleteRecovery
+
case object RequestWebUIPort
case class WebUIPortResponse(webUIBoundPort: Int)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index f8e86d633f..065635af85 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -44,10 +44,18 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
}
override def zkSessionCreated() {
- zk.mkdirRecursive(WORKING_DIR)
- myLeaderFile =
- zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
- self ! CheckLeader
+ synchronized {
+ zk.mkdirRecursive(WORKING_DIR)
+ myLeaderFile =
+ zk.create(WORKING_DIR + "/master_", masterUrl.getBytes, CreateMode.EPHEMERAL_SEQUENTIAL)
+ self ! CheckLeader
+ }
+ }
+
+ override def preRestart(reason: scala.Throwable, message: scala.Option[scala.Any]) {
+ logError("LeaderElectionAgent failed, waiting " + zk.ZK_TIMEOUT_MILLIS + "...", reason)
+ Thread.sleep(zk.ZK_TIMEOUT_MILLIS)
+ super.preRestart(reason, message)
}
override def zkDown() {
@@ -75,7 +83,7 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
/** Uses ZK leader election. Navigates several ZK potholes along the way. */
def checkLeader() {
val masters = zk.getChildren(WORKING_DIR).toList
- val leader = masters.sorted.get(0)
+ val leader = masters.sorted.head
val leaderFile = WORKING_DIR + "/" + leader
// Setup a watch for the current leader.
@@ -92,20 +100,25 @@ class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
return
}
- val isLeader = myLeaderFile == leaderFile
- if (!isLeader && leaderUrl == masterUrl) {
- // We found a different master file pointing to this process.
- // This can happen in the following two cases:
- // (1) The master process was restarted on the same node.
- // (2) The ZK server died between creating the node and returning the name of the node.
- // For this case, we will end up creating a second file, and MUST explicitly delete the
- // first one, since our ZK session is still open.
- // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
- // leader changes.
- logWarning("Cleaning up old ZK master election file that points to this master.")
- zk.delete(leaderFile)
- } else {
- updateLeadershipStatus(isLeader)
+ // Synchronization used to ensure no interleaving between the creation of a new session and the
+ // checking of a leader, which could cause us to delete our real leader file erroneously.
+ synchronized {
+ val isLeader = myLeaderFile == leaderFile
+ if (!isLeader && leaderUrl == masterUrl) {
+ // We found a different master file pointing to this process.
+ // This can happen in the following two cases:
+ // (1) The master process was restarted on the same node.
+ // (2) The ZK server died between creating the node and returning the name of the node.
+ // For this case, we will end up creating a second file, and MUST explicitly delete the
+ // first one, since our ZK session is still open.
+ // Note that this deletion will cause a NodeDeleted event to be fired so we check again for
+ // leader changes.
+ assert(leaderFile < myLeaderFile)
+ logWarning("Cleaning up old ZK master election file that points to this master.")
+ zk.delete(leaderFile)
+ } else {
+ updateLeadershipStatus(isLeader)
+ }
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 73fb0c8bd8..25ba75619a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -59,10 +59,12 @@ private[spark] class Worker(
// Index into masterUrls that we're currently trying to register with.
var masterIndex = 0
+ val masterLock: Object = new Object()
var master: ActorRef = null
var activeMasterUrl: String = ""
var activeMasterWebUiUrl : String = ""
- var registered = false
+ @volatile var registered = false
+ @volatile var connected = false
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
@@ -102,6 +104,7 @@ private[spark] class Worker(
}
override def preStart() {
+ assert(!registered)
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
host, port, cores, Utils.megabytesToString(memory)))
sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
@@ -117,11 +120,14 @@ private[spark] class Worker(
}
def changeMaster(url: String, uiUrl: String) {
- activeMasterUrl = url
- activeMasterWebUiUrl = uiUrl
- master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
- context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ masterLock.synchronized {
+ activeMasterUrl = url
+ activeMasterWebUiUrl = uiUrl
+ master = context.actorFor(Master.toAkkaUrl(activeMasterUrl))
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ connected = true
+ }
}
def tryRegisterAllMasters() {
@@ -157,8 +163,11 @@ private[spark] class Worker(
logInfo("Successfully registered with master " + masterUrl)
registered = true
changeMaster(masterUrl, masterWebUiUrl)
- context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
- master ! Heartbeat(workerId)
+ context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis, self, SendHeartbeat)
+
+ case SendHeartbeat =>
+ masterLock.synchronized {
+ if (connected) { master ! Heartbeat(workerId) }
}
case MasterChanged(masterUrl, masterWebUiUrl) =>
@@ -171,8 +180,10 @@ private[spark] class Worker(
sender ! WorkerSchedulerStateResponse(workerId, execs.toList)
case RegisterWorkerFailed(message) =>
- logError("Worker registration failed: " + message)
- System.exit(1)
+ if (!registered) {
+ logError("Worker registration failed: " + message)
+ System.exit(1)
+ }
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
if (masterUrl != activeMasterUrl) {
@@ -185,11 +196,15 @@ private[spark] class Worker(
manager.start()
coresUsed += cores_
memoryUsed += memory_
- master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+ }
}
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
- master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ masterLock.synchronized {
+ master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
+ }
val fullId = appId + "/" + execId
if (ExecutorState.isFinished(state)) {
val executor = executors(fullId)
@@ -216,7 +231,13 @@ private[spark] class Worker(
}
}
- case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ case Terminated(actor_) if actor_ == master =>
+ masterDisconnected()
+
+ case RemoteClientDisconnected(transport, address) if address == master.path.address =>
+ masterDisconnected()
+
+ case RemoteClientShutdown(transport, address) if address == master.path.address =>
masterDisconnected()
case RequestWorkerState => {
@@ -228,6 +249,7 @@ private[spark] class Worker(
def masterDisconnected() {
logError("Connection to master failed! Waiting for master to reconnect...")
+ connected = false
}
def generateWorkerId(): String = {
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index 4346571c4d..c59e1f4de6 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
@@ -53,7 +53,8 @@ class JsonProtocolSuite extends FunSuite {
val workers = Array[WorkerInfo](createWorkerInfo(), createWorkerInfo())
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
- val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps)
+ val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
+ MasterState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}