author    Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-01 01:05:59 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2012-07-01 01:05:59 -0700
commit    51c46eaca0b4540a39ba78d0809a30d9b118e629 (patch)
tree      a31021de1c0720ba875da3e9eb7eb947026697e1 /core/src
parent    a6eb9fda61c91b150c045911441623f6e57c737a (diff)
More work on standalone deploy system.
Diffstat (limited to 'core/src')
-rw-r--r--  core/src/main/scala/spark/deploy/Command.scala                  9
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala           30
-rw-r--r--  core/src/main/scala/spark/deploy/ExecutorState.scala            7
-rw-r--r--  core/src/main/scala/spark/deploy/JobDescription.scala          12
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala          110
-rw-r--r--  core/src/main/scala/spark/deploy/client/ClientListener.scala   18
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestClient.scala       34
-rw-r--r--  core/src/main/scala/spark/deploy/master/ExecutorInfo.scala     15
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala          25
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobState.scala          5
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala          202
-rw-r--r--  core/src/main/scala/spark/deploy/master/WorkerInfo.scala       35
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala           46
-rw-r--r--  core/src/main/scala/spark/util/AkkaUtils.scala                  2
14 files changed, 465 insertions, 85 deletions
diff --git a/core/src/main/scala/spark/deploy/Command.scala b/core/src/main/scala/spark/deploy/Command.scala
new file mode 100644
index 0000000000..344888919a
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/Command.scala
@@ -0,0 +1,9 @@
+package spark.deploy
+
+import scala.collection.Map
+
+case class Command(
+ mainClass: String,
+ arguments: Seq[String],
+ environment: Map[String, String]) {
+}
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 4e641157e1..c63f542bb0 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -1,6 +1,30 @@
package spark.deploy
-sealed trait DeployMessage
+sealed trait DeployMessage extends Serializable
-case class RegisterSlave(host: String, port: Int, cores: Int, memory: Int) extends DeployMessage
-case class RegisteredSlave(clusterId: String, slaveId: Int) extends DeployMessage
+// Worker to Master
+
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
+ extends DeployMessage
+case class ExecutorStateChanged(jobId: String, execId: Int, state: ExecutorState.Value, message: String)
+ extends DeployMessage
+
+// Master to Worker
+
+case object RegisteredWorker extends DeployMessage
+case class RegisterWorkerFailed(message: String) extends DeployMessage
+case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription) extends DeployMessage
+
+// Client to Master
+
+case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
+
+// Master to Client
+
+case class RegisteredJob(jobId: String) extends DeployMessage
+case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
+  extends DeployMessage
+case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: String)
+  extends DeployMessage
+
+// Internal message in Client
+
+case object StopClient
\ No newline at end of file
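
The messages above make up the whole wire protocol between workers, the master, and clients. As a quick illustration, a minimal sketch (not part of this commit; all IDs and sizes are made up) that builds a couple of these messages and pattern-matches on the sealed trait:

import spark.deploy._

// Sketch only: exercising the protocol messages defined above. In the real
// system these are exchanged between the Worker, Master and Client actors.
object DeployMessageDemo {
  def describe(msg: DeployMessage): String = msg match {
    case RegisterWorker(id, host, port, cores, memory) =>
      "worker %s at %s:%d offering %d cores, %d MB".format(id, host, port, cores, memory)
    case ExecutorStateChanged(jobId, execId, state, _) =>
      "executor %s/%d is now %s".format(jobId, execId, state)
    case other => other.toString
  }

  def main(args: Array[String]) {
    println(describe(RegisterWorker("worker-1", "10.0.0.1", 7078, 4, 4096)))
    println(describe(ExecutorStateChanged("job-20120701010559-0000", 0, ExecutorState.RUNNING, null)))
  }
}
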
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
new file mode 100644
index 0000000000..f26609d8c9
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/ExecutorState.scala
@@ -0,0 +1,7 @@
+package spark.deploy
+
+object ExecutorState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED") {
+ val LAUNCHING, RUNNING, FINISHED, FAILED = Value
+
+ def isFinished(state: Value): Boolean = (state == FINISHED || state == FAILED)
+}
diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala
new file mode 100644
index 0000000000..545bcdcc74
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/JobDescription.scala
@@ -0,0 +1,12 @@
+package spark.deploy
+
+class JobDescription(
+ val name: String,
+ val memoryPerSlave: Int,
+ val cores: Int,
+ val resources: Seq[String],
+ val command: Command)
+ extends Serializable {
+
+ val user = System.getProperty("user.name", "<unknown>")
+}
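
A Command plus a JobDescription is everything a client needs to describe a job to the master. A minimal construction sketch, mirroring the shape TestClient uses below (the main class, sizes, and environment variable are hypothetical):

import spark.deploy.{Command, JobDescription}

// Sketch: a job asking for 4 cores in total and 512 MB on each slave, running
// a hypothetical main class with one argument and one environment variable.
val command = Command("my.app.Main", Seq("--verbose"), Map("MY_APP_OPTS" -> "-Xmx512m"))
val desc = new JobDescription("MyJob", 512, 4, Seq(), command)
// desc.user is filled in automatically from the "user.name" system property
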
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
new file mode 100644
index 0000000000..f894804993
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -0,0 +1,110 @@
+package spark.deploy.client
+
+import spark.deploy._
+import akka.actor._
+import akka.pattern.ask
+import akka.util.duration._
+import spark.{SparkException, Logging}
+import akka.remote.RemoteClientLifeCycleEvent
+import akka.remote.RemoteClientShutdown
+import spark.deploy.RegisterJob
+import akka.remote.RemoteClientDisconnected
+import akka.actor.Terminated
+import akka.dispatch.Await
+
+/**
+ * The main class used to talk to a Spark deploy cluster. Takes a master URL, a job description,
+ * and a listener for job events, and calls back the listener when various events occur.
+ */
+class Client(
+ actorSystem: ActorSystem,
+ masterUrl: String,
+ jobDescription: JobDescription,
+ listener: ClientListener)
+ extends Logging {
+
+ val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
+
+ var actor: ActorRef = null
+ var jobId: String = null
+
+  if (MASTER_REGEX.unapplySeq(masterUrl).isEmpty) {
+ throw new SparkException("Invalid master URL: " + masterUrl)
+ }
+
+ class ClientActor extends Actor with Logging {
+ var master: ActorRef = null
+ var alreadyDisconnected = false // To avoid calling listener.disconnected() multiple times
+
+ override def preStart() {
+ val Seq(masterHost, masterPort) = MASTER_REGEX.unapplySeq(masterUrl).get
+ logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
+ val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
+ try {
+ master = context.actorFor(akkaUrl)
+ master ! RegisterJob(jobDescription)
+ context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
+ } catch {
+ case e: Exception =>
+ logError("Failed to connect to master", e)
+ markDisconnected()
+ context.stop(self)
+ }
+ }
+
+ override def receive = {
+ case RegisteredJob(jobId_) =>
+ jobId = jobId_
+ listener.connected(jobId)
+
+ case ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int) =>
+ val fullId = jobId + "/" + id
+ logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, workerId, host, cores))
+ listener.executorAdded(fullId, workerId, host, cores, memory)
+
+ case ExecutorUpdated(id, state, message) =>
+ val fullId = jobId + "/" + id
+ val messageText = if (message == null) "" else " (" + message + ")"
+ logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
+ if (ExecutorState.isFinished(state)) {
+ listener.executorRemoved(fullId, message)
+ }
+
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
+ logError("Connection to master failed; stopping client")
+ markDisconnected()
+ context.stop(self)
+
+ case StopClient =>
+ markDisconnected()
+ sender ! true
+ context.stop(self)
+ }
+
+ /**
+ * Notify the listener that we disconnected, if we hadn't already done so before.
+ */
+ def markDisconnected() {
+ if (!alreadyDisconnected) {
+ listener.disconnected()
+ alreadyDisconnected = true
+ }
+ }
+ }
+
+ def start() {
+ // Just launch an actor; it will call back into the listener.
+ actor = actorSystem.actorOf(Props(new ClientActor))
+ }
+
+ def stop() {
+ if (actor != null) {
+ val timeout = 1.seconds
+ val future = actor.ask(StopClient)(timeout)
+ Await.result(future, timeout)
+ actor = null
+ }
+ }
+}
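
Note that start() is asynchronous (everything comes back through the ClientListener), while stop() uses Akka's ask pattern so the ClientActor acknowledges StopClient before the call returns. A minimal lifecycle sketch with a hypothetical master URL (compare TestClient below, which starts a client but never stops it):

import akka.actor.ActorSystem
import spark.deploy.JobDescription
import spark.deploy.client.{Client, ClientListener}

// Sketch only: run a job against a (hypothetical) master, then shut the
// client down cleanly. stop() blocks up to 1 second for the acknowledgement.
def runJob(actorSystem: ActorSystem, desc: JobDescription, listener: ClientListener) {
  val client = new Client(actorSystem, "spark://localhost:7077", desc, listener)
  client.start()  // registers the job; listener.connected(jobId) fires on success
  // ... wait for the job to finish, e.g. driven by the listener callbacks ...
  client.stop()   // sends StopClient via ask and awaits the reply
}
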
diff --git a/core/src/main/scala/spark/deploy/client/ClientListener.scala b/core/src/main/scala/spark/deploy/client/ClientListener.scala
new file mode 100644
index 0000000000..7d23baff32
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/ClientListener.scala
@@ -0,0 +1,18 @@
+package spark.deploy.client
+
+/**
+ * Callbacks invoked by the deploy client when various events happen. There are currently four events:
+ * connecting to the cluster, disconnecting, being given an executor, and having an executor
+ * removed (either due to failure or due to revocation).
+ *
+ * Users of this API should *not* block inside the callback methods.
+ */
+trait ClientListener {
+ def connected(jobId: String): Unit
+
+ def disconnected(): Unit
+
+ def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int): Unit
+
+ def executorRemoved(id: String, message: String): Unit
+}
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
new file mode 100644
index 0000000000..e6d76f2751
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -0,0 +1,34 @@
+package spark.deploy.client
+
+import spark.util.AkkaUtils
+import spark.{Logging, Utils}
+import spark.deploy.{Command, JobDescription}
+
+object TestClient {
+
+ class TestListener extends ClientListener with Logging {
+ def connected(id: String) {
+ logInfo("Connected to master, got job ID " + id)
+ }
+
+ def disconnected() {
+ logInfo("Disconnected from master")
+ System.exit(0)
+ }
+
+ def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {}
+
+ def executorRemoved(id: String, message: String) {}
+ }
+
+ def main(args: Array[String]) {
+ val url = args(0)
+ val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
+ val desc = new JobDescription("TestClient", 200, 1, Seq(),
+ Command("spark.deploy.client.TestExecutor", Seq(), Map()))
+ val listener = new TestListener
+ val client = new Client(actorSystem, url, desc, listener)
+ client.start()
+ actorSystem.awaitTermination()
+ }
+}
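
TestClient takes the master URL as its single command-line argument (for example spark://localhost:7077, with host and port depending on where the master runs), logs the job ID the master assigns, and exits if the connection to the master is lost.
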
diff --git a/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
new file mode 100644
index 0000000000..335e00958c
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/ExecutorInfo.scala
@@ -0,0 +1,15 @@
+package spark.deploy.master
+
+import spark.deploy.ExecutorState
+
+class ExecutorInfo(
+ val id: Int,
+ val job: JobInfo,
+ val worker: WorkerInfo,
+ val cores: Int,
+ val memory: Int) {
+
+ var state = ExecutorState.LAUNCHING
+
+ def fullId: String = job.id + "/" + id
+}
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
new file mode 100644
index 0000000000..e8502f0b8f
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -0,0 +1,25 @@
+package spark.deploy.master
+
+import spark.deploy.JobDescription
+import java.util.Date
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+ var state = JobState.WAITING
+ var executors = new mutable.HashMap[Int, ExecutorInfo]
+
+ var nextExecutorId = 0
+
+ def newExecutorId(): Int = {
+ val id = nextExecutorId
+ nextExecutorId += 1
+ id
+ }
+
+ def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+ val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
+ executors(exec.id) = exec
+ exec
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/JobState.scala b/core/src/main/scala/spark/deploy/master/JobState.scala
new file mode 100644
index 0000000000..3a69a37aca
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/JobState.scala
@@ -0,0 +1,5 @@
+package spark.deploy.master
+
+object JobState extends Enumeration("WAITING", "RUNNING", "FINISHED", "FAILED") {
+ val WAITING, RUNNING, FINISHED, FAILED = Value
+}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index cb0208e0b6..a21f156a51 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -1,47 +1,40 @@
package spark.deploy.master
-import scala.collection.mutable.HashMap
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import akka.actor._
import spark.{Logging, Utils}
import spark.util.AkkaUtils
import java.text.SimpleDateFormat
import java.util.Date
-import spark.deploy.{RegisteredSlave, RegisterSlave}
-import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
+import akka.remote.RemoteClientLifeCycleEvent
+import spark.deploy._
import akka.remote.RemoteClientShutdown
-import spark.deploy.RegisteredSlave
import akka.remote.RemoteClientDisconnected
+import spark.deploy.RegisterWorker
+import spark.deploy.RegisterWorkerFailed
import akka.actor.Terminated
-import scala.Some
-import spark.deploy.RegisterSlave
-
-class SlaveInfo(
- val id: Int,
- val host: String,
- val port: Int,
- val cores: Int,
- val memory: Int,
- val actor: ActorRef) {
- var coresUsed = 0
- var memoryUsed = 0
-
- def coresFree: Int = cores - coresUsed
-
- def memoryFree: Int = memory - memoryUsed
-}
class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
- val clusterId = newClusterId()
- var nextSlaveId = 0
- var nextJobId = 0
- val slaves = new HashMap[Int, SlaveInfo]
- val actorToSlave = new HashMap[ActorRef, SlaveInfo]
- val addressToSlave = new HashMap[Address, SlaveInfo]
+ val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For job IDs
+
+ var nextJobNumber = 0
+ val workers = new HashSet[WorkerInfo]
+ val idToWorker = new HashMap[String, WorkerInfo]
+ val actorToWorker = new HashMap[ActorRef, WorkerInfo]
+ val addressToWorker = new HashMap[Address, WorkerInfo]
+
+  val jobs = new HashSet[JobInfo]
+ val idToJob = new HashMap[String, JobInfo]
+ val actorToJob = new HashMap[ActorRef, JobInfo]
+ val addressToJob = new HashMap[Address, JobInfo]
+
+ val waitingJobs = new ArrayBuffer[JobInfo]
+ val completedJobs = new ArrayBuffer[JobInfo]
override def preStart() {
logInfo("Starting Spark master at spark://" + ip + ":" + port)
- logInfo("Cluster ID: " + clusterId)
+ // Listen for remote client disconnection events, since they don't go through Akka's watch()
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
startWebUi()
}
@@ -58,50 +51,141 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
}
override def receive = {
- case RegisterSlave(host, slavePort, cores, memory) => {
- logInfo("Registering slave %s:%d with %d cores, %s RAM".format(
- host, slavePort, cores, Utils.memoryMegabytesToString(memory)))
- val slave = addSlave(host, slavePort, cores, memory)
+ case RegisterWorker(id, host, workerPort, cores, memory) => {
+ logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
+ host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
+ if (idToWorker.contains(id)) {
+ sender ! RegisterWorkerFailed("Duplicate worker ID")
+ } else {
+ val worker = addWorker(id, host, workerPort, cores, memory)
+ context.watch(sender) // This doesn't work with remote actors but helps for testing
+ sender ! RegisteredWorker
+ schedule()
+ }
+ }
+
+ case RegisterJob(description) => {
+ logInfo("Registering job " + description.name)
+ val job = addJob(description, sender)
+ logInfo("Registered job " + description.name + " with ID " + job.id)
+ waitingJobs += job
context.watch(sender) // This doesn't work with remote actors but helps for testing
- sender ! RegisteredSlave(clusterId, slave.id)
+ sender ! RegisteredJob(job.id)
+ schedule()
+ }
+
+ case ExecutorStateChanged(jobId, execId, state, message) => {
+ val execOption = idToJob.get(jobId).flatMap(job => job.executors.get(execId))
+ execOption match {
+ case Some(exec) => {
+ exec.state = state
+ exec.job.actor ! ExecutorUpdated(execId, state, message)
+ if (ExecutorState.isFinished(state)) {
+ // Remove this executor from the worker and job
+ logInfo("Removing executor " + exec.fullId)
+ idToJob(jobId).executors -= exec.id
+ exec.worker.removeExecutor(exec)
+ }
+ }
+ case None =>
+ logWarning("Got status update for unknown executor " + jobId + "/" + execId)
+ }
+ }
+
+ case Terminated(actor) => {
+ // The disconnected actor could've been either a worker or a job; remove whichever of
+ // those we have an entry for in the corresponding actor hashmap
+ actorToWorker.get(actor).foreach(removeWorker)
+ actorToJob.get(actor).foreach(removeJob)
}
- case RemoteClientDisconnected(transport, address) =>
- logInfo("Remote client disconnected: " + address)
- addressToSlave.get(address).foreach(s => removeSlave(s)) // Remove slave, if any, at address
+ case RemoteClientDisconnected(transport, address) => {
+ // The disconnected client could've been either a worker or a job; remove whichever it was
+ addressToWorker.get(address).foreach(removeWorker)
+ addressToJob.get(address).foreach(removeJob)
+ }
+
+ case RemoteClientShutdown(transport, address) => {
+ // The disconnected client could've been either a worker or a job; remove whichever it was
+ addressToWorker.get(address).foreach(removeWorker)
+ addressToJob.get(address).foreach(removeJob)
+ }
+ }
+
+ /**
+ * Schedule the currently available resources among waiting jobs. This method will be called
+ * every time a new job joins or resource availability changes.
+ */
+ def schedule() {
+    // Right now this is a very simple FIFO scheduler with backfilling: we walk through the
+    // jobs in order of submission time and launch each waiting one that fits in the cluster.
+    // It's also not very efficient in terms of algorithmic complexity.
+    for (job <- waitingJobs if job.state == JobState.WAITING) {
+ // Figure out how many cores the job could use on the whole cluster
+ val jobMemory = job.desc.memoryPerSlave
+ val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
+ if (usableCores >= job.desc.cores) {
+ // We can launch it! Let's just partition the workers into executors for this job.
+ // TODO: Probably want to spread stuff out across nodes more.
+ var coresLeft = job.desc.cores
+ for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) {
+ val coresToUse = math.min(worker.coresFree, coresLeft)
+ val exec = job.newExecutor(worker, coresToUse)
+ launchExecutor(worker, exec)
+ coresLeft -= coresToUse
+        }
+        job.state = JobState.RUNNING
+ }
+ }
+ }
- case RemoteClientShutdown(transport, address) =>
- logInfo("Remote client shutdown: " + address)
- addressToSlave.get(address).foreach(s => removeSlave(s)) // Remove slave, if any, at address
+ def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
+ logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
+ worker.addExecutor(exec)
+ worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc)
+ exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
+ }
- case Terminated(actor) =>
- logInfo("Slave disconnected: " + actor)
- actorToSlave.get(actor).foreach(s => removeSlave(s)) // Remove slave, if any, at actor
+ def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+    workers += worker
+ idToWorker(worker.id) = worker
+ actorToWorker(sender) = worker
+ addressToWorker(sender.path.address) = worker
+ return worker
}
- def addSlave(host: String, slavePort: Int, cores: Int, memory: Int): SlaveInfo = {
- val slave = new SlaveInfo(newSlaveId(), host, slavePort, cores, memory, sender)
- slaves(slave.id) = slave
- actorToSlave(sender) = slave
- addressToSlave(sender.path.address) = slave
- return slave
+ def removeWorker(worker: WorkerInfo) {
+ logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+    workers -= worker
+    idToWorker -= worker.id
+ actorToWorker -= worker.actor
+ addressToWorker -= worker.actor.path.address
}
- def removeSlave(slave: SlaveInfo) {
- logInfo("Removing slave " + slave.id + " on " + slave.host + ":" + slave.port)
- slaves -= slave.id
- actorToSlave -= slave.actor
- addressToSlave -= slave.actor.path.address
+ def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
+ val date = new Date
+ val job = new JobInfo(newJobId(date), desc, date, actor)
+ idToJob(job.id) = job
+    actorToJob(actor) = job
+    addressToJob(actor.path.address) = job
+ return job
}
- def newClusterId(): String = {
- val date = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date())
- "%s-%04d".format(date, (math.random * 10000).toInt)
+ def removeJob(job: JobInfo) {
+ logInfo("Removing job " + job.id)
+    idToJob -= job.id
+    actorToJob -= job.actor
+    addressToJob -= job.actor.path.address
+    waitingJobs -= job
+    completedJobs += job  // Remember it in our history
+    for (exec <- job.executors.values) {
+      exec.worker.removeExecutor(exec)  // free the cores and memory it was using
+      // TODO: also ask the worker to kill the executor's process
+    }
+ schedule()
}
- def newSlaveId(): Int = {
- nextSlaveId += 1
- nextSlaveId - 1
+ /** Generate a new job ID given a job's submission date */
+ def newJobId(submitDate: Date): String = {
+    val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
+ nextJobNumber += 1
+ jobId
}
}
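
To make the schedule() logic above concrete, a worked example with made-up numbers: two workers, each with 4 free cores and 1024 MB of free memory, and one waiting job asking for 6 cores with memoryPerSlave = 512.

// Worked example of the core-counting in schedule() (all numbers hypothetical).
val workersFree = Seq((4, 1024), (4, 1024))  // (coresFree, memoryFree) per worker
val jobMemory = 512                          // job.desc.memoryPerSlave
val jobCores = 6                             // job.desc.cores

// Only workers with enough free memory contribute usable cores.
val usableCores = workersFree.filter(_._2 >= jobMemory).map(_._1).sum  // = 8
assert(usableCores >= jobCores)  // 8 >= 6, so the job can launch

// The inner loop then takes math.min(4, 6) = 4 cores on the first worker and
// the remaining 2 on the second, creating executors of 4 and 2 cores.

A job that does not yet fit stays in waitingJobs, but a smaller job behind it can still be backfilled on the same pass.
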
diff --git a/core/src/main/scala/spark/deploy/master/WorkerInfo.scala b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
new file mode 100644
index 0000000000..af0be108ea
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/master/WorkerInfo.scala
@@ -0,0 +1,35 @@
+package spark.deploy.master
+
+import akka.actor.ActorRef
+import scala.collection.mutable
+
+class WorkerInfo(
+ val id: String,
+ val host: String,
+ val port: Int,
+ val cores: Int,
+ val memory: Int,
+ val actor: ActorRef) {
+
+ var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
+
+ var coresUsed = 0
+ var memoryUsed = 0
+
+ def coresFree: Int = cores - coresUsed
+ def memoryFree: Int = memory - memoryUsed
+
+ def addExecutor(exec: ExecutorInfo) {
+ executors(exec.fullId) = exec
+ coresUsed += exec.cores
+ memoryUsed += exec.memory
+ }
+
+ def removeExecutor(exec: ExecutorInfo) {
+ if (executors.contains(exec.fullId)) {
+ executors -= exec.fullId
+ coresUsed -= exec.cores
+ memoryUsed -= exec.memory
+ }
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 22b070658d..1f5854011f 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -2,22 +2,22 @@ package spark.deploy.worker
import akka.actor.{ActorRef, Terminated, Props, Actor}
-import akka.pattern.ask
-import akka.util.duration._
-import spark.{SparkException, Logging, Utils}
-import spark.util.{IntParam, AkkaUtils}
-import spark.deploy.{RegisterSlave, RegisteredSlave}
-import akka.dispatch.Await
+import spark.{Logging, Utils}
+import spark.util.AkkaUtils
+import spark.deploy.{RegisterWorkerFailed, RegisterWorker, RegisteredWorker}
import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import java.text.SimpleDateFormat
+import java.util.Date
class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
extends Actor with Logging {
+ val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss") // For worker and executor IDs
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
- var clusterId: String = null
- var slaveId: Int = 0
+
+ val workerId = generateWorkerId()
var coresUsed = 0
var memoryUsed = 0
@@ -34,19 +34,20 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
def connectToMaster() {
masterUrl match {
- case MASTER_REGEX(masterHost, masterPort) =>
+ case MASTER_REGEX(masterHost, masterPort) => {
logInfo("Connecting to master spark://" + masterHost + ":" + masterPort)
val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
try {
master = context.actorFor(akkaUrl)
- master ! RegisterSlave(ip, port, cores, memory)
+ master ! RegisterWorker(workerId, ip, port, cores, memory)
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
- context.watch(master) // Doesn't work with remote actors, but useful for testing
+ context.watch(master) // Doesn't work with remote actors, but useful for testing
} catch {
case e: Exception =>
logError("Failed to connect to master", e)
System.exit(1)
}
+ }
case _ =>
logError("Invalid master URL: " + masterUrl)
@@ -66,26 +67,27 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
}
override def receive = {
- case RegisteredSlave(clusterId_, slaveId_) =>
- this.clusterId = clusterId_
- this.slaveId = slaveId_
- logInfo("Registered with master, cluster ID = " + clusterId + ", slave ID = " + slaveId)
-
- case RemoteClientDisconnected(_, _) =>
- masterDisconnected()
+ case RegisteredWorker =>
+ logInfo("Successfully registered with master")
- case RemoteClientShutdown(_, _) =>
- masterDisconnected()
+ case RegisterWorkerFailed(message) =>
+ logError("Worker registration failed: " + message)
+ System.exit(1)
- case Terminated(_) =>
+ case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
}
def masterDisconnected() {
- // Not sure what to do here exactly, so just shut down for now.
+ // TODO: It would be nice to try to reconnect to the master, but just shut down for now.
+ // (Note that if reconnecting we would also need to assign IDs differently.)
logError("Connection to master failed! Shutting down.")
System.exit(1)
}
+
+ def generateWorkerId(): String = {
+ "worker-%s-%s-%d".format(DATE_FORMAT.format(new Date), ip, port)
+ }
}
object Worker {
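
generateWorkerId() above combines a timestamp with the worker's IP and port, using the same yyyyMMddHHmmss format the master uses for job IDs. A short sketch of the scheme (IP and port are made up):

import java.text.SimpleDateFormat
import java.util.Date

// Produces IDs like "worker-20120701010559-10.0.0.1-7078" (values hypothetical)
val workerId = "worker-%s-%s-%d".format(
  new SimpleDateFormat("yyyyMMddHHmmss").format(new Date), "10.0.0.1", 7078)
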
diff --git a/core/src/main/scala/spark/util/AkkaUtils.scala b/core/src/main/scala/spark/util/AkkaUtils.scala
index 3cf12ebe0e..57d212e4ca 100644
--- a/core/src/main/scala/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/spark/util/AkkaUtils.scala
@@ -53,7 +53,7 @@ object AkkaUtils {
val server = actorSystem.actorOf(
Props(new HttpServer(ioWorker, SingletonHandler(rootService))), name = "HttpServer")
actorSystem.registerOnTermination { ioWorker.stop() }
- val timeout = 1.seconds
+ val timeout = 3.seconds
val future = server.ask(HttpServer.Bind(ip, port))(timeout)
try {
Await.result(future, timeout) match {