-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala               |   9
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala                         |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala                          |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala                 |   5
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala               |   8
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala                        | 117
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala            |   2
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala                 |   4
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala |   7
-rw-r--r--  docs/configuration.md                                                                   |  11
10 files changed, 96 insertions, 73 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index b7ae9c1fc0..ae99432f5c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,12 +22,13 @@ import java.net.URI
private[spark] class ApplicationDescription(
val name: String,
val maxCores: Option[Int],
- val memoryPerSlave: Int,
+ val memoryPerExecutorMB: Int,
val command: Command,
var appUiUrl: String,
val eventLogDir: Option[URI] = None,
// short name of compression codec used when writing event logs, if any (e.g. lzf)
- val eventLogCodec: Option[String] = None)
+ val eventLogCodec: Option[String] = None,
+ val coresPerExecutor: Option[Int] = None)
extends Serializable {
val user = System.getProperty("user.name", "<unknown>")
@@ -35,13 +36,13 @@ private[spark] class ApplicationDescription(
def copy(
name: String = name,
maxCores: Option[Int] = maxCores,
- memoryPerSlave: Int = memoryPerSlave,
+ memoryPerExecutorMB: Int = memoryPerExecutorMB,
command: Command = command,
appUiUrl: String = appUiUrl,
eventLogDir: Option[URI] = eventLogDir,
eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
new ApplicationDescription(
- name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)
+ name, maxCores, memoryPerExecutorMB, command, appUiUrl, eventLogDir, eventLogCodec)
override def toString: String = "ApplicationDescription(" + name + ")"
}
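
[Editor's note: a minimal sketch of how the extended constructor above might be called with the new coresPerExecutor field. The helper name, app name, and resource figures are illustrative only, and since these classes are private[spark] the snippet assumes code living inside the org.apache.spark.deploy package.]

    import org.apache.spark.deploy.{ApplicationDescription, Command}

    // Hypothetical helper: executors of 1024 MB and 2 cores each, with the
    // application capped at 8 cores in total via maxCores.
    def exampleDescription(command: Command): ApplicationDescription =
      new ApplicationDescription(
        name = "example-app",
        maxCores = Some(8),
        memoryPerExecutorMB = 1024,
        command = command,
        appUiUrl = "http://driver-host:4040",
        coresPerExecutor = Some(2))
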
diff --git a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
index dfc5b97e6a..2954f932b4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/JsonProtocol.scala
@@ -46,7 +46,7 @@ private[deploy] object JsonProtocol {
("name" -> obj.desc.name) ~
("cores" -> obj.desc.maxCores) ~
("user" -> obj.desc.user) ~
- ("memoryperslave" -> obj.desc.memoryPerSlave) ~
+ ("memoryperslave" -> obj.desc.memoryPerExecutorMB) ~
("submitdate" -> obj.submitDate.toString) ~
("state" -> obj.state.toString) ~
("duration" -> obj.duration)
@@ -55,7 +55,7 @@ private[deploy] object JsonProtocol {
def writeApplicationDescription(obj: ApplicationDescription): JObject = {
("name" -> obj.name) ~
("cores" -> obj.maxCores) ~
- ("memoryperslave" -> obj.memoryPerSlave) ~
+ ("memoryperslave" -> obj.memoryPerExecutorMB) ~
("user" -> obj.user) ~
("command" -> obj.command.toString)
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 60bc243ebf..296a0764b8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -406,6 +406,8 @@ object SparkSubmit {
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),
// Other options
+ OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
+ sysProp = "spark.executor.cores"),
OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, ALL_DEPLOY_MODES,
sysProp = "spark.executor.memory"),
OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, ALL_DEPLOY_MODES,
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 03ecf3fd99..faa8780288 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -482,10 +482,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| Spark standalone and Mesos only:
| --total-executor-cores NUM Total cores for all executors.
|
+ | Spark standalone and YARN only:
+ | --executor-cores NUM Number of cores per executor. (Default: 1 in YARN mode,
+ | or all available cores on the worker in standalone mode)
+ |
| YARN-only:
| --driver-cores NUM Number of cores used by the driver, only in cluster mode
| (Default: 1).
- | --executor-cores NUM Number of cores per executor (Default: 1).
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
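
[Editor's note: since the SparkSubmit change above wires --executor-cores to the spark.executor.cores property for standalone mode, the equivalent programmatic setting is sketched below. The master URL and resource values are placeholders, not defaults.]

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setMaster("spark://master-host:7077")  // standalone master (placeholder URL)
      .setAppName("example-app")
      .set("spark.executor.cores", "2")       // cores per executor
      .set("spark.executor.memory", "2g")     // memory per executor
      .set("spark.cores.max", "8")            // total cores across the application
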
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
index bc5b293379..f59d550d4f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ApplicationInfo.scala
@@ -75,9 +75,11 @@ private[deploy] class ApplicationInfo(
}
}
- private[master] def addExecutor(worker: WorkerInfo, cores: Int, useID: Option[Int] = None):
- ExecutorDesc = {
- val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerSlave)
+ private[master] def addExecutor(
+ worker: WorkerInfo,
+ cores: Int,
+ useID: Option[Int] = None): ExecutorDesc = {
+ val exec = new ExecutorDesc(newExecutorId(useID), this, worker, cores, desc.memoryPerExecutorMB)
executors(exec.id) = exec
coresGranted += cores
exec
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9a5d5877da..c5a6b1beac 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -524,52 +524,28 @@ private[master] class Master(
}
/**
- * Can an app use the given worker? True if the worker has enough memory and we haven't already
- * launched an executor for the app on it (right now the standalone backend doesn't like having
- * two executors on the same worker).
- */
- private def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
- worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
- }
-
- /**
- * Schedule the currently available resources among waiting apps. This method will be called
- * every time a new app joins or resource availability changes.
+ * Schedule executors to be launched on the workers.
+ *
+ * There are two modes of launching executors. The first attempts to spread out an application's
+ * executors on as many workers as possible, while the second does the opposite (i.e. launch them
+ * on as few workers as possible). The former is usually better for data locality purposes and is
+ * the default.
+ *
+ * The number of cores assigned to each executor is configurable. When this is explicitly set,
+ * multiple executors from the same application may be launched on the same worker if the worker
+ * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
+ * worker by default, in which case only one executor may be launched on each worker.
*/
- private def schedule() {
- if (state != RecoveryState.ALIVE) { return }
-
- // First schedule drivers, they take strict precedence over applications
- // Randomization helps balance drivers
- val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
- val numWorkersAlive = shuffledAliveWorkers.size
- var curPos = 0
-
- for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
- // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
- // start from the last worker that was assigned a driver, and continue onwards until we have
- // explored all alive workers.
- var launched = false
- var numWorkersVisited = 0
- while (numWorkersVisited < numWorkersAlive && !launched) {
- val worker = shuffledAliveWorkers(curPos)
- numWorkersVisited += 1
- if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
- launchDriver(worker, driver)
- waitingDrivers -= driver
- launched = true
- }
- curPos = (curPos + 1) % numWorkersAlive
- }
- }
-
+ private def startExecutorsOnWorkers(): Unit = {
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
- // Try to spread out each app among all the nodes, until it has all its cores
+ // Try to spread out each app among all the workers, until it has all its cores
for (app <- waitingApps if app.coresLeft > 0) {
val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
- .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+ .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
+ worker.coresFree >= app.desc.coresPerExecutor.getOrElse(1))
+ .sortBy(_.coresFree).reverse
val numUsable = usableWorkers.length
val assigned = new Array[Int](numUsable) // Number of cores to give on each node
var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
@@ -582,32 +558,61 @@ private[master] class Master(
pos = (pos + 1) % numUsable
}
// Now that we've decided how many cores to give on each node, let's actually give them
- for (pos <- 0 until numUsable) {
- if (assigned(pos) > 0) {
- val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
- launchExecutor(usableWorkers(pos), exec)
- app.state = ApplicationState.RUNNING
- }
+ for (pos <- 0 until numUsable if assigned(pos) > 0) {
+ allocateWorkerResourceToExecutors(app, assigned(pos), usableWorkers(pos))
}
}
} else {
- // Pack each app into as few nodes as possible until we've assigned all its cores
+ // Pack each app into as few workers as possible until we've assigned all its cores
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
for (app <- waitingApps if app.coresLeft > 0) {
- if (canUse(app, worker)) {
- val coresToUse = math.min(worker.coresFree, app.coresLeft)
- if (coresToUse > 0) {
- val exec = app.addExecutor(worker, coresToUse)
- launchExecutor(worker, exec)
- app.state = ApplicationState.RUNNING
- }
- }
+ allocateWorkerResourceToExecutors(app, app.coresLeft, worker)
+ }
+ }
+ }
+ }
+
+ /**
+ * Allocate a worker's resources to one or more executors.
+ * @param app the info of the application which the executors belong to
+ * @param coresToAllocate cores on this worker to be allocated to this application
+ * @param worker the worker info
+ */
+ private def allocateWorkerResourceToExecutors(
+ app: ApplicationInfo,
+ coresToAllocate: Int,
+ worker: WorkerInfo): Unit = {
+ val memoryPerExecutor = app.desc.memoryPerExecutorMB
+ val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(coresToAllocate)
+ var coresLeft = coresToAllocate
+ while (coresLeft >= coresPerExecutor && worker.memoryFree >= memoryPerExecutor) {
+ val exec = app.addExecutor(worker, coresPerExecutor)
+ coresLeft -= coresPerExecutor
+ launchExecutor(worker, exec)
+ app.state = ApplicationState.RUNNING
+ }
+ }
+
+ /**
+ * Schedule the currently available resources among waiting apps. This method will be called
+ * every time a new app joins or resource availability changes.
+ */
+ private def schedule(): Unit = {
+ if (state != RecoveryState.ALIVE) { return }
+ // Drivers take strict precedence over executors
+ val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
+ for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
+ for (driver <- waitingDrivers) {
+ if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
+ launchDriver(worker, driver)
+ waitingDrivers -= driver
}
}
}
+ startExecutorsOnWorkers()
}
- private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
+ private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
worker.addExecutor(exec)
worker.actor ! LaunchExecutor(masterUrl,
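
[Editor's note: to make the new allocation logic concrete, the sketch below approximates the while loop in allocateWorkerResourceToExecutors with a single min(). The worker and application figures are hypothetical, and the real code also re-checks worker.memoryFree as each executor is launched.]

    // A worker offering 16 cores and 65536 MB of free memory; the application
    // asks for 8192 MB per executor.
    def executorsLaunched(coresToAllocate: Int, memoryFreeMB: Int,
        coresPerExecutor: Int, memoryPerExecutorMB: Int): Int =
      math.min(coresToAllocate / coresPerExecutor, memoryFreeMB / memoryPerExecutorMB)

    executorsLaunched(16, 65536, coresPerExecutor = 4, memoryPerExecutorMB = 8192)
    // == 4: four 4-core executors fit on this one worker
    executorsLaunched(16, 65536, coresPerExecutor = 16, memoryPerExecutorMB = 8192)
    // == 1: with spark.executor.cores unset, coresPerExecutor defaults to all
    //       allocated cores, so a single 16-core executor is launched
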
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
index 761aa8f7b1..273f077bd8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/ApplicationPage.scala
@@ -94,7 +94,7 @@ private[ui] class ApplicationPage(parent: MasterWebUI) extends WebUIPage("app")
</li>
<li>
<strong>Executor Memory:</strong>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
</li>
<li><strong>Submit Date:</strong> {app.submitDate}</li>
<li><strong>State:</strong> {app.state}</li>
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
index 45412a35e9..399f07399a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/MasterPage.scala
@@ -208,8 +208,8 @@ private[ui] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
<td>
{app.coresGranted}
</td>
- <td sorttable_customkey={app.desc.memoryPerSlave.toString}>
- {Utils.megabytesToString(app.desc.memoryPerSlave)}
+ <td sorttable_customkey={app.desc.memoryPerExecutorMB.toString}>
+ {Utils.megabytesToString(app.desc.memoryPerExecutorMB)}
</td>
<td>{UIUtils.formatDate(app.submitDate)}</td>
<td>{app.desc.user}</td>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 7eb3fdc19b..ed5b7c1088 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -82,12 +82,11 @@ private[spark] class SparkDeploySchedulerBackend(
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
- val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
- appUIAddress, sc.eventLogDir, sc.eventLogCodec)
-
+ val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
+ val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
+ command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
client.start()
-
waitForRegistration()
}
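
[Editor's note: end to end, the coresPerExecutor value read here feeds the Master's allocation above. A hypothetical standalone scenario (all figures invented) illustrating the behaviour that the configuration.md entry below describes:]

    // Two alive workers, each with 8 free cores and 16 GB of free memory. The
    // app sets spark.cores.max=12, spark.executor.cores=2, spark.executor.memory=4g.
    // Spread-out mode splits the 12 cores as 6 + 6 across the two workers:
    val coresPerWorker = Seq(6, 6)
    val executorsPerWorker = coresPerWorker.map(_ / 2)  // Seq(3, 3): three 2-core
                                                        // executors on each worker
    // With spark.executor.cores unset, each worker instead launches a single
    // executor that takes all of its assigned cores (one 6-core executor each).
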
diff --git a/docs/configuration.md b/docs/configuration.md
index 7169ec295e..d9e9e67026 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -724,6 +724,17 @@ Apart from these, the following properties are also available, and may be useful
</td>
</tr>
<tr>
+ <td><code>spark.executor.cores</code></td>
+ <td>1 in YARN mode, all the available cores on the worker in standalone mode.</td>
+ <td>
+ The number of cores to use on each executor. For YARN and standalone mode only.
+
+ In standalone mode, setting this parameter allows an application to run multiple executors on
+ the same worker, provided that there are enough cores on that worker. Otherwise, only one
+ executor per application will run on each worker.
+ </td>
+</tr>
+<tr>
<td><code>spark.default.parallelism</code></td>
<td>
For distributed shuffle operations like <code>reduceByKey</code> and <code>join</code>, the