Diffstat
-rw-r--r--  core/src/main/scala/spark/SparkContext.scala                                   |  16
-rw-r--r--  core/src/main/scala/spark/deploy/master/JobInfo.scala                          |  13
-rw-r--r--  core/src/main/scala/spark/deploy/master/Master.scala                           |  32
-rw-r--r--  core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala (renamed from core/src/main/scala/spark/deploy/worker/ExecutorManager.scala) | 29
-rw-r--r--  core/src/main/scala/spark/deploy/worker/Worker.scala                           |  14
-rw-r--r--  core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala  |  83
-rw-r--r--  core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala    |   6
-rwxr-xr-x  run                                                                            |  33
-rwxr-xr-x  spark-shell                                                                    |   1
9 files changed, 182 insertions(+), 45 deletions(-)
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala
index 22e1d52f65..4a6abf20b0 100644
--- a/core/src/main/scala/spark/SparkContext.scala
+++ b/core/src/main/scala/spark/SparkContext.scala
@@ -41,7 +41,7 @@ import spark.scheduler.ShuffleMapTask
import spark.scheduler.DAGScheduler
import spark.scheduler.TaskScheduler
import spark.scheduler.local.LocalScheduler
-import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler}
+import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler}
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.BlockManagerMaster
@@ -57,7 +57,7 @@ class SparkContext(
// Set Spark master host and port system properties
if (System.getProperty("spark.master.host") == null) {
- System.setProperty("spark.master.host", Utils.localIpAddress)
+ System.setProperty("spark.master.host", Utils.localIpAddress())
}
if (System.getProperty("spark.master.port") == null) {
System.setProperty("spark.master.port", "0")
@@ -80,13 +80,25 @@ class SparkContext(
val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r
+ // Regular expression for connecting to Spark deploy clusters
+ val SPARK_REGEX = """(spark://.*)""".r
+
master match {
case "local" =>
new LocalScheduler(1, 0)
+
case LOCAL_N_REGEX(threads) =>
new LocalScheduler(threads.toInt, 0)
+
case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
new LocalScheduler(threads.toInt, maxFailures.toInt)
+
+ case SPARK_REGEX(sparkUrl) =>
+ val scheduler = new ClusterScheduler(this)
+ val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName)
+ scheduler.initialize(backend)
+ scheduler
+
case _ =>
MesosNativeLibrary.load()
val scheduler = new ClusterScheduler(this)
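
A master URL of the form spark://host:port now selects the standalone deploy cluster; any other unrecognized master string still falls through to the Mesos backends. A minimal driver sketch against the new branch (host, port and job name below are invented for illustration):

    import spark.SparkContext

    object StandaloneExample {
      def main(args: Array[String]) {
        // Matches SPARK_REGEX, so SparkContext wires up ClusterScheduler +
        // SparkDeploySchedulerBackend instead of a Mesos backend.
        val sc = new SparkContext("spark://master-host:7077", "StandaloneExample")
        val sum = sc.parallelize(1 to 1000).reduce(_ + _)
        println("sum = " + sum)
        sc.stop()
      }
    }
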
diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index e8502f0b8f..31d48b82b9 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -8,8 +8,9 @@ import scala.collection.mutable
class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
+ var coresGranted = 0
- var nextExecutorId = 0
+ private var nextExecutorId = 0
def newExecutorId(): Int = {
val id = nextExecutorId
@@ -17,9 +18,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va
id
}
- def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
+ def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = {
val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave)
executors(exec.id) = exec
+ coresGranted += cores
exec
}
+
+ def removeExecutor(exec: ExecutorInfo) {
+ executors -= exec.id
+ coresGranted -= exec.cores
+ }
+
+ def coresLeft: Int = desc.cores - coresGranted
}
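
JobInfo now tracks how many cores it has been granted, so the master can satisfy a job incrementally across several executors and reclaim cores when one is removed. A stripped-down sketch of that accounting, not the real class (core counts invented):

    // Stand-in for JobInfo's new bookkeeping: coresLeft = requested - granted.
    class CoreAccounting(requestedCores: Int) {
      private var coresGranted = 0
      def addExecutor(cores: Int)    { coresGranted += cores }
      def removeExecutor(cores: Int) { coresGranted -= cores }
      def coresLeft: Int = requestedCores - coresGranted
    }

    object CoreAccountingDemo extends App {
      val job = new CoreAccounting(requestedCores = 8)
      job.addExecutor(5)        // executor on worker A
      job.addExecutor(3)        // executor on worker B
      println(job.coresLeft)    // 0 -> the master drops the job from waitingJobs
      job.removeExecutor(3)     // worker B's executor goes away
      println(job.coresLeft)    // 3 -> the job is short on cores again
    }
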
diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala
index 89de3b1827..d691613b0d 100644
--- a/core/src/main/scala/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/spark/deploy/master/Master.scala
@@ -83,7 +83,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
if (ExecutorState.isFinished(state)) {
// Remove this executor from the worker and job
logInfo("Removing executor " + exec.fullId + " because it is " + state)
- idToJob(jobId).executors -= exec.id
+ idToJob(jobId).removeExecutor(exec)
exec.worker.removeExecutor(exec)
// TODO: the worker would probably want to restart the executor a few times
schedule()
@@ -119,26 +119,19 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
* every time a new job joins or resource availability changes.
*/
def schedule() {
- // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs
- // in order of submission time and launching the first one that fits in the cluster.
- // It's also not very efficient in terms of algorithmic complexity.
- for (job <- waitingJobs.clone()) {
- logInfo("Trying to schedule job " + job.id)
- // Figure out how many cores the job could use on the whole cluster
- val jobMemory = job.desc.memoryPerSlave
- val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum
- logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores)
- if (usableCores >= job.desc.cores) {
- // We can launch it! Let's just partition the workers into executors for this job.
- // TODO: Probably want to spread stuff out across nodes more.
- var coresLeft = job.desc.cores
- for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) {
- val coresToUse = math.min(worker.coresFree, coresLeft)
- val exec = job.newExecutor(worker, coresToUse)
+ // Right now this is a very simple FIFO scheduler. We keep looking through the jobs
+ // in order of submission time and launching the first one that fits on each node.
+ for (worker <- workers if worker.coresFree > 0) {
+ for (job <- waitingJobs.clone()) {
+ val jobMemory = job.desc.memoryPerSlave
+ if (worker.memoryFree >= jobMemory) {
+ val coresToUse = math.min(worker.coresFree, job.coresLeft)
+ val exec = job.addExecutor(worker, coresToUse)
launchExecutor(worker, exec)
- coresLeft -= coresToUse
}
- waitingJobs -= job
+ if (job.coresLeft == 0) {
+ waitingJobs -= job
+ }
}
}
}
@@ -188,6 +181,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
actorToJob -= job.actor
addressToWorker -= job.actor.path.address
completedJobs += job // Remember it in our history
+ waitingJobs -= job
for (exec <- job.executors.values) {
exec.worker.removeExecutor(exec)
exec.worker.actor ! KillExecutor(exec.job.id, exec.id)
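
Scheduling is now worker-driven: each worker with free cores is offered to the waiting jobs in submission order, and a job only leaves the queue once every core it asked for has been granted, so allocations can be partial and span several workers. A simplified, self-contained sketch of that loop (WorkerStub and JobStub are stand-ins for Spark's WorkerInfo and JobInfo):

    import scala.collection.mutable.ArrayBuffer

    case class WorkerStub(var coresFree: Int, memoryFree: Int)
    case class JobStub(memoryPerSlave: Int, var coresLeft: Int)

    // Mirrors the shape of the new Master.schedule(): outer loop over workers, FIFO over jobs.
    def schedule(workers: Seq[WorkerStub], waitingJobs: ArrayBuffer[JobStub]) {
      for (worker <- workers if worker.coresFree > 0) {
        for (job <- waitingJobs.toList) {        // snapshot, like waitingJobs.clone() in the real code
          if (worker.memoryFree >= job.memoryPerSlave && worker.coresFree > 0) {
            val coresToUse = math.min(worker.coresFree, job.coresLeft)
            worker.coresFree -= coresToUse       // launchExecutor() updates this on the real WorkerInfo
            job.coresLeft -= coresToUse          // addExecutor() updates coresGranted on the real JobInfo
            if (job.coresLeft == 0) {
              waitingJobs -= job
            }
          }
        }
      }
    }
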
diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
index ce17799648..ecd558546b 100644
--- a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala
+++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala
@@ -13,13 +13,15 @@ import spark.deploy.ExecutorStateChanged
/**
* Manages the execution of one executor process.
*/
-class ExecutorManager(
+class ExecutorRunner(
jobId: String,
execId: Int,
jobDesc: JobDescription,
cores: Int,
memory: Int,
worker: ActorRef,
+ workerId: String,
+ hostname: String,
sparkHome: File,
workDir: File)
extends Logging {
@@ -29,17 +31,22 @@ class ExecutorManager(
var process: Process = null
def start() {
- workerThread = new Thread("ExecutorManager for " + fullId) {
+ workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
}
- /** Stop this executor manager, including killing the process it launched */
+ /** Stop this executor runner, including killing the process it launched */
def kill() {
if (workerThread != null) {
workerThread.interrupt()
workerThread = null
+ if (process != null) {
+ logInfo("Killing process!")
+ process.destroy()
+ }
+ worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
}
}
@@ -75,10 +82,18 @@ class ExecutorManager(
}
}
+ /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */
+ def substituteVariables(argument: String): String = argument match {
+ case "{{SLAVEID}}" => workerId
+ case "{{HOSTNAME}}" => hostname
+ case "{{CORES}}" => cores.toString
+ case other => other
+ }
+
def buildCommandSeq(): Seq[String] = {
val command = jobDesc.command
val runScript = new File(sparkHome, "run").getCanonicalPath
- Seq(runScript, command.mainClass) ++ command.arguments
+ Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables)
}
/** Spawn a thread that will redirect a given stream to a file */
@@ -130,11 +145,7 @@ class ExecutorManager(
worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message))
} catch {
case interrupted: InterruptedException =>
- logInfo("Runner thread interrupted -- killing executor " + fullId)
- if (process != null) {
- process.destroy()
- }
- worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None)
+ logInfo("Runner thread for executor " + fullId + " interrupted")
case e: Exception => {
logError("Error running executor", e)
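
Because an executor's command line is fixed when the job is submitted, it can only refer to per-worker values through the {{SLAVEID}}, {{HOSTNAME}} and {{CORES}} placeholders, which each ExecutorRunner substitutes just before launching the process. A small sketch of that substitution with invented values:

    object SubstitutionSketch extends App {
      // Same per-argument match as ExecutorRunner.substituteVariables, inlined for illustration.
      def substitute(workerId: String, hostname: String, cores: Int)(argument: String): String =
        argument match {
          case "{{SLAVEID}}"  => workerId
          case "{{HOSTNAME}}" => hostname
          case "{{CORES}}"    => cores.toString
          case other          => other
        }

      val args = Seq("{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
      println(args.map(substitute("worker-42", "worker-host.example.com", 4)))
      // List(worker-42, worker-host.example.com, 4)
    }
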
diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala
index fba44ca9b5..19ffc1e401 100644
--- a/core/src/main/scala/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/spark/deploy/worker/Worker.scala
@@ -26,7 +26,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
val workerId = generateWorkerId()
var sparkHome: File = null
var workDir: File = null
- val executors = new HashMap[String, ExecutorManager]
+ val executors = new HashMap[String, ExecutorRunner]
val finishedExecutors = new ArrayBuffer[String]
var coresUsed = 0
@@ -104,8 +104,8 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) =>
logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name))
- val manager = new ExecutorManager(
- jobId, execId, jobDesc, cores_, memory_, self, sparkHome, workDir)
+ val manager = new ExecutorRunner(
+ jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir)
executors(jobId + "/" + execId) = manager
manager.start()
master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)
@@ -118,6 +118,13 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
finishedExecutors += jobId + "/" + execId
}
+ case KillExecutor(jobId, execId) =>
+ val fullId = jobId + "/" + execId
+ logInfo("Asked to kill executor " + fullId)
+ executors(jobId + "/" + execId).kill()
+ executors -= fullId
+ finishedExecutors += fullId
+
case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
masterDisconnected()
}
@@ -126,6 +133,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
// TODO: It would be nice to try to reconnect to the master, but just shut down for now.
// (Note that if reconnecting we would also need to assign IDs differently.)
logError("Connection to master failed! Shutting down.")
+ executors.values.foreach(_.kill())
System.exit(1)
}
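
The worker now answers KillExecutor from the master and also tears down every running executor when the master connection is lost, so executor processes no longer outlive their job. A trimmed-down sketch of that bookkeeping, without actors or real processes:

    import scala.collection.mutable.{ArrayBuffer, HashMap}

    trait Killable { def kill(): Unit }

    // Stand-in for the Worker's executor table, showing the two new code paths.
    class ExecutorTable {
      val executors = new HashMap[String, Killable]
      val finishedExecutors = new ArrayBuffer[String]

      // What the new `case KillExecutor(jobId, execId)` branch does.
      def killExecutor(jobId: String, execId: Int) {
        val fullId = jobId + "/" + execId
        executors.get(fullId).foreach(_.kill())   // the real code indexes directly and assumes the entry exists
        executors -= fullId
        finishedExecutors += fullId
      }

      // What masterDisconnected() now does before System.exit(1).
      def killAll() { executors.values.foreach(_.kill()) }
    }
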
diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
new file mode 100644
index 0000000000..0bd2d15479
--- /dev/null
+++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -0,0 +1,83 @@
+package spark.scheduler.cluster
+
+import spark.{Utils, Logging, SparkContext}
+import spark.deploy.client.{Client, ClientListener}
+import spark.deploy.{Command, JobDescription}
+import scala.collection.mutable.HashMap
+
+class SparkDeploySchedulerBackend(
+ scheduler: ClusterScheduler,
+ sc: SparkContext,
+ master: String,
+ jobName: String)
+ extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem)
+ with ClientListener
+ with Logging {
+
+ var client: Client = null
+ var stopping = false
+
+ val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt
+
+ // Environment variables to pass to our executors
+ val ENV_VARS_TO_SEND_TO_EXECUTORS = Array(
+ "SPARK_MEM",
+ "SPARK_CLASSPATH",
+ "SPARK_LIBRARY_PATH",
+ "SPARK_JAVA_OPTS"
+ )
+
+ // Memory used by each executor (in megabytes)
+ val executorMemory = {
+ if (System.getenv("SPARK_MEM") != null) {
+ Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
+ // TODO: Might need to add some extra memory for the non-heap parts of the JVM
+ } else {
+ 512
+ }
+ }
+
+ override def start() {
+ super.start()
+
+ val environment = new HashMap[String, String]
+ for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) {
+ if (System.getenv(key) != null) {
+ environment(key) = System.getenv(key)
+ }
+ }
+ val masterUrl = "akka://spark@%s:%s/user/%s".format(
+ System.getProperty("spark.master.host"), System.getProperty("spark.master.port"),
+ StandaloneSchedulerBackend.ACTOR_NAME)
+ val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}")
+ val command = Command("spark.executor.StandaloneExecutorBackend", args, environment)
+ val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command)
+
+ client = new Client(sc.env.actorSystem, master, jobDesc, this)
+ client.start()
+ }
+
+ override def stop() {
+ stopping = true;
+ super.stop()
+ client.stop()
+ }
+
+ def connected(jobId: String) {
+ logInfo("Connected to Spark cluster with job ID " + jobId)
+ }
+
+ def disconnected() {
+ if (!stopping) {
+ logError("Disconnected from Spark cluster!")
+ scheduler.error("Disconnected from Spark cluster")
+ }
+ }
+
+ def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) {
+ logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format(
+ id, host, cores, Utils.memoryMegabytesToString(memory)))
+ }
+
+ def executorRemoved(id: String, message: String) {}
+}
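
The job description this backend submits is shaped by two knobs: spark.cores.max caps the total cores requested (unlimited by default) and SPARK_MEM sets the per-executor memory (512 MB by default); SPARK_CLASSPATH, SPARK_LIBRARY_PATH and SPARK_JAVA_OPTS are forwarded to the executors as environment variables. A sketch of a driver setting the core cap before creating its context (values invented; SPARK_MEM itself must come from the environment, e.g. conf/spark-env.sh):

    object ConfiguredDriver {
      def main(args: Array[String]) {
        // Read by SparkDeploySchedulerBackend when it builds the JobDescription;
        // without it the job asks for Int.MaxValue cores, i.e. the whole cluster.
        System.setProperty("spark.cores.max", "16")

        val sc = new spark.SparkContext("spark://master-host:7077", "ConfiguredDriver")
        println(sc.parallelize(1 to 100).count())
        sc.stop()
      }
    }
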
diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
index 040cd6b335..62a0c5589c 100644
--- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala
@@ -42,7 +42,7 @@ class CoarseMesosSchedulerBackend(
)
// Memory used by each executor (in megabytes)
- val EXECUTOR_MEMORY = {
+ val executorMemory = {
if (System.getenv("SPARK_MEM") != null) {
Utils.memoryStringToMb(System.getenv("SPARK_MEM"))
// TODO: Might need to add some extra memory for the non-heap parts of the JVM
@@ -160,7 +160,7 @@ class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
@@ -171,7 +171,7 @@ class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", EXECUTOR_MEMORY))
+ .addResources(createResource("mem", executorMemory))
.build()
d.launchTasks(offer.getId, Collections.singletonList(task), filters)
} else {
diff --git a/run b/run
index 5ba94b3243..d386892b95 100755
--- a/run
+++ b/run
@@ -13,15 +13,21 @@ if [ -e $FWDIR/conf/spark-env.sh ] ; then
. $FWDIR/conf/spark-env.sh
fi
+# Check that SCALA_HOME has been specified
+if [ -z "$SCALA_HOME" ]; then
+ echo "SCALA_HOME is not set" >&2
+ exit 1
+fi
+
# If the user specifies a Mesos JAR, put it before our included one on the classpath
MESOS_CLASSPATH=""
-if [ "x$MESOS_JAR" != "x" ] ; then
+if [ -z "$MESOS_JAR" ] ; then
MESOS_CLASSPATH="$MESOS_JAR"
fi
# Figure out how much memory to use per executor and set it as an environment
# variable so that our process sees it and can report it to Mesos
-if [ "x$SPARK_MEM" == "x" ] ; then
+if [ -z "$SPARK_MEM" ] ; then
SPARK_MEM="512m"
fi
export SPARK_MEM
@@ -61,13 +67,26 @@ done
for jar in `find $REPL_DIR/lib -name '*jar'`; do
CLASSPATH+=":$jar"
done
-CLASSPATH+=:$BAGEL_DIR/target/scala-$SCALA_VERSION/classes
+CLASSPATH+=":$BAGEL_DIR/target/scala-$SCALA_VERSION/classes"
export CLASSPATH # Needed for spark-shell
-if [ -n "$SCALA_HOME" ]; then
- SCALA="${SCALA_HOME}/bin/scala"
+# Figure out whether to run our class with java or with the scala launcher.
+# In most cases, we'd prefer to execute our process with java because scala
+# creates a shell script as the parent of its Java process, which makes it
+# hard to kill the child with stuff like Process.destroy(). However, for
+# the Spark shell, the wrapper is necessary to properly reset the terminal
+# when we exit, so we allow it to set a variable to launch with scala.
+if [ "$SPARK_LAUNCH_WITH_SCALA" == "1" ]; then
+ RUNNER="${SCALA_HOME}/bin/scala"
else
- SCALA=scala
+ CLASSPATH+=":$SCALA_HOME/lib/scala-library.jar"
+ CLASSPATH+=":$SCALA_HOME/lib/scala-compiler.jar"
+ CLASSPATH+=":$SCALA_HOME/lib/jline.jar"
+ if [ -n "$JAVA_HOME" ]; then
+ RUNNER="${JAVA_HOME}/bin/java"
+ else
+ RUNNER=java
+ fi
fi
-exec "$SCALA" -cp "$CLASSPATH" "$@"
+exec "$RUNNER" -cp "$CLASSPATH" "$@"
diff --git a/spark-shell b/spark-shell
index 29e5e65da2..574ae2104d 100755
--- a/spark-shell
+++ b/spark-shell
@@ -1,3 +1,4 @@
#!/bin/sh
FWDIR="`dirname $0`"
+export SPARK_LAUNCH_WITH_SCALA=1
exec $FWDIR/run spark.repl.Main "$@"