Diffstat:
-rw-r--r--  core/src/main/scala/spark/PipedRDD.scala                         9
-rw-r--r--  core/src/main/scala/spark/Utils.scala                           46
-rw-r--r--  core/src/main/scala/spark/deploy/DeployMessage.scala            22
-rw-r--r--  core/src/main/scala/spark/deploy/ExecutorState.scala             8
-rw-r--r--  core/src/main/scala/spark/deploy/JobDescription.scala            6
-rw-r--r--  core/src/main/scala/spark/deploy/client/Client.scala             4
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestClient.scala         2
-rw-r--r--  core/src/main/scala/spark/deploy/client/TestExecutor.scala       7
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala            28
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala   146
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala            53
11 files changed, 298 insertions(+), 33 deletions(-)
diff --git a/core/src/main/scala/spark/PipedRDD.scala b/core/src/main/scala/spark/PipedRDD.scala
index 9e0a01b5f9..4fcfd869cf 100644
--- a/core/src/main/scala/spark/PipedRDD.scala
+++ b/core/src/main/scala/spark/PipedRDD.scala
@@ -30,7 +30,7 @@ class PipedRDD[T: ClassManifest](
val pb = new ProcessBuilder(command)
// Add the environmental variables to the process.
val currentEnvVars = pb.environment()
- envVars.foreach { case(variable, value) => currentEnvVars.put(variable, value) }
+ envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) }
val proc = pb.start()
val env = SparkEnv.get
@@ -38,7 +38,7 @@ class PipedRDD[T: ClassManifest](
// Start a thread to print the process's stderr to ours
new Thread("stderr reader for " + command) {
override def run() {
- for(line <- Source.fromInputStream(proc.getErrorStream).getLines) {
+ for (line <- Source.fromInputStream(proc.getErrorStream).getLines) {
System.err.println(line)
}
}
@@ -49,7 +49,7 @@ class PipedRDD[T: ClassManifest](
override def run() {
SparkEnv.set(env)
val out = new PrintWriter(proc.getOutputStream)
- for(elem <- parent.iterator(split)) {
+ for (elem <- parent.iterator(split)) {
out.println(elem)
}
out.close()
@@ -66,8 +66,9 @@ object PipedRDD {
def tokenize(command: String): Seq[String] = {
val buf = new ArrayBuffer[String]
val tok = new StringTokenizer(command)
- while(tok.hasMoreElements)
+ while(tok.hasMoreElements) {
buf += tok.nextToken()
+ }
buf
}
}
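
For orientation, the tokenize helper touched in this hunk just splits a command string on whitespace via StringTokenizer; a minimal stand-alone sketch of that behaviour (not part of the patch) is:

import java.util.StringTokenizer
import scala.collection.mutable.ArrayBuffer

// Stand-alone restatement of PipedRDD.tokenize for illustration:
// StringTokenizer splits on whitespace, so quoting is not honoured.
object TokenizeExample {
  def tokenize(command: String): Seq[String] = {
    val buf = new ArrayBuffer[String]
    val tok = new StringTokenizer(command)
    while (tok.hasMoreElements) {
      buf += tok.nextToken()
    }
    buf
  }

  def main(args: Array[String]) {
    println(tokenize("grep -i error"))   // prints ArrayBuffer(grep, -i, error)
  }
}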
diff --git a/core/src/main/scala/spark/Utils.scala b/core/src/main/scala/spark/Utils.scala
index 95c833ea5b..674ff9e298 100644
--- a/core/src/main/scala/spark/Utils.scala
+++ b/core/src/main/scala/spark/Utils.scala
@@ -7,6 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import java.util.{Locale, UUID}
+import scala.io.Source
/**
* Various utility methods used by Spark.
@@ -39,6 +40,7 @@ object Utils {
(c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')
}
+ /** Split a string into words at non-alphabetic characters */
def splitWords(s: String): Seq[String] = {
val buf = new ArrayBuffer[String]
var i = 0
@@ -58,7 +60,7 @@ object Utils {
return buf
}
- // Create a temporary directory inside the given parent directory
+ /** Create a temporary directory inside the given parent directory */
def createTempDir(root: String = System.getProperty("java.io.tmpdir")): File = {
var attempts = 0
val maxAttempts = 10
@@ -85,7 +87,7 @@ object Utils {
return dir
}
- // Copy all data from an InputStream to an OutputStream
+ /** Copy all data from an InputStream to an OutputStream */
def copyStream(in: InputStream,
out: OutputStream,
closeStreams: Boolean = false)
@@ -104,9 +106,11 @@ object Utils {
}
}
- // Shuffle the elements of a collection into a random order, returning the
- // result in a new collection. Unlike scala.util.Random.shuffle, this method
- // uses a local random number generator, avoiding inter-thread contention.
+ /**
+ * Shuffle the elements of a collection into a random order, returning the
+ * result in a new collection. Unlike scala.util.Random.shuffle, this method
+ * uses a local random number generator, avoiding inter-thread contention.
+ */
def randomize[T](seq: TraversableOnce[T]): Seq[T] = {
val buf = new ArrayBuffer[T]()
buf ++= seq
@@ -136,7 +140,7 @@ object Utils {
}
/**
- * Get the local machine's hostname
+ * Get the local machine's hostname.
*/
def localHostName(): String = {
customHostname.getOrElse(InetAddress.getLocalHost.getHostName)
@@ -250,4 +254,34 @@ object Utils {
def memoryMegabytesToString(megabytes: Long): String = {
memoryBytesToString(megabytes * 1024L * 1024L)
}
+
+ /**
+ * Execute a command in the given working directory, throwing an exception if it completes
+ * with an exit code other than 0.
+ */
+ def execute(command: Seq[String], workingDir: File) {
+ val process = new ProcessBuilder(command: _*)
+ .directory(workingDir)
+ .redirectErrorStream(true)
+ .start()
+ new Thread("read stdout for " + command(0)) {
+ override def run() {
+ for (line <- Source.fromInputStream(process.getInputStream).getLines) {
+ System.err.println(line)
+ }
+ }
+ }.start()
+ val exitCode = process.waitFor()
+ if (exitCode != 0) {
+ throw new SparkException("Process " + command + " exited with code " + exitCode)
+ }
+ }
+
+ /**
+ * Execute a command in the current working directory, throwing an exception if it completes
+ * with an exit code other than 0.
+ */
+ def execute(command: Seq[String]) {
+ execute(command, new File("."))
+ }
}
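
A rough usage sketch of the new Utils.execute overloads (the directory and archive name are made up; spark core must be on the classpath):

import java.io.File
import spark.Utils

// Hypothetical usage: unpack an archive into a scratch directory. Utils.execute
// merges the child's stdout/stderr, echoes it to our stderr, and throws a
// SparkException if the exit code is non-zero.
object ExecuteExample {
  def main(args: Array[String]) {
    val scratch = new File("/tmp/spark-scratch")   // hypothetical directory
    Utils.execute(Seq("tar", "-xzf", "job-files.tgz"), scratch)

    // The one-argument overload runs in the current working directory.
    Utils.execute(Seq("ls", "-l"))
  }
}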
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index c63f542bb0..14492ed552 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -6,14 +6,29 @@ sealed trait DeployMessage extends Serializable
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
extends DeployMessage
-case class ExecutorStateChanged(jobId: String, execId: Int, state: ExecutorState.Value, message: String)
+
+case class ExecutorStateChanged(
+ jobId: String,
+ execId: Int,
+ state: ExecutorState.Value,
+ message: Option[String])
extends DeployMessage
// Master to Worker
case object RegisteredWorker extends DeployMessage
case class RegisterWorkerFailed(message: String) extends DeployMessage
-case class LaunchExecutor(jobId: String, execId: Int, jobDesc: JobDescription) extends DeployMessage
+case class KillExecutor(jobId: String, execId: Int) extends DeployMessage
+
+case class LaunchExecutor(
+ jobId: String,
+ execId: Int,
+ jobDesc: JobDescription,
+ cores: Int,
+ memory: Int)
+ extends DeployMessage
+
// Client to Master
@@ -23,7 +38,8 @@ case class RegisterJob(jobDescription: JobDescription) extends DeployMessage
case class RegisteredJob(jobId: String) extends DeployMessage
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
-case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: String)
+case class ExecutorUpdated(id: Int, state: ExecutorState.Value, message: Option[String])
+case class JobKilled(message: String)
// Internal message in Client
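
To make the widened message shape concrete, a REPL-style sketch of building the new ExecutorStateChanged (the job id is a made-up placeholder):

import spark.deploy.{ExecutorState, ExecutorStateChanged}

// message is now Option[String]; None replaces the old null convention.
val running = ExecutorStateChanged("job-20120801000000-0001", 0, ExecutorState.RUNNING, None)
val failed  = ExecutorStateChanged("job-20120801000000-0001", 0, ExecutorState.FAILED,
                                   Some("Command exited with code 1"))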
diff --git a/core/src/main/scala/spark/deploy/ExecutorState.scala b/core/src/main/scala/spark/deploy/ExecutorState.scala
index f26609d8c9..ea73f7be29 100644
--- a/core/src/main/scala/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/spark/deploy/ExecutorState.scala
@@ -1,7 +1,9 @@
package spark.deploy
-object ExecutorState extends Enumeration("LAUNCHING", "RUNNING", "FINISHED", "FAILED") {
- val LAUNCHING, RUNNING, FINISHED, FAILED = Value
+object ExecutorState
+ extends Enumeration("LAUNCHING", "LOADING", "RUNNING", "KILLED", "FAILED", "LOST") {
- def isFinished(state: Value): Boolean = (state == FINISHED || state == FAILED)
+ val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+
+ def isFinished(state: Value): Boolean = (state == KILLED || state == FAILED || state == LOST)
}
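
On the receiving side, the Option message and the extended isFinished check compose as in the Client.scala hunk further down; a REPL-style sketch:

import spark.deploy.ExecutorState

// Mirrors the idiom used in Client.scala below: render the optional
// message and treat KILLED/FAILED/LOST as terminal states.
def describe(state: ExecutorState.Value, message: Option[String]): String = {
  val text = message.map(s => " (" + s + ")").getOrElse("")
  if (ExecutorState.isFinished(state)) "finished as " + state + text else "now " + state + text
}

describe(ExecutorState.RUNNING, None)                 // "now RUNNING"
describe(ExecutorState.FAILED, Some("exit code 1"))   // "finished as FAILED (exit code 1)"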
diff --git a/core/src/main/scala/spark/deploy/JobDescription.scala b/core/src/main/scala/spark/deploy/JobDescription.scala
index 545bcdcc74..3f91402a31 100644
--- a/core/src/main/scala/spark/deploy/JobDescription.scala
+++ b/core/src/main/scala/spark/deploy/JobDescription.scala
@@ -2,11 +2,13 @@ package spark.deploy
class JobDescription(
val name: String,
- val memoryPerSlave: Int,
val cores: Int,
- val resources: Seq[String],
+ val memoryPerSlave: Int,
+ val fileUrls: Seq[String],
val command: Command)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
+
+ override def toString: String = "JobDescription(" + name + ")"
}
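
As a sanity check on the reordered constructor, a sketch of building a JobDescription the way the updated TestClient does (the class name and numbers are illustrative):

import spark.deploy.{Command, JobDescription}

// New argument order: name, cores, memoryPerSlave, fileUrls, command.
val desc = new JobDescription(
  "ExampleJob",                                          // name
  4,                                                     // cores
  512,                                                   // memoryPerSlave (TestClient passes 512)
  Seq(),                                                 // fileUrls -- fetching is disabled for now
  Command("spark.examples.ExampleMain", Seq(), Map()))   // hypothetical main class

println(desc)   // prints "JobDescription(ExampleJob)" via the new toString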
diff --git a/core/src/main/scala/spark/deploy/client/Client.scala b/core/src/main/scala/spark/deploy/client/Client.scala
index f894804993..c7fa8a3874 100644
--- a/core/src/main/scala/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/spark/deploy/client/Client.scala
@@ -66,10 +66,10 @@ class Client(
case ExecutorUpdated(id, state, message) =>
val fullId = jobId + "/" + id
- val messageText = if (message == null) "" else " (" + message + ")"
+ val messageText = message.map(s => " (" + s + ")").getOrElse("")
logInfo("Executor updated: %s is now %s%s".format(fullId, state, messageText))
if (ExecutorState.isFinished(state)) {
- listener.executorRemoved(fullId, message)
+ listener.executorRemoved(fullId, message.getOrElse(""))
}
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
diff --git a/core/src/main/scala/spark/deploy/client/TestClient.scala b/core/src/main/scala/spark/deploy/client/TestClient.scala
index e6d76f2751..b04f362997 100644
--- a/core/src/main/scala/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/spark/deploy/client/TestClient.scala
@@ -24,7 +24,7 @@ object TestClient {
def main(args: Array[String]) {
val url = args(0)
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress(), 0)
- val desc = new JobDescription("TestClient", 200, 1, Seq(),
+ val desc = new JobDescription("TestClient", 1, 512, Seq(),
Command("spark.deploy.client.TestExecutor", Seq(), Map()))
val listener = new TestListener
val client = new Client(actorSystem, url, desc, listener)
diff --git a/core/src/main/scala/spark/deploy/client/TestExecutor.scala b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
new file mode 100644
index 0000000000..1a74cc03cf
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/client/TestExecutor.scala
@@ -0,0 +1,7 @@
+package spark.deploy.client
+
+object TestExecutor {
+ def main(args: Array[String]) {
+ println("Hello world!")
+ }
+}
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index a21f156a51..89de3b1827 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -24,7 +24,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
val actorToWorker = new HashMap[ActorRef, WorkerInfo]
val addressToWorker = new HashMap[Address, WorkerInfo]
- val jobs = new HashSet[WorkerInfo]
+ val jobs = new HashSet[JobInfo]
val idToJob = new HashMap[String, JobInfo]
val actorToJob = new HashMap[ActorRef, JobInfo]
val addressToJob = new HashMap[Address, JobInfo]
@@ -57,7 +57,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
if (idToWorker.contains(id)) {
sender ! RegisterWorkerFailed("Duplicate worker ID")
} else {
- val worker = addWorker(id, host, workerPort, cores, memory)
+ addWorker(id, host, workerPort, cores, memory)
context.watch(sender) // This doesn't work with remote actors but helps for testing
sender ! RegisteredWorker
schedule()
@@ -82,9 +82,11 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
exec.job.actor ! ExecutorUpdated(execId, state, message)
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and job
- logInfo("Removing executor " + exec.fullId)
+ logInfo("Removing executor " + exec.fullId + " because it is " + state)
idToJob(jobId).executors -= exec.id
exec.worker.removeExecutor(exec)
+ // TODO: the worker would probably want to restart the executor a few times
+ schedule()
}
}
case None =>
@@ -120,10 +122,12 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
// Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
// in order of submission time and launching the first one that fits in the cluster.
// It's also not very efficient in terms of algorithmic complexity.
- for (job <- waitingJobs) {
+ for (job <- waitingJobs.clone()) {
+ logInfo("Trying to schedule job " + job.id)
// Figure out how many cores the job could use on the whole cluster
val jobMemory = job.desc.memoryPerSlave
val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
+ logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores)
if (usableCores >= job.desc.cores) {
// We can launch it! Let's just partition the workers into executors for this job.
// TODO: Probably want to spread stuff out across nodes more.
@@ -134,6 +138,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
launchExecutor(worker, exec)
coresLeft -= coresToUse
}
+ waitingJobs -= job
}
}
}
@@ -141,12 +146,13 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
- worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc)
+ worker.actor ! LaunchExecutor(exec.job.id, exec.id, exec.job.desc, exec.cores, exec.memory)
exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
}
def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+ workers += worker
idToWorker(worker.id) = worker
actorToWorker(sender) = worker
addressToWorker(sender.path.address) = worker
@@ -155,14 +161,20 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def removeWorker(worker: WorkerInfo) {
logInfo("Removing worker " + worker.id + " on " + worker.host + ":" + worker.port)
+ workers -= worker
idToWorker -= worker.id
actorToWorker -= worker.actor
addressToWorker -= worker.actor.path.address
+ for (exec <- worker.executors.values) {
+ exec.job.actor ! ExecutorStateChanged(exec.job.id, exec.id, ExecutorState.LOST, None)
+ exec.job.executors -= exec.id
+ }
}
def addJob(desc: JobDescription, actor: ActorRef): JobInfo = {
val date = new Date
val job = new JobInfo(newJobId(date), desc, date, actor)
+ jobs += job
idToJob(job.id) = job
actorToJob(sender) = job
addressToJob(sender.path.address) = job
@@ -171,19 +183,21 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
def removeJob(job: JobInfo) {
logInfo("Removing job " + job.id)
+ jobs -= job
idToJob -= job.id
actorToJob -= job.actor
addressToWorker -= job.actor.path.address
completedJobs += job // Remember it in our history
for (exec <- job.executors.values) {
-
+ exec.worker.removeExecutor(exec)
+ exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
}
schedule()
}
/** Generate a new job ID given a job's submission date */
def newJobId(submitDate: Date): String = {
- val jobId = "job-%s-%4d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
+ val jobId = "job-%s-%04d".format(DATE_FORMAT.format(submitDate), nextJobNumber)
nextJobNumber += 1
jobId
}
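
The FIFO-with-backfilling test in schedule() reduces to one filter-and-sum over the workers; a toy REPL-style restatement with made-up sizes:

// Stand-in for WorkerInfo, with made-up free resources (memory in MB).
case class WorkerStub(coresFree: Int, memoryFree: Int)

val workers   = Seq(WorkerStub(8, 1024), WorkerStub(4, 256), WorkerStub(8, 2048))
val jobMemory = 512   // job.desc.memoryPerSlave
val jobCores  = 12    // job.desc.cores

// Only workers with enough free memory contribute cores; the 4-core/256 MB worker is skipped.
val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum   // 16
val canLaunch   = usableCores >= jobCores                                          // true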
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
new file mode 100644
index 0000000000..ec58f576e7
--- /dev/null
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -0,0 +1,146 @@
+package spark.deploy.worker
+
+import java.io._
+import spark.deploy.{ExecutorState, ExecutorStateChanged, JobDescription}
+import akka.actor.ActorRef
+import spark.{Utils, Logging}
+import java.net.{URI, URL}
+import org.apache.hadoop.fs.{Path, FileSystem}
+import org.apache.hadoop.conf.Configuration
+import scala.Some
+import spark.deploy.ExecutorStateChanged
+
+class ExecutorRunner(
+ jobId: String,
+ execId: Int,
+ jobDesc: JobDescription,
+ cores: Int,
+ memory: Int,
+ worker: ActorRef,
+ sparkHome: File,
+ workDir: File)
+ extends Logging {
+
+ val fullId = jobId + "/" + execId
+ var workerThread: Thread = null
+ var process: Process = null
+
+ def start() {
+ workerThread = new Thread("ExecutorRunner for " + fullId) {
+ override def run() { fetchAndRunExecutor() }
+ }
+ workerThread.start()
+ }
+
+ /** Stop this executor runner, including killing the process it launched */
+ def kill() {
+ if (workerThread != null) {
+ workerThread.interrupt()
+ workerThread = null
+ }
+ }
+
+ /**
+ * Download a file requested by the executor. Supports fetching the file in a variety of ways,
+ * including HTTP, HDFS and files on a standard filesystem, based on the URL parameter.
+ */
+ def fetchFile(url: String, targetDir: File) {
+ val filename = url.split("/").last
+ val targetFile = new File(targetDir, filename)
+ if (url.startsWith("http://") || url.startsWith("https://") || url.startsWith("ftp://")) {
+ // Use the java.net library to fetch it
+ logInfo("Fetching " + url + " to " + targetFile)
+ val in = new URL(url).openStream()
+ val out = new FileOutputStream(targetFile)
+ Utils.copyStream(in, out, true)
+ } else {
+ // Use the Hadoop filesystem library, which supports file://, hdfs://, s3://, and others
+ val uri = new URI(url)
+ val conf = new Configuration()
+ val fs = FileSystem.get(uri, conf)
+ val in = fs.open(new Path(uri))
+ val out = new FileOutputStream(targetFile)
+ Utils.copyStream(in, out, true)
+ }
+ // Decompress the file if it's a .tar or .tar.gz
+ if (filename.endsWith(".tar.gz") || filename.endsWith(".tgz")) {
+ logInfo("Untarring " + filename)
+ Utils.execute(Seq("tar", "-xzf", filename), targetDir)
+ } else if (filename.endsWith(".tar")) {
+ logInfo("Untarring " + filename)
+ Utils.execute(Seq("tar", "-xf", filename), targetDir)
+ }
+ }
+
+ def buildCommandSeq(): Seq[String] = {
+ val command = jobDesc.command
+ val runScript = new File(sparkHome, "run").getCanonicalPath
+ Seq(runScript, command.mainClass) ++ command.arguments
+ }
+
+ /** Spawn a thread that will redirect a given stream to a file */
+ def redirectStream(in: InputStream, file: File) {
+ val out = new FileOutputStream(file)
+ new Thread("redirect output to " + file) {
+ override def run() {
+ Utils.copyStream(in, out, true)
+ }
+ }.start()
+ }
+
+ /**
+ * Download and run the executor described in our JobDescription
+ */
+ def fetchAndRunExecutor() {
+ try {
+ // Create the executor's working directory
+ val executorDir = new File(workDir, jobId + "/" + execId)
+ if (!executorDir.mkdirs()) {
+ throw new IOException("Failed to create directory " + executorDir)
+ }
+
+ // Download the files it depends on into it (disabled for now)
+ //for (url <- jobDesc.fileUrls) {
+ // fetchFile(url, executorDir)
+ //}
+
+ // Launch the process
+ val command = buildCommandSeq()
+ val builder = new ProcessBuilder(command: _*).directory(executorDir)
+ val env = builder.environment()
+ for ((key, value) <- jobDesc.command.environment) {
+ env.put(key, value)
+ }
+ env.put("SPARK_CORES", cores.toString)
+ env.put("SPARK_MEMORY", memory.toString)
+ process = builder.start()
+
+ // Redirect its stdout and stderr to files
+ redirectStream(process.getInputStream, new File(executorDir, "stdout"))
+ redirectStream(process.getErrorStream, new File(executorDir, "stderr"))
+
+ // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
+ // long-lived processes only. However, in the future, we might restart the executor a few
+ // times on the same machine.
+ val exitCode = process.waitFor()
+ val message = "Command exited with code " + exitCode
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ } catch {
+ case interrupted: InterruptedException =>
+ logInfo("Runner thread interrupted -- killing executor " + fullId)
+ if (process != null) {
+ process.destroy()
+ }
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+
+ case e: Exception => {
+ logError("Error running executor", e)
+ if (process != null) {
+ process.destroy()
+ }
+ val message = e.getClass + ": " + e.getMessage
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
+ }
+ }
+ }
+}
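
For reference, buildCommandSeq() above just prepends the "run" script from the Spark home to the job's main class and arguments; with hypothetical values it expands as follows:

import java.io.File

// Hypothetical inputs: SPARK_HOME and the job's Command.
val sparkHome = new File("/opt/spark")
val mainClass = "spark.deploy.client.TestExecutor"
val arguments = Seq("arg1")

val runScript  = new File(sparkHome, "run").getCanonicalPath
val commandSeq = Seq(runScript, mainClass) ++ arguments
// => Seq("/opt/spark/run", "spark.deploy.client.TestExecutor", "arg1")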
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index 1f5854011f..e5da181e9a 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -1,13 +1,20 @@
package spark.deploy.worker
-
-import akka.actor.{ActorRef, Terminated, Props, Actor}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import akka.actor.{ActorRef, Props, Actor}
import spark.{Logging, Utils}
import spark.util.AkkaUtils
-import spark.deploy.{RegisterWorkerFailed, RegisterWorker, RegisteredWorker}
-import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClientLifeCycleEvent}
+import spark.deploy._
+import akka.remote.RemoteClientLifeCycleEvent
import java.text.SimpleDateFormat
import java.util.Date
+import akka.remote.RemoteClientShutdown
+import akka.remote.RemoteClientDisconnected
+import spark.deploy.RegisterWorker
+import spark.deploy.LaunchExecutor
+import spark.deploy.RegisterWorkerFailed
+import akka.actor.Terminated
+import java.io.File
class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, masterUrl: String)
extends Actor with Logging {
@@ -16,8 +23,11 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
val MASTER_REGEX = "spark://([^:]+):([0-9]+)".r
var master: ActorRef = null
-
val workerId = generateWorkerId()
+ var sparkHome: File = null
+ var workDir: File = null
+ val executors = new HashMap[String, ExecutorRunner]
+ val finishedExecutors = new ArrayBuffer[String]
var coresUsed = 0
var memoryUsed = 0
@@ -25,9 +35,27 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed
+ def createWorkDir() {
+ workDir = new File(sparkHome, "work")
+ try {
+ if (!workDir.exists() && !workDir.mkdirs()) {
+ logError("Failed to create work directory " + workDir)
+ System.exit(1)
+ }
+ } catch {
+ case e: Exception =>
+ logError("Failed to create work directory " + workDir, e)
+ System.exit(1)
+ }
+ }
+
override def preStart() {
logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
ip, port, cores, Utils.memoryMegabytesToString(memory)))
+ val envVar = System.getenv("SPARK_HOME")
+ sparkHome = new File(if (envVar == null) "." else envVar)
+ logInfo("Spark home: " + sparkHome)
+ createWorkDir()
connectToMaster()
startWebUi()
}
@@ -74,6 +102,21 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
logError("Worker registration failed: " + message)
System.exit(1)
+ case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) =>
+ logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name))
+ val er = new ExecutorRunner(jobId, execId, jobDesc, cores_, memory_, self, sparkHome, workDir)
+ executors(jobId + "/" + execId) = er
+ er.start()
+ master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
+
+ case ExecutorStateChanged(jobId, execId, state, message) =>
+ master ! ExecutorStateChanged(jobId, execId, state, message)
+ if (ExecutorState.isFinished(state)) {
+ logInfo("Executor " + jobId + "/" + execId + " finished with state " + state)
+ executors -= jobId + "/" + execId
+ finishedExecutors += jobId + "/" + execId
+ }
+
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
}
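
Putting the Worker and ExecutorRunner pieces together, the on-disk layout for one launched executor is roughly the following (job id and Spark home are placeholders):

import java.io.File

// SPARK_HOME falls back to the current directory; work dirs live under <sparkHome>/work.
val envVar    = System.getenv("SPARK_HOME")
val sparkHome = new File(if (envVar == null) "." else envVar)
val workDir   = new File(sparkHome, "work")

// ExecutorRunner creates <workDir>/<jobId>/<execId> and writes the child's
// stdout and stderr files inside it.
val executorDir = new File(workDir, "job-20120801000000-0001" + "/" + 0)
val stdoutFile  = new File(executorDir, "stdout")
val stderrFile  = new File(executorDir, "stderr")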