author     Aaron Davidson <aaron@databricks.com>  2013-10-05 15:15:29 -0700
committer  Aaron Davidson <aaron@databricks.com>  2013-10-05 15:15:29 -0700
commit     0f070279e7cd224de48333b572d3080b742a82d7 (patch)
tree       8a28404dee6cc1bec92d7bbc4caadaff31dbec70
parent     db6f1549406be22f0b7c8ab4425af30602e52283 (diff)
Address Matei's comments
-rwxr-xr-x  bin/stop-slaves.sh                                                                2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala                   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala          6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala      4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                  26
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala        7
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala (renamed from core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala)  2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala   13
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala               6
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala  11
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala               4
11 files changed, 41 insertions, 42 deletions
diff --git a/bin/stop-slaves.sh b/bin/stop-slaves.sh
index abf1c7be65..fcb8555d4e 100755
--- a/bin/stop-slaves.sh
+++ b/bin/stop-slaves.sh
@@ -17,8 +17,6 @@
# limitations under the License.
#
-# Starts workers on the machine this script is executed on.
-
bin=`dirname "$0"`
bin=`cd "$bin"; pwd`
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 979e65ac6c..275331724a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -21,7 +21,7 @@ import scala.collection.immutable.List
import org.apache.spark.deploy.ExecutorState.ExecutorState
import org.apache.spark.deploy.master.{WorkerInfo, ApplicationInfo}
-import org.apache.spark.deploy.master.MasterState.MasterState
+import org.apache.spark.deploy.master.RecoveryState.MasterState
import org.apache.spark.deploy.worker.ExecutorRunner
import org.apache.spark.util.Utils
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index 8291e29ec3..5150b7c7de 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -39,14 +39,14 @@ private[spark] class ApplicationInfo(
@transient private var nextExecutorId: Int = _
- init
+ init()
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
- init
+ init()
}
- private def init = {
+ private def init() {
state = ApplicationState.WAITING
executors = new mutable.HashMap[Int, ExecutorInfo]
coresGranted = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
index fc8255fa6f..f25a1ad3bf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/LeaderElectionAgent.scala
@@ -29,12 +29,12 @@ import org.apache.spark.deploy.master.MasterMessages.ElectedLeader
* [[org.apache.spark.deploy.master.MasterMessages.ElectedLeader ElectedLeader]]
* [[org.apache.spark.deploy.master.MasterMessages.RevokedLeadership RevokedLeadership]]
*/
-trait LeaderElectionAgent extends Actor {
+private[spark] trait LeaderElectionAgent extends Actor {
val masterActor: ActorRef
}
/** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
-class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
+private[spark] class MonarchyLeaderAgent(val masterActor: ActorRef) extends LeaderElectionAgent {
override def preStart() {
masterActor ! ElectedLeader
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 093ce09b1d..cd916672ac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -79,7 +79,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
val masterUrl = "spark://" + host + ":" + port
var masterWebUiUrl: String = _
- var state = MasterState.STANDBY
+ var state = RecoveryState.STANDBY
var persistenceEngine: PersistenceEngine = _
@@ -139,13 +139,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case ElectedLeader => {
val (storedApps, storedWorkers) = persistenceEngine.readPersistedData()
state = if (storedApps.isEmpty && storedWorkers.isEmpty)
- MasterState.ALIVE
+ RecoveryState.ALIVE
else
- MasterState.RECOVERING
+ RecoveryState.RECOVERING
logInfo("I have been elected leader! New state: " + state)
- if (state == MasterState.RECOVERING) {
+ if (state == RecoveryState.RECOVERING) {
beginRecovery(storedApps, storedWorkers)
context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
}
@@ -159,7 +159,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
case RegisterWorker(id, host, workerPort, cores, memory, webUiPort, publicAddress) => {
logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
host, workerPort, cores, Utils.megabytesToString(memory)))
- if (state == MasterState.STANDBY) {
+ if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
@@ -174,7 +174,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case RegisterApplication(description) => {
- if (state == MasterState.STANDBY) {
+ if (state == RecoveryState.STANDBY) {
// ignore, don't send response
} else {
logInfo("Registering app " + description.name)
@@ -262,21 +262,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
// those we have an entry for in the corresponding actor hashmap
actorToWorker.get(actor).foreach(removeWorker)
actorToApp.get(actor).foreach(finishApplication)
- if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientDisconnected(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
- if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RemoteClientShutdown(transport, address) => {
// The disconnected client could've been either a worker or an app; remove whichever it was
addressToWorker.get(address).foreach(removeWorker)
addressToApp.get(address).foreach(finishApplication)
- if (state == MasterState.RECOVERING && canCompleteRecovery) { completeRecovery() }
+ if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
case RequestMasterState => {
@@ -324,15 +324,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
def completeRecovery() {
// Ensure "only-once" recovery semantics using a short synchronization period.
synchronized {
- if (state != MasterState.RECOVERING) { return }
- state = MasterState.COMPLETING_RECOVERY
+ if (state != RecoveryState.RECOVERING) { return }
+ state = RecoveryState.COMPLETING_RECOVERY
}
// Kill off any workers and apps that didn't respond to us.
workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
- state = MasterState.ALIVE
+ state = RecoveryState.ALIVE
schedule()
logInfo("Recovery complete - resuming operations!")
}
@@ -351,7 +351,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
* every time a new app joins or resource availability changes.
*/
def schedule() {
- if (state != MasterState.ALIVE) { return }
+ if (state != RecoveryState.ALIVE) { return }
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
index 8c4878bd30..94b986caf2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/PersistenceEngine.scala
@@ -23,9 +23,10 @@ package org.apache.spark.deploy.master
* - addApplication and addWorker are called before completing registration of a new app/worker.
* - removeApplication and removeWorker are called at any time.
* Given these two requirements, we will have all apps and workers persisted, but
- * we might not have yet deleted apps or workers that finished.
+ * we might not have yet deleted apps or workers that finished (so their liveness must be verified
+ * during recovery).
*/
-trait PersistenceEngine {
+private[spark] trait PersistenceEngine {
def addApplication(app: ApplicationInfo)
def removeApplication(app: ApplicationInfo)
@@ -43,7 +44,7 @@ trait PersistenceEngine {
def close() {}
}
-class BlackHolePersistenceEngine extends PersistenceEngine {
+private[spark] class BlackHolePersistenceEngine extends PersistenceEngine {
override def addApplication(app: ApplicationInfo) {}
override def removeApplication(app: ApplicationInfo) {}
override def addWorker(worker: WorkerInfo) {}
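As an aside (not part of this patch): the PersistenceEngine contract above only guarantees that add* runs before registration completes and that remove* may run at any time, which is why recovery must re-verify liveness. A self-contained sketch of that contract, using plain String IDs instead of Spark's ApplicationInfo/WorkerInfo and a purely in-memory store, might look like the following; all names below are illustrative, not from the patch.

// Illustrative only: mirrors the shape of the PersistenceEngine contract with an
// in-memory store. It keeps every added app/worker until explicitly removed, so a
// reader of readPersistedData() may still see entries that have since finished.
import scala.collection.mutable

trait SimplePersistenceEngine {
  def addApplication(appId: String)
  def removeApplication(appId: String)
  def addWorker(workerId: String)
  def removeWorker(workerId: String)
  def readPersistedData(): (Seq[String], Seq[String])
  def close() {}
}

class InMemoryPersistenceEngine extends SimplePersistenceEngine {
  private val apps = new mutable.LinkedHashSet[String]
  private val workers = new mutable.LinkedHashSet[String]

  override def addApplication(appId: String) { apps += appId }
  override def removeApplication(appId: String) { apps -= appId }
  override def addWorker(workerId: String) { workers += workerId }
  override def removeWorker(workerId: String) { workers -= workerId }
  override def readPersistedData(): (Seq[String], Seq[String]) = (apps.toSeq, workers.toSeq)
}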
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
index eec3df3b7a..b91be821f0 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/MasterState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/RecoveryState.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.master
-private[spark] object MasterState
+private[spark] object RecoveryState
extends Enumeration("STANDBY", "ALIVE", "RECOVERING", "COMPLETING_RECOVERY") {
type MasterState = Value
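For readers unfamiliar with this idiom: the object is renamed to RecoveryState, but it keeps a type alias named MasterState, which is why the import org.apache.spark.deploy.master.RecoveryState.MasterState in DeployMessage.scala above still resolves. Below is a minimal standalone sketch of the same Enumeration-with-type-alias pattern, with made-up names, assuming plain Scala 2.

// Illustrative only: an Enumeration whose inner type alias can be imported and
// used as a parameter/field type, mirroring RecoveryState.MasterState above.
object TrafficLight extends Enumeration {
  type TrafficLight = Value
  val Red, Yellow, Green = Value
}

object EnumerationExample {
  import TrafficLight.TrafficLight   // import the type alias, like RecoveryState.MasterState

  def describe(state: TrafficLight): String = "Current state: " + state

  def main(args: Array[String]) {
    println(describe(TrafficLight.Red))   // prints "Current state: Red"
  }
}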
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
index f43f9f6ed7..81e15c534f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/SparkZooKeeperSession.scala
@@ -35,7 +35,7 @@ import org.apache.zookeeper.Watcher.Event.KeeperState
* Additionally, all commands sent to ZooKeeper will be retried until they either fail too many
* times or a semantic exception is thrown (e.g., "node already exists").
*/
-class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
+private[spark] class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
val ZK_URL = System.getProperty("spark.deploy.zookeeper.url", "")
val ZK_ACL = ZooDefs.Ids.OPEN_ACL_UNSAFE
@@ -53,10 +53,13 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
/** Connect to ZooKeeper to start the session. Must be called before anything else. */
def connect() {
connectToZooKeeper()
- spawn(sessionMonitorThread)
+
+ new Thread() {
+ override def run() = sessionMonitorThread()
+ }.start()
}
- def sessionMonitorThread = {
+ def sessionMonitorThread(): Unit = {
while (!closed) {
Thread.sleep(ZK_CHECK_PERIOD_MILLIS)
if (zk.getState != ZooKeeper.States.CONNECTED) {
@@ -170,7 +173,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
*
* @param fn Block to execute, possibly multiple times.
*/
- def retry[T](fn: => T)(implicit n: Int = MAX_RECONNECT_ATTEMPTS): T = {
+ def retry[T](fn: => T, n: Int = MAX_RECONNECT_ATTEMPTS): T = {
try {
fn
} catch {
@@ -179,7 +182,7 @@ class SparkZooKeeperSession(zkWatcher: SparkZooKeeperWatcher) extends Logging {
case e if n > 0 =>
logError("ZooKeeper exception, " + n + " more retries...", e)
Thread.sleep(RETRY_WAIT_MILLIS)
- retry(fn)(n-1)
+ retry(fn, n-1)
}
}
}
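As an aside, here is a standalone sketch of the retry pattern this file now uses: the attempt count becomes an explicit parameter with a default value rather than an implicit, so a stray implicit Int in a caller's scope can no longer silently change how many retries happen. Names and constants below are illustrative, not taken from the patch.

// Illustrative only: retry a by-name block up to a fixed number of times,
// sleeping between attempts, and rethrow once the retry budget is exhausted.
object RetryExample {
  val MaxAttempts = 3
  val RetryWaitMillis = 200L

  def retry[T](fn: => T, n: Int = MaxAttempts): T = {
    try {
      fn
    } catch {
      case e: Exception if n > 0 =>
        println("Attempt failed, " + n + " more retries...")
        Thread.sleep(RetryWaitMillis)
        retry(fn, n - 1)
    }
  }

  def main(args: Array[String]) {
    var calls = 0
    val result = retry({ calls += 1; if (calls < 3) sys.error("flaky") else 42 })
    println("Succeeded with " + result + " after " + calls + " calls")
  }
}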
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
index 26090c6a9c..e05f587b58 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/WorkerInfo.scala
@@ -42,17 +42,17 @@ private[spark] class WorkerInfo(
@transient var lastHeartbeat: Long = _
- init
+ init()
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
private def readObject(in: java.io.ObjectInputStream) : Unit = {
in.defaultReadObject()
- init
+ init()
}
- private def init = {
+ private def init() {
executors = new mutable.HashMap
state = WorkerState.ALIVE
coresUsed = 0
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
index 065635af85..7809013e83 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperLeaderElectionAgent.scala
@@ -17,17 +17,14 @@
package org.apache.spark.deploy.master
-import scala.collection.JavaConversions._
-
-import org.apache.spark.deploy.master.MasterMessages.{CheckLeader, ElectedLeader, RevokedLeadership}
-import org.apache.spark.Logging
+import akka.actor.ActorRef
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event.EventType
-import akka.actor.{Cancellable, ActorRef}
-import akka.util.duration._
+import org.apache.spark.deploy.master.MasterMessages._
+import org.apache.spark.Logging
-class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
+private[spark] class ZooKeeperLeaderElectionAgent(val masterActor: ActorRef, masterUrl: String)
extends LeaderElectionAgent with SparkZooKeeperWatcher with Logging {
val WORKING_DIR = System.getProperty("spark.deploy.zookeeper.dir", "/spark") + "/leader_election"
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index c59e1f4de6..0b38e239f9 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -25,7 +25,7 @@ import net.liftweb.json.JsonAST.JValue
import org.scalatest.FunSuite
import org.apache.spark.deploy.DeployMessages.{MasterStateResponse, WorkerStateResponse}
-import org.apache.spark.deploy.master.{ApplicationInfo, MasterState, WorkerInfo}
+import org.apache.spark.deploy.master.{ApplicationInfo, RecoveryState, WorkerInfo}
import org.apache.spark.deploy.worker.ExecutorRunner
class JsonProtocolSuite extends FunSuite {
@@ -54,7 +54,7 @@ class JsonProtocolSuite extends FunSuite {
val activeApps = Array[ApplicationInfo](createAppInfo())
val completedApps = Array[ApplicationInfo]()
val stateResponse = new MasterStateResponse("host", 8080, workers, activeApps, completedApps,
- MasterState.ALIVE)
+ RecoveryState.ALIVE)
val output = JsonProtocol.writeMasterState(stateResponse)
assertValidJson(output)
}