From 0f070279e7cd224de48333b572d3080b742a82d7 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Sat, 5 Oct 2013 15:15:29 -0700 Subject: Address Matei's comments --- bin/stop-slaves.sh | 2 -- .../org/apache/spark/deploy/DeployMessage.scala | 2 +- .../spark/deploy/master/ApplicationInfo.scala | 6 ++--- .../spark/deploy/master/LeaderElectionAgent.scala | 4 ++-- .../org/apache/spark/deploy/master/Master.scala | 26 +++++++++++----------- .../apache/spark/deploy/master/MasterState.scala | 26 ---------------------- .../spark/deploy/master/PersistenceEngine.scala | 7 +++--- .../apache/spark/deploy/master/RecoveryState.scala | 26 ++++++++++++++++++++++ .../deploy/master/SparkZooKeeperSession.scala | 13 ++++++----- .../apache/spark/deploy/master/WorkerInfo.scala | 6 ++--- .../master/ZooKeeperLeaderElectionAgent.scala | 11 ++++----- .../apache/spark/deploy/JsonProtocolSuite.scala | 4 ++-- 12 files changed, 66 insertions(+), 67 deletions(-) delete mode 100644 core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala create mode 100644 core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh index abf1c7be65..fcb8555d4e 100755 --- a/bin/stop-slaves.sh +++ b/bin/stop-slaves.sh @@ -17,8 +17,6 @@ # limitations under the License. # -# Starts workers on the machine this script is executed on. - bin=`dirname "$0"` bin=`cd "$bin"; pwd` diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 979e65ac6c..275331724a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -21,7 +21,7 @@ import scala.collection.immutable.List import org.apache.spark.deploy.ExecutorState.ExecutorState import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo} -import org.apache.spark.deploy.master.MasterState.MasterState +import org.apache.spark.deploy.master.RecoveryState.MasterState import org.apache.spark.deploy.worker.ExecutorRunner import org.apache.spark.util.Utils diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index 8291e29ec3..5150b7c7de 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -39,14 +39,14 @@ private[spark] class ApplicationInfo( @transient private var nextExecutorId: Int = _ - init + init() private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init + init() } - private def init = { + private def init() { state = ApplicationState.WAITING executors = new mutable.HashMap[Int, ExecutorInfo] coresGranted = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala index fc8255fa6f..f25a1ad3bf 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala @@ -29,12 +29,12 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader * [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]] * [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]] */ -trait LeaderElectionAgent extends Actor { +private[spark] 
trait LeaderElectionAgent extends Actor { val masterActor: ActorRef } /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */ -class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent { +private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent { override def preStart() { masterActor ! ElectedLeader } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 093ce09b1d..cd916672ac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -79,7 +79,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act val masterUrl = "spark://" + host + ":" + port var masterWebUiUrl: String = _ - var state = MasterState.STANDBY + var state = RecoveryState.STANDBY var persistenceEngine: PersistenceEngine = _ @@ -139,13 +139,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case ElectedLeader => { val (storedApps, storedWorkers) = persistenceEngine.readPersistedData() state = if (storedApps.isEmpty && storedWorkers.isEmpty) - MasterState.ALIVE + RecoveryState.ALIVE else - MasterState.RECOVERING + RecoveryState.RECOVERING logInfo("I have been elected leader! New state: " + state) - if (state == MasterState.RECOVERING) { + if (state == RecoveryState.RECOVERING) { beginRecovery(storedApps, storedWorkers) context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() } } @@ -159,7 +159,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => { logInfo("Registering worker %s:%d with %d cores, %s RAM".format( host, workerPort, cores, Utils.megabytesToString(memory))) - if (state == MasterState.STANDBY) { + if (state == RecoveryState.STANDBY) { // ignore, don't send response } else if (idToWorker.contains(id)) { sender ! 
RegisterWorkerFailed("Duplicate worker ID") @@ -174,7 +174,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act } case RegisterApplication(description) => { - if (state == MasterState.STANDBY) { + if (state == RecoveryState.STANDBY) { // ignore, don't send response } else { logInfo("Registering app " + description.name) @@ -262,21 +262,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act // those we have an entry for in the corresponding actor hashmap actorToWorker.get(actor).foreach(removeWorker) actorToApp.get(actor).foreach(finishApplication) - if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() } + if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } case RemoteClientDisconnected(transport, address) => { // The disconnected client could've been either a worker or an app; remove whichever it was addressToWorker.get(address).foreach(removeWorker) addressToApp.get(address).foreach(finishApplication) - if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() } + if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } case RemoteClientShutdown(transport, address) => { // The disconnected client could've been either a worker or an app; remove whichever it was addressToWorker.get(address).foreach(removeWorker) addressToApp.get(address).foreach(finishApplication) - if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() } + if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() } } case RequestMasterState => { @@ -324,15 +324,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act def completeRecovery() { // Ensure "only-once" recovery semantics using a short synchronization period. synchronized { - if (state != MasterState.RECOVERING) { return } - state = MasterState.COMPLETING_RECOVERY + if (state != RecoveryState.RECOVERING) { return } + state = RecoveryState.COMPLETING_RECOVERY } // Kill off any workers and apps that didn't respond to us. workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker) apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication) - state = MasterState.ALIVE + state = RecoveryState.ALIVE schedule() logInfo("Recovery complete - resuming operations!") } @@ -351,7 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act * every time a new app joins or resource availability changes. */ def schedule() { - if (state != MasterState.ALIVE) { return } + if (state != RecoveryState.ALIVE) { return } // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala deleted file mode 100644 index eec3df3b7a..0000000000 --- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.master - -private[spark] object MasterState - extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { - - type MasterState = Value - - val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value -} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala index 8c4878bd30..94b986caf2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala @@ -23,9 +23,10 @@ package org.apache.spark.deploy.master * - addApplication and addWorker are called before completing registration of a new app/worker. * - removeApplication and removeWorker are called at any time. * Given these two requirements, we will have all apps and workers persisted, but - * we might not have yet deleted apps or workers that finished. + * we might not have yet deleted apps or workers that finished (so their liveness must be verified + * during recovery). */ -trait PersistenceEngine { +private[spark] trait PersistenceEngine { def addApplication(app: ApplicationInfo) def removeApplication(app: ApplicationInfo) @@ -43,7 +44,7 @@ trait PersistenceEngine { def close() {} } -class BlackHolePersistenceEngine extends PersistenceEngine { +private[spark] class BlackHolePersistenceEngine extends PersistenceEngine { override def addApplication(app: ApplicationInfo) {} override def removeApplication(app: ApplicationInfo) {} override def addWorker(worker: WorkerInfo) {} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala new file mode 100644 index 0000000000..b91be821f0 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.master + +private[spark] object RecoveryState + extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") { + + type MasterState = Value + + val STANDBY, ALIVE, RECOVERING, COMPLETING_RECOVERY = Value +} diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala index f43f9f6ed7..81e15c534f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala @@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState * Additionally, all commands sent to ZooKeeper will be retried until they either fail too many * times or a semantic exception is thrown (e.g.., "node already exists"). */ -class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { +private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "") val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE @@ -53,10 +53,13 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { /** Connect to ZooKeeper to start the session. Must be called before anything else. */ def connect() { connectToZooKeeper() - spawn(sessionMonitorThread) + + new Thread() { + override def run() = sessionMonitorThread() + }.start() } - def sessionMonitorThread = { + def sessionMonitorThread(): Unit = { while (!closed) { Thread.sleep(ZK_CHECK_PERIOD_MILLIS) if (zk.getState != ZooKeeper.States.CONNECTED) { @@ -170,7 +173,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { * * @param fn Block to execute, possibly multiple times. 
*/ - def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = { + def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = { try { fn } catch { @@ -179,7 +182,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging { case e if n > 0 => logError("ZooKeeper exception, " + n + " more retries...", e) Thread.sleep(RETRY_WAIT_MILLIS) - retry(fn)(n-1) + retry(fn, n-1) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala index 26090c6a9c..e05f587b58 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala @@ -42,17 +42,17 @@ private[spark] class WorkerInfo( @transient var lastHeartbeat: Long = _ - init + init() def coresFree: Int = cores - coresUsed def memoryFree: Int = memory - memoryUsed private def readObject(in: java.io.ObjectInputStream) : Unit = { in.defaultReadObject() - init + init() } - private def init = { + private def init() { executors = new mutable.HashMap state = WorkerState.ALIVE coresUsed = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala index 065635af85..7809013e83 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala @@ -17,17 +17,14 @@ package org.apache.spark.deploy.master -import scala.collection.JavaConversions._ - -import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership} -import org.apache.spark.Logging +import akka.actor.ActorRef import org.apache.zookeeper._ import org.apache.zookeeper.Watcher.Event.EventType -import akka.actor.{Cancellable, ActorRef} -import akka.util.duration._ +import org.apache.spark.deploy.master.MasterMessages._ +import org.apache.spark.Logging -class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) +private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String) extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging { val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election" diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala index c59e1f4de6..0b38e239f9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala @@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue import org.scalatest.FunSuite import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse} -import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo} +import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo} import org.apache.spark.deploy.worker.ExecutorRunner class JsonProtocolSuite extends FunSuite { @@ -54,7 +54,7 @@ class JsonProtocolSuite extends FunSuite { val activeApps = Array[ApplicationInfo](createAppInfo()) val completedApps = Array[ApplicationInfo]() val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps, - MasterState.ALIVE) + RecoveryState.ALIVE) val output = 
JsonProtocol.writeMasterState(stateResponse) assertValidJson(output) } -- cgit v1.2.3
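
A note on the retry signature change in SparkZooKeeperSession.scala above: moving the
attempt counter from an implicit parameter to a plain default parameter keeps the call
site unchanged (`retry { ... }`), but an unrelated implicit Int in the caller's scope can
no longer be silently picked up as the retry count, and the recursive call becomes an
ordinary `retry(fn, n - 1)`. Below is a minimal standalone sketch of the resulting
pattern — simplified names and constants, and catching bare Exception where the real
method special-cases things like session expiration; it is not the Spark code itself:

object RetryExample {
  private val MaxAttempts = 3
  private val RetryWaitMillis = 100L

  // Retry a by-name block up to `n` more times, sleeping between attempts.
  // A plain default parameter (rather than an implicit one) means a stray
  // implicit Int in the caller's scope can never override the retry count.
  def retry[T](fn: => T, n: Int = MaxAttempts): T = {
    try {
      fn
    } catch {
      case e: Exception if n > 0 =>
        System.err.println("Attempt failed (" + e.getMessage + "), " + n + " more retries...")
        Thread.sleep(RetryWaitMillis)
        retry(fn, n - 1)
    }
  }

  def main(args: Array[String]) {
    var calls = 0
    val result = retry {
      calls += 1
      if (calls < 3) throw new RuntimeException("transient failure")
      "ok"
    }
    println("Succeeded after " + calls + " calls: " + result)  // Succeeded after 3 calls: ok
  }
}

Relatedly, the `spawn(sessionMonitorThread)` replacement in the same file swaps an
implicit by-name spawn (presumably scala.concurrent.ops.spawn, deprecated in Scala 2.10)
for an explicit `new Thread { ... }.start()`, which makes it obvious at the call site
that a background thread is being created rather than a method being invoked.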