author     CodingCat <zhunansjtu@gmail.com>    2015-04-14 13:32:06 -0700
committer  Andrew Or <andrew@databricks.com>   2015-04-14 13:32:06 -0700
commit     8f8dc45f6d4c8d7b740eaa3d2ea09d0b531af9dd (patch)
tree       ad4d534fed5b6140dfeb65d189ccc44239ff5f02 /core
parent     25998e4d73bcc95ac85d9af71adfdc726ec89568 (diff)
SPARK-1706: Allow multiple executors per worker in Standalone mode
Resubmit of https://github.com/apache/spark/pull/636 with a totally different algorithm: https://issues.apache.org/jira/browse/SPARK-1706

In the current implementation, the user has to start multiple workers on a server in order to run multiple executors there, which introduces extra overhead from the additional worker JVM processes. In this patch, I changed the scheduling logic in the master so that the user can start multiple executor processes on the same worker:

1. The user configures spark.executor.maxCoreNumPerExecutor to suggest the maximum number of cores to allocate to each executor.

2. The master assigns executors to workers based mainly on memoryPerExecutor and worker.freeMemory, and tries to allocate as many cores as possible to each executor: ```min(min(memoryPerExecutor, worker.freeCore), maxLeftCoreToAssign)``` where ```maxLeftCoreToAssign = maxExecutorCanAssign * maxCoreNumPerExecutor```

---------------------------------------

Other small changes include renaming memoryPerSlave in ApplicationDescription to memoryPerExecutor, since "Slave" is overloaded to mean both worker and executor in the docs (we had some discussion on this before?).

Author: CodingCat <zhunansjtu@gmail.com>

Closes #731 from CodingCat/SPARK-1706-2 and squashes the following commits:

6dee808 [CodingCat] change filter predicate
fbeb7e5 [CodingCat] address the comments
940cb42 [CodingCat] avoid unnecessary allocation
b8ca561 [CodingCat] revert a change
45967b4 [CodingCat] remove unused method
2eeff77 [CodingCat] stylistic fixes
12a1b32 [CodingCat] change the semantic of coresPerExecutor to exact core number
f035423 [CodingCat] stylistic fix
d9c1685 [CodingCat] remove unused var
f595bd6 [CodingCat] recover some unintentional changes
63b3df9 [CodingCat] change the description of the parameter in the submit script
4cf61f1 [CodingCat] improve the code and docs
ff011e2 [CodingCat] start multiple executors on the worker by rewriting startExecutor logic
2c2bcc5 [CodingCat] fix wrong usage info
497ec2c [CodingCat] address Andrew's comments
878402c [CodingCat] change the launching executor code
f64a28d [CodingCat] typo fix
387f4ec [CodingCat] bug fix
35c462c [CodingCat] address Andrew's comments
0b64fea [CodingCat] fix compilation issue
19d3da7 [CodingCat] address the comments
5b81466 [CodingCat] remove outdated comments
ec7d421 [CodingCat] test commit
e5efabb [CodingCat] more java docs and consolidate canUse function
a26096d [CodingCat] stylistic fix
a5d629a [CodingCat] java doc
b34ec0c [CodingCat] make master support multiple executors per worker
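As a rough illustration of the resulting policy, here is a minimal, self-contained sketch with hypothetical worker and executor sizes (none of these numbers come from the patch); it mirrors the per-worker allocation loop added to Master.scala below:

```scala
object AllocationSketch extends App {
  // Hypothetical sizes: a 16-core / 8 GB worker, with 2 cores and 2048 MB
  // requested per executor (e.g. via spark.executor.cores / .memory).
  val coresPerExecutor = 2
  val memoryPerExecutorMB = 2048
  var coresFree = 16
  var memoryFreeMB = 8192

  // An executor is launched while the worker still has enough of *both*
  // resources for one more executor; here memory runs out first.
  var executorsLaunched = 0
  while (coresFree >= coresPerExecutor && memoryFreeMB >= memoryPerExecutorMB) {
    coresFree -= coresPerExecutor
    memoryFreeMB -= memoryPerExecutorMB
    executorsLaunched += 1
  }
  println(s"executors launched: $executorsLaunched") // 4, not 16 / 2 = 8
}
```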
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala                 |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala                           |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                            |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                   |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala                 |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                          | 117
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala              |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala                   |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala |   7
9 files changed, 85 insertions(+), 73 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index b7ae9c1fc0..ae99432f5c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,12 +22,13 @@ import java.net.URI
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
- val memoryPerSlave: Int,
+ val memoryPerExecutorMB: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
- val eventLogCodec: Option[String] = None)
+ val eventLogCodec: Option[String] = None,
+ val coresPerExecutor: Option[Int] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
@@ -35,13 +36,13 @@ private[spark] class ApplicationDescription(
def copy(
name: String = name,
maxCores: Option[Int] = maxCores,
- memoryPerSlave: Int = memoryPerSlave,
+ memoryPerExecutorMB: Int = memoryPerExecutorMB,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
- name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
+ name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)
override def toString: String = "ApplicationDescription(" + name + ")"
}
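A hedged sketch of how the extended constructor might be used once the new field exists; the field values and Command arguments below are placeholders, not taken from the patch:

```scala
import org.apache.spark.deploy.{ApplicationDescription, Command}

object AppDescSketch {
  // Placeholder command: empty argument, environment, and path lists.
  val command = Command(
    "org.apache.spark.executor.CoarseGrainedExecutorBackend",
    Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq.empty)

  // coresPerExecutor = Some(2) caps each executor at 2 cores, allowing several
  // executors per worker; the default None keeps the old behavior of a single
  // executor per worker taking all of the worker's free cores.
  val appDesc = new ApplicationDescription(
    name = "demo-app",
    maxCores = Some(8),
    memoryPerExecutorMB = 1024,
    command = command,
    appUiUrl = "",
    coresPerExecutor = Some(2))
}
```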
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index dfc5b97e6a..2954f932b4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -46,7 +46,7 @@ private[deploy] object JsonProtocol {
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
- ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+ ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("submitdate" -> obj.submitDate.toString) ~
("state" -> obj.state.toString) ~
("duration" -> obj.duration)
@@ -55,7 +55,7 @@ private[deploy] object JsonProtocol {
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
- ("memoryperslave" -> obj.memoryPerSlave) ~
+ ("memoryperslave" -> obj.memoryPerExecutorMB) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}
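Note that the wire format is unchanged: the JSON key stays "memoryperslave" even though the backing field is renamed, so consumers of the master's JSON endpoint are unaffected. A small sketch with hypothetical values, using the same json4s DSL as above:

```scala
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

object JsonSketch extends App {
  // Hypothetical app values; the key names are the ones used above.
  val json =
    ("name" -> "demo-app") ~
    ("cores" -> 8) ~
    ("memoryperslave" -> 1024) ~
    ("user" -> "spark")
  println(compact(render(json)))
  // {"name":"demo-app","cores":8,"memoryperslave":1024,"user":"spark"}
}
```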
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 60bc243ebf..296a0764b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -406,6 +406,8 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
// Other options
+ OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+ sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 03ecf3fd99..faa8780288 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| Spark standalone and Mesos only:
| --total-executor-cores NUM Total cores for all executors.
|
+ | Spark standalone and YARN only:
+ | --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
+ | or all available cores on the worker in standalone mode)
+ |
| YARN-only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
| (Default: 1).
- | --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
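Since --executor-cores now maps to the spark.executor.cores property in standalone mode (see the OptionAssigner change above), the same effect can be achieved programmatically. A sketch with hypothetical values and master URL:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConfSketch {
  // Equivalent of `--executor-cores 2 --executor-memory 2g`; the values and
  // the master URL are hypothetical. Leaving spark.executor.cores unset keeps
  // the old standalone default: one executor per worker using all free cores.
  val conf = new SparkConf()
    .setMaster("spark://master:7077")
    .setAppName("multi-executor-demo")
    .set("spark.executor.cores", "2")
    .set("spark.executor.memory", "2g")
  val sc = new SparkContext(conf)
}
```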
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bc5b293379..f59d550d4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -75,9 +75,11 @@ private[deploy] class ApplicationInfo(
}
}
- private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None):
- ExecutorDesc = {
- val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+ private[master] def addExecutor(
+ worker: WorkerInfo,
+ cores: Int,
+ useID: Option[Int] = None): ExecutorDesc = {
+ val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
executors(exec.id) = exec
coresGranted += cores
exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9a5d5877da..c5a6b1beac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -524,52 +524,28 @@ private[master] class Master(
}
/**
- * Can an app use the given worker? True if the worker has enough memory and we haven't already
- * launched an executor for the app on it (right now the standalone backend doesn't like having
- * two executors on the same worker).
- */
- private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
- worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
- }
-
- /**
- * Schedule the currently available resources among waiting apps. This method will be called
- * every time a new app joins or resource availability changes.
+ * Schedule executors to be launched on the workers.
+ *
+ * There are two modes of launching executors. The first attempts to spread out an application's
+ * executors on as many workers as possible, while the second does the opposite (i.e. launch them
+ * on as few workers as possible). The former is usually better for data locality purposes and is
+ * the default.
+ *
+ * The number of cores assigned to each executor is configurable. When this is explicitly set,
+ * multiple executors from the same application may be launched on the same worker if the worker
+ * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
+ * worker by default, in which case only one executor may be launched on each worker.
*/
- private def schedule() {
- if (state != RecoveryState.ALIVE) { return }
-
- // First schedule drivers, they take strict precedence over applications
- // Randomization helps balance drivers
- val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
- val numWorkersAlive = shuffledAliveWorkers.size
- var curPos = 0
-
- for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
- // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
- // start from the last worker that was assigned a driver, and continue onwards until we have
- // explored all alive workers.
- var launched = false
- var numWorkersVisited = 0
- while (numWorkersVisited < numWorkersAlive && !launched) {
- val worker = shuffledAliveWorkers(curPos)
- numWorkersVisited += 1
- if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
- launchDriver(worker, driver)
- waitingDrivers -= driver
- launched = true
- }
- curPos = (curPos + 1) % numWorkersAlive
- }
- }
-
+ private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
- // Try to spread out each app among all the nodes, until it has all its cores
+ // Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+ worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+ .sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -582,32 +558,61 @@ private[master] class Master(
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
- for (pos <- 0 until numUsable) {
- if (assigned(pos) > 0) {
- val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec)
- app.state = ApplicationState.RUNNING
- }
+ for (pos <- 0 until numUsable if assigned(pos) > 0) {
+ allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
} else {
- // Pack each app into as few nodes as possible until we've assigned all its cores
+ // Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
- if (canUse(app, worker)) {
- val coresToUse = math.min(worker.coresFree, app.coresLeft)
- if (coresToUse > 0) {
- val exec = app.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec)
- app.state = ApplicationState.RUNNING
- }
- }
+ allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+ }
+ }
+ }
+ }
+
+ /**
+ * Allocate a worker's resources to one or more executors.
+ * @param app the info of the application which the executors belong to
+ * @param coresToAllocate cores on this worker to be allocated to this application
+ * @param worker the worker info
+ */
+ private def allocateWorkerResourceToExecutors(
+ app: ApplicationInfo,
+ coresToAllocate: Int,
+ worker: WorkerInfo): Unit = {
+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
+ var coresLeft = coresToAllocate
+ while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
+ val exec = app.addExecutor(worker, coresPerExecutor)
+ coresLeft -= coresPerExecutor
+ launchExecutor(worker, exec)
+ app.state = ApplicationState.RUNNING
+ }
+ }
+
+ /**
+ * Schedule the currently available resources among waiting apps. This method will be called
+ * every time a new app joins or resource availability changes.
+ */
+ private def schedule(): Unit = {
+ if (state != RecoveryState.ALIVE) { return }
+ // Drivers take strict precedence over executors
+ val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+ for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+ for (driver <- waitingDrivers) {
+ if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+ launchDriver(worker, driver)
+ waitingDrivers -= driver
}
}
}
+ startExecutorsOnWorkers()
}
- private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
+ private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
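To make the effect of the new loop concrete, here is a self-contained sketch that mirrors allocateWorkerResourceToExecutors, with simplified stand-ins for WorkerInfo and the launch path (not the real Spark types) and hypothetical resource numbers:

```scala
object AllocateSketch extends App {
  // Simplified stand-in for WorkerInfo's resource bookkeeping.
  final case class Worker(var coresFree: Int, var memoryFreeMB: Int)

  // Mirrors allocateWorkerResourceToExecutors: carve executors out of the
  // worker until cores or memory run out; returns the core count of each
  // executor launched. With coresPerExecutorConf = None, a single executor
  // takes all of the allocated cores (the old behavior).
  def allocate(
      coresToAllocate: Int,
      coresPerExecutorConf: Option[Int],
      memoryPerExecutorMB: Int,
      worker: Worker): Seq[Int] = {
    val coresPerExecutor = coresPerExecutorConf.getOrElse(coresToAllocate)
    var coresLeft = coresToAllocate
    val launched = scala.collection.mutable.ArrayBuffer.empty[Int]
    while (coresLeft >= coresPerExecutor && worker.memoryFreeMB >= memoryPerExecutorMB) {
      coresLeft -= coresPerExecutor
      worker.coresFree -= coresPerExecutor
      worker.memoryFreeMB -= memoryPerExecutorMB
      launched += coresPerExecutor
    }
    launched.toSeq
  }

  // 8 cores and 6 GB free on the worker, 2048 MB per executor:
  println(allocate(8, Some(2), 2048, Worker(8, 6144))) // List(2, 2, 2)
  println(allocate(8, None, 2048, Worker(8, 6144)))    // List(8)
}
```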
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 761aa8f7b1..273f077bd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -94,7 +94,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li>
<strong>Executor Memory:</strong>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 45412a35e9..399f07399a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
{app.coresGranted}
</td>
- <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ <td sorttable_customkey={app.desc.memoryPerExecutorMB.toString}>
+ {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb3fdc19b..ed5b7c1088 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -82,12 +82,11 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
- val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, sc.eventLogDir, sc.eventLogCodec)
-
+ val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+ val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
+ command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
-
waitForRegistration()
}