diff options
author | CodingCat <zhunansjtu@gmail.com> | 2015-04-14 13:32:06 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-04-14 13:32:06 -0700 |
commit | 8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd (patch) | |
tree | ad4d534fed5b6140dfeb65d189ccc44239ff5f02 /core | |
parent | 25998e4d73bcc95ac85d9af71adfdc726ec89568 (diff) | |
download | spark-8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd.tar.gz spark-8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd.tar.bz2 spark-8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd.zip |
SPARK-1706: Allow multiple executors per worker in Standalone mode
resubmit of https://github.com/apache/spark/pull/636 for a totally different algorithm
https://issues.apache.org/jira/browse/SPARK-1706
In current implementation, the user has to start multiple workers in a server for starting multiple executors in a server, which introduces additional overhead due to the more JVM processes...
In this patch, I changed the scheduling logic in master to enable the user to start multiple executor processes within the same JVM process.
1. user configure spark.executor.maxCoreNumPerExecutor to suggest the maximum core he/she would like to allocate to each executor
2. Master assigns the executors to the workers with the major consideration on the memoryPerExecutor and the worker.freeMemory, and tries to allocate as many as possible cores to the executor ```min(min(memoryPerExecutor, worker.freeCore), maxLeftCoreToAssign)``` where ```maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor```
---------------------------------------
Other small changes include
change memoryPerSlave in ApplicationDescription to memoryPerExecutor, as "Slave" is overrided to represent both worker and executor in the documents... (we have some discussion on this before?)
Author: CodingCat <zhunansjtu@gmail.com>
Closes #731 from CodingCat/SPARK-1706-2 and squashes the following commits:
6dee808 [CodingCat] change filter predicate
fbeb7e5 [CodingCat] address the comments
940cb42 [CodingCat] avoid unnecessary allocation
b8ca561 [CodingCat] revert a change
45967b4 [CodingCat] remove unused method
2eeff77 [CodingCat] stylistic fixes
12a1b32 [CodingCat] change the semantic of coresPerExecutor to exact core number
f035423 [CodingCat] stylistic fix
d9c1685 [CodingCat] remove unused var
f595bd6 [CodingCat] recover some unintentional changes
63b3df9 [CodingCat] change the description of the parameter in the submit script
4cf61f1 [CodingCat] improve the code and docs
ff011e2 [CodingCat] start multiple executors on the worker by rewriting startExeuctor logic
2c2bcc5 [CodingCat] fix wrong usage info
497ec2c [CodingCat] address andrew's comments
878402c [CodingCat] change the launching executor code
f64a28d [CodingCat] typo fix
387f4ec [CodingCat] bug fix
35c462c [CodingCat] address Andrew's comments
0b64fea [CodingCat] fix compilation issue
19d3da7 [CodingCat] address the comments
5b81466 [CodingCat] remove outdated comments
ec7d421 [CodingCat] test commit
e5efabb [CodingCat] more java docs and consolidate canUse function
a26096d [CodingCat] stylistic fix
a5d629a [CodingCat] java doc
b34ec0c [CodingCat] make master support multiple executors per worker
Diffstat (limited to 'core')
9 files changed, 85 insertions, 73 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala index b7ae9c1fc0..ae99432f5c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala @@ -22,12 +22,13 @@ import java.net.URI private[spark] class ApplicationDescription( val name: String, val maxCores: Option[Int], - val memoryPerSlave: Int, + val memoryPerExecutorMB: Int, val command: Command, var appUiUrl: String, val eventLogDir: Option[URI] = None, // short name of compression codec used when writing event logs, if any (e.g. lzf) - val eventLogCodec: Option[String] = None) + val eventLogCodec: Option[String] = None, + val coresPerExecutor: Option[Int] = None) extends Serializable { val user = System.getProperty("user.name", "<unknown>") @@ -35,13 +36,13 @@ private[spark] class ApplicationDescription( def copy( name: String = name, maxCores: Option[Int] = maxCores, - memoryPerSlave: Int = memoryPerSlave, + memoryPerExecutorMB: Int = memoryPerExecutorMB, command: Command = command, appUiUrl: String = appUiUrl, eventLogDir: Option[URI] = eventLogDir, eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription = new ApplicationDescription( - name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec) + name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec) override def toString: String = "ApplicationDescription(" + name + ")" } diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala index dfc5b97e6a..2954f932b4 100644 --- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala @@ -46,7 +46,7 @@ private[deploy] object JsonProtocol { ("name" -> obj.desc.name) ~ ("cores" -> obj.desc.maxCores) ~ ("user" -> obj.desc.user) ~ - ("memoryperslave" -> obj.desc.memoryPerSlave) ~ + ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~ ("submitdate" -> obj.submitDate.toString) ~ ("state" -> obj.state.toString) ~ ("duration" -> obj.duration) @@ -55,7 +55,7 @@ private[deploy] object JsonProtocol { def writeApplicationDescription(obj: ApplicationDescription): JObject = { ("name" -> obj.name) ~ ("cores" -> obj.maxCores) ~ - ("memoryperslave" -> obj.memoryPerSlave) ~ + ("memoryperslave" -> obj.memoryPerExecutorMB) ~ ("user" -> obj.user) ~ ("command" -> obj.command.toString) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 60bc243ebf..296a0764b8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -406,6 +406,8 @@ object SparkSubmit { OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"), // Other options + OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES, + sysProp = "spark.executor.cores"), OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES, sysProp = "spark.executor.memory"), OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES, diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 03ecf3fd99..faa8780288 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S | Spark standalone and Mesos only: | --total-executor-cores NUM Total cores for all executors. | + | Spark standalone and YARN only: + | --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode, + | or all available cores on the worker in standalone mode) + | | YARN-only: | --driver-cores NUM Number of cores used by the driver, only in cluster mode | (Default: 1). - | --executor-cores NUM Number of cores per executor (Default: 1). | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). | --archives ARCHIVES Comma separated list of archives to be extracted into the diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala index bc5b293379..f59d550d4f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala @@ -75,9 +75,11 @@ private[deploy] class ApplicationInfo( } } - private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None): - ExecutorDesc = { - val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave) + private[master] def addExecutor( + worker: WorkerInfo, + cores: Int, + useID: Option[Int] = None): ExecutorDesc = { + val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB) executors(exec.id) = exec coresGranted += cores exec diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index 9a5d5877da..c5a6b1beac 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -524,52 +524,28 @@ private[master] class Master( } /** - * Can an app use the given worker? True if the worker has enough memory and we haven't already - * launched an executor for the app on it (right now the standalone backend doesn't like having - * two executors on the same worker). - */ - private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = { - worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app) - } - - /** - * Schedule the currently available resources among waiting apps. This method will be called - * every time a new app joins or resource availability changes. + * Schedule executors to be launched on the workers. + * + * There are two modes of launching executors. The first attempts to spread out an application's + * executors on as many workers as possible, while the second does the opposite (i.e. launch them + * on as few workers as possible). The former is usually better for data locality purposes and is + * the default. + * + * The number of cores assigned to each executor is configurable. When this is explicitly set, + * multiple executors from the same application may be launched on the same worker if the worker + * has enough cores and memory. Otherwise, each executor grabs all the cores available on the + * worker by default, in which case only one executor may be launched on each worker. */ - private def schedule() { - if (state != RecoveryState.ALIVE) { return } - - // First schedule drivers, they take strict precedence over applications - // Randomization helps balance drivers - val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE)) - val numWorkersAlive = shuffledAliveWorkers.size - var curPos = 0 - - for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers - // We assign workers to each waiting driver in a round-robin fashion. For each driver, we - // start from the last worker that was assigned a driver, and continue onwards until we have - // explored all alive workers. - var launched = false - var numWorkersVisited = 0 - while (numWorkersVisited < numWorkersAlive && !launched) { - val worker = shuffledAliveWorkers(curPos) - numWorkersVisited += 1 - if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { - launchDriver(worker, driver) - waitingDrivers -= driver - launched = true - } - curPos = (curPos + 1) % numWorkersAlive - } - } - + private def startExecutorsOnWorkers(): Unit = { // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app // in the queue, then the second app, etc. if (spreadOutApps) { - // Try to spread out each app among all the nodes, until it has all its cores + // Try to spread out each app among all the workers, until it has all its cores for (app <- waitingApps if app.coresLeft > 0) { val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE) - .filter(canUse(app, _)).sortBy(_.coresFree).reverse + .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB && + worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1)) + .sortBy(_.coresFree).reverse val numUsable = usableWorkers.length val assigned = new Array[Int](numUsable) // Number of cores to give on each node var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) @@ -582,32 +558,61 @@ private[master] class Master( pos = (pos + 1) % numUsable } // Now that we've decided how many cores to give on each node, let's actually give them - for (pos <- 0 until numUsable) { - if (assigned(pos) > 0) { - val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) - launchExecutor(usableWorkers(pos), exec) - app.state = ApplicationState.RUNNING - } + for (pos <- 0 until numUsable if assigned(pos) > 0) { + allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos)) } } } else { - // Pack each app into as few nodes as possible until we've assigned all its cores + // Pack each app into as few workers as possible until we've assigned all its cores for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) { for (app <- waitingApps if app.coresLeft > 0) { - if (canUse(app, worker)) { - val coresToUse = math.min(worker.coresFree, app.coresLeft) - if (coresToUse > 0) { - val exec = app.addExecutor(worker, coresToUse) - launchExecutor(worker, exec) - app.state = ApplicationState.RUNNING - } - } + allocateWorkerResourceToExecutors(app, app.coresLeft, worker) + } + } + } + } + + /** + * Allocate a worker's resources to one or more executors. + * @param app the info of the application which the executors belong to + * @param coresToAllocate cores on this worker to be allocated to this application + * @param worker the worker info + */ + private def allocateWorkerResourceToExecutors( + app: ApplicationInfo, + coresToAllocate: Int, + worker: WorkerInfo): Unit = { + val memoryPerExecutor = app.desc.memoryPerExecutorMB + val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate) + var coresLeft = coresToAllocate + while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) { + val exec = app.addExecutor(worker, coresPerExecutor) + coresLeft -= coresPerExecutor + launchExecutor(worker, exec) + app.state = ApplicationState.RUNNING + } + } + + /** + * Schedule the currently available resources among waiting apps. This method will be called + * every time a new app joins or resource availability changes. + */ + private def schedule(): Unit = { + if (state != RecoveryState.ALIVE) { return } + // Drivers take strict precedence over executors + val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers + for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { + for (driver <- waitingDrivers) { + if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { + launchDriver(worker, driver) + waitingDrivers -= driver } } } + startExecutorsOnWorkers() } - private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) { + private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = { logInfo("Launching executor " + exec.fullId + " on worker " + worker.id) worker.addExecutor(exec) worker.actor ! LaunchExecutor(masterUrl, diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala index 761aa8f7b1..273f077bd8 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala @@ -94,7 +94,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app") </li> <li> <strong>Executor Memory:</strong> - {Utils.megabytesToString(app.desc.memoryPerSlave)} + {Utils.megabytesToString(app.desc.memoryPerExecutorMB)} </li> <li><strong>Submit Date:</strong> {app.submitDate}</li> <li><strong>State:</strong> {app.state}</li> diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala index 45412a35e9..399f07399a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala @@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") { <td> {app.coresGranted} </td> - <td sorttable_customkey={app.desc.memoryPerSlave.toString}> - {Utils.megabytesToString(app.desc.memoryPerSlave)} + <td sorttable_customkey={app.desc.memoryPerExecutorMB.toString}> + {Utils.megabytesToString(app.desc.memoryPerExecutorMB)} </td> <td>{UIUtils.formatDate(app.submitDate)}</td> <td>{app.desc.user}</td> diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 7eb3fdc19b..ed5b7c1088 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -82,12 +82,11 @@ private[spark] class SparkDeploySchedulerBackend( val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts) val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("") - val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command, - appUIAddress, sc.eventLogDir, sc.eventLogCodec) - + val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt) + val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, + command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor) client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf) client.start() - waitForRegistration() } |