diff options
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/spark/SparkContext.scala | 16 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/master/JobInfo.scala | 13 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/master/Master.scala | 32 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala (renamed from core/src/main/scala/spark/deploy/worker/ExecutorManager.scala) | 29 | ||||
-rw-r--r-- | core/src/main/scala/spark/deploy/worker/Worker.scala | 14 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala | 83 | ||||
-rw-r--r-- | core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala | 6 |
7 files changed, 155 insertions, 38 deletions
diff --git a/core/src/main/scala/spark/SparkContext.scala b/core/src/main/scala/spark/SparkContext.scala index 22e1d52f65..4a6abf20b0 100644 --- a/core/src/main/scala/spark/SparkContext.scala +++ b/core/src/main/scala/spark/SparkContext.scala @@ -41,7 +41,7 @@ import spark.scheduler.ShuffleMapTask import spark.scheduler.DAGScheduler import spark.scheduler.TaskScheduler import spark.scheduler.local.LocalScheduler -import spark.scheduler.cluster.{SchedulerBackend, ClusterScheduler} +import spark.scheduler.cluster.{SparkDeploySchedulerBackend, SchedulerBackend, ClusterScheduler} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.storage.BlockManagerMaster @@ -57,7 +57,7 @@ class SparkContext( // Set Spark master host and port system properties if (System.getProperty("spark.master.host") == null) { - System.setProperty("spark.master.host", Utils.localIpAddress) + System.setProperty("spark.master.host", Utils.localIpAddress()) } if (System.getProperty("spark.master.port") == null) { System.setProperty("spark.master.port", "0") @@ -80,13 +80,25 @@ class SparkContext( val LOCAL_N_REGEX = """local\[([0-9]+)\]""".r // Regular expression for local[N, maxRetries], used in tests with failing tasks val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+),([0-9]+)\]""".r + // Regular expression for connecting to Spark deploy clusters + val SPARK_REGEX = """(spark://.*)""".r + master match { case "local" => new LocalScheduler(1, 0) + case LOCAL_N_REGEX(threads) => new LocalScheduler(threads.toInt, 0) + case LOCAL_N_FAILURES_REGEX(threads, maxFailures) => new LocalScheduler(threads.toInt, maxFailures.toInt) + + case SPARK_REGEX(sparkUrl) => + val scheduler = new ClusterScheduler(this) + val backend = new SparkDeploySchedulerBackend(scheduler, this, sparkUrl, frameworkName) + scheduler.initialize(backend) + scheduler + case _ => MesosNativeLibrary.load() val scheduler = new ClusterScheduler(this) diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala index e8502f0b8f..31d48b82b9 100644 --- a/core/src/main/scala/spark/deploy/master/JobInfo.scala +++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala @@ -8,8 +8,9 @@ import scala.collection.mutable class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) { var state = JobState.WAITING var executors = new mutable.HashMap[Int, ExecutorInfo] + var coresGranted = 0 - var nextExecutorId = 0 + private var nextExecutorId = 0 def newExecutorId(): Int = { val id = nextExecutorId @@ -17,9 +18,17 @@ class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, va id } - def newExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = { + def addExecutor(worker: WorkerInfo, cores: Int): ExecutorInfo = { val exec = new ExecutorInfo(newExecutorId(), this, worker, cores, desc.memoryPerSlave) executors(exec.id) = exec + coresGranted += cores exec } + + def removeExecutor(exec: ExecutorInfo) { + executors -= exec.id + coresGranted -= exec.cores + } + + def coresLeft: Int = desc.cores - coresGranted } diff --git a/core/src/main/scala/spark/deploy/master/Master.scala b/core/src/main/scala/spark/deploy/master/Master.scala index 89de3b1827..d691613b0d 100644 --- a/core/src/main/scala/spark/deploy/master/Master.scala +++ b/core/src/main/scala/spark/deploy/master/Master.scala @@ -83,7 +83,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { if (ExecutorState.isFinished(state)) { // Remove this executor from the worker and job logInfo("Removing executor " + exec.fullId + " because it is " + state) - idToJob(jobId).executors -= exec.id + idToJob(jobId).removeExecutor(exec) exec.worker.removeExecutor(exec) // TODO: the worker would probably want to restart the executor a few times schedule() @@ -119,26 +119,19 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { * every time a new job joins or resource availability changes. */ def schedule() { - // Right now this is a very simple FIFO with backfilling. We keep looking through the jobs - // in order of submission time and launching the first one that fits in the cluster. - // It's also not very efficient in terms of algorithmic complexity. - for (job <- waitingJobs.clone()) { - logInfo("Trying to schedule job " + job.id) - // Figure out how many cores the job could use on the whole cluster - val jobMemory = job.desc.memoryPerSlave - val usableCores = workers.filter(_.memoryFree >= jobMemory).map(_.coresFree).sum - logInfo("jobMemory: " + jobMemory + ", usableCores: " + usableCores) - if (usableCores >= job.desc.cores) { - // We can launch it! Let's just partition the workers into executors for this job. - // TODO: Probably want to spread stuff out across nodes more. - var coresLeft = job.desc.cores - for (worker <- workers if worker.memoryFree >= jobMemory && coresLeft > 0) { - val coresToUse = math.min(worker.coresFree, coresLeft) - val exec = job.newExecutor(worker, coresToUse) + // Right now this is a very simple FIFO scheduler. We keep looking through the jobs + // in order of submission time and launching the first one that fits on each node. + for (worker <- workers if worker.coresFree > 0) { + for (job <- waitingJobs.clone()) { + val jobMemory = job.desc.memoryPerSlave + if (worker.memoryFree >= jobMemory) { + val coresToUse = math.min(worker.coresFree, job.coresLeft) + val exec = job.addExecutor(worker, coresToUse) launchExecutor(worker, exec) - coresLeft -= coresToUse } - waitingJobs -= job + if (job.coresLeft == 0) { + waitingJobs -= job + } } } } @@ -188,6 +181,7 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging { actorToJob -= job.actor addressToWorker -= job.actor.path.address completedJobs += job // Remember it in our history + waitingJobs -= job for (exec <- job.executors.values) { exec.worker.removeExecutor(exec) exec.worker.actor ! KillExecutor(exec.job.id, exec.id) diff --git a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala index ce17799648..ecd558546b 100644 --- a/core/src/main/scala/spark/deploy/worker/ExecutorManager.scala +++ b/core/src/main/scala/spark/deploy/worker/ExecutorRunner.scala @@ -13,13 +13,15 @@ import spark.deploy.ExecutorStateChanged /** * Manages the execution of one executor process. */ -class ExecutorManager( +class ExecutorRunner( jobId: String, execId: Int, jobDesc: JobDescription, cores: Int, memory: Int, worker: ActorRef, + workerId: String, + hostname: String, sparkHome: File, workDir: File) extends Logging { @@ -29,17 +31,22 @@ class ExecutorManager( var process: Process = null def start() { - workerThread = new Thread("ExecutorManager for " + fullId) { + workerThread = new Thread("ExecutorRunner for " + fullId) { override def run() { fetchAndRunExecutor() } } workerThread.start() } - /** Stop this executor manager, including killing the process it launched */ + /** Stop this executor runner, including killing the process it launched */ def kill() { if (workerThread != null) { workerThread.interrupt() workerThread = null + if (process != null) { + logInfo("Killing process!") + process.destroy() + } + worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None) } } @@ -75,10 +82,18 @@ class ExecutorManager( } } + /** Replace variables such as {{SLAVEID}} and {{CORES}} in a command argument passed to us */ + def substituteVariables(argument: String): String = argument match { + case "{{SLAVEID}}" => workerId + case "{{HOSTNAME}}" => hostname + case "{{CORES}}" => cores.toString + case other => other + } + def buildCommandSeq(): Seq[String] = { val command = jobDesc.command val runScript = new File(sparkHome, "run").getCanonicalPath - Seq(runScript, command.mainClass) ++ command.arguments + Seq(runScript, command.mainClass) ++ command.arguments.map(substituteVariables) } /** Spawn a thread that will redirect a given stream to a file */ @@ -130,11 +145,7 @@ class ExecutorManager( worker ! ExecutorStateChanged(jobId, execId, ExecutorState.FAILED, Some(message)) } catch { case interrupted: InterruptedException => - logInfo("Runner thread interrupted -- killing executor " + fullId) - if (process != null) { - process.destroy() - } - worker ! ExecutorStateChanged(jobId, execId, ExecutorState.KILLED, None) + logInfo("Runner thread for executor " + fullId + " interrupted") case e: Exception => { logError("Error running executor", e) diff --git a/core/src/main/scala/spark/deploy/worker/Worker.scala b/core/src/main/scala/spark/deploy/worker/Worker.scala index fba44ca9b5..19ffc1e401 100644 --- a/core/src/main/scala/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/spark/deploy/worker/Worker.scala @@ -26,7 +26,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas val workerId = generateWorkerId() var sparkHome: File = null var workDir: File = null - val executors = new HashMap[String, ExecutorManager] + val executors = new HashMap[String, ExecutorRunner] val finishedExecutors = new ArrayBuffer[String] var coresUsed = 0 @@ -104,8 +104,8 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas case LaunchExecutor(jobId, execId, jobDesc, cores_, memory_) => logInfo("Asked to launch executor %s/%d for %s".format(jobId, execId, jobDesc.name)) - val manager = new ExecutorManager( - jobId, execId, jobDesc, cores_, memory_, self, sparkHome, workDir) + val manager = new ExecutorRunner( + jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir) executors(jobId + "/" + execId) = manager manager.start() master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None) @@ -118,6 +118,13 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas finishedExecutors += jobId + "/" + execId } + case KillExecutor(jobId, execId) => + val fullId = jobId + "/" + execId + logInfo("Asked to kill executor " + fullId) + executors(jobId + "/" + execId).kill() + executors -= fullId + finishedExecutors += fullId + case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) => masterDisconnected() } @@ -126,6 +133,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas // TODO: It would be nice to try to reconnect to the master, but just shut down for now. // (Note that if reconnecting we would also need to assign IDs differently.) logError("Connection to master failed! Shutting down.") + executors.values.foreach(_.kill()) System.exit(1) } diff --git a/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala new file mode 100644 index 0000000000..0bd2d15479 --- /dev/null +++ b/core/src/main/scala/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -0,0 +1,83 @@ +package spark.scheduler.cluster + +import spark.{Utils, Logging, SparkContext} +import spark.deploy.client.{Client, ClientListener} +import spark.deploy.{Command, JobDescription} +import scala.collection.mutable.HashMap + +class SparkDeploySchedulerBackend( + scheduler: ClusterScheduler, + sc: SparkContext, + master: String, + jobName: String) + extends StandaloneSchedulerBackend(scheduler, sc.env.actorSystem) + with ClientListener + with Logging { + + var client: Client = null + var stopping = false + + val maxCores = System.getProperty("spark.cores.max", Int.MaxValue.toString).toInt + + // Environment variables to pass to our executors + val ENV_VARS_TO_SEND_TO_EXECUTORS = Array( + "SPARK_MEM", + "SPARK_CLASSPATH", + "SPARK_LIBRARY_PATH", + "SPARK_JAVA_OPTS" + ) + + // Memory used by each executor (in megabytes) + val executorMemory = { + if (System.getenv("SPARK_MEM") != null) { + Utils.memoryStringToMb(System.getenv("SPARK_MEM")) + // TODO: Might need to add some extra memory for the non-heap parts of the JVM + } else { + 512 + } + } + + override def start() { + super.start() + + val environment = new HashMap[String, String] + for (key <- ENV_VARS_TO_SEND_TO_EXECUTORS) { + if (System.getenv(key) != null) { + environment(key) = System.getenv(key) + } + } + val masterUrl = "akka://spark@%s:%s/user/%s".format( + System.getProperty("spark.master.host"), System.getProperty("spark.master.port"), + StandaloneSchedulerBackend.ACTOR_NAME) + val args = Seq(masterUrl, "{{SLAVEID}}", "{{HOSTNAME}}", "{{CORES}}") + val command = Command("spark.executor.StandaloneExecutorBackend", args, environment) + val jobDesc = new JobDescription(jobName, maxCores, executorMemory, command) + + client = new Client(sc.env.actorSystem, master, jobDesc, this) + client.start() + } + + override def stop() { + stopping = true; + super.stop() + client.stop() + } + + def connected(jobId: String) { + logInfo("Connected to Spark cluster with job ID " + jobId) + } + + def disconnected() { + if (!stopping) { + logError("Disconnected from Spark cluster!") + scheduler.error("Disconnected from Spark cluster") + } + } + + def executorAdded(id: String, workerId: String, host: String, cores: Int, memory: Int) { + logInfo("Granted executor ID %s on host %s with %d cores, %s RAM".format( + id, host, cores, Utils.memoryMegabytesToString(memory))) + } + + def executorRemoved(id: String, message: String) {} +} diff --git a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala index 040cd6b335..62a0c5589c 100644 --- a/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/spark/scheduler/mesos/CoarseMesosSchedulerBackend.scala @@ -42,7 +42,7 @@ class CoarseMesosSchedulerBackend( ) // Memory used by each executor (in megabytes) - val EXECUTOR_MEMORY = { + val executorMemory = { if (System.getenv("SPARK_MEM") != null) { Utils.memoryStringToMb(System.getenv("SPARK_MEM")) // TODO: Might need to add some extra memory for the non-heap parts of the JVM @@ -160,7 +160,7 @@ class CoarseMesosSchedulerBackend( val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt - if (totalCoresAcquired < maxCores && mem >= EXECUTOR_MEMORY && cpus >= 1 && + if (totalCoresAcquired < maxCores && mem >= executorMemory && cpus >= 1 && !slaveIdsWithExecutors.contains(slaveId)) { // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) @@ -171,7 +171,7 @@ class CoarseMesosSchedulerBackend( .setCommand(createCommand(offer, cpusToUse)) .setName("Task " + taskId) .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", EXECUTOR_MEMORY)) + .addResources(createResource("mem", executorMemory)) .build() d.launchTasks(offer.getId, Collections.singletonList(task), filters) } else { |