author     Carson Wang <carson.wang@intel.com>    2015-08-07 23:36:26 -0700
committer  Andrew Or <andrew@databricks.com>      2015-08-07 23:36:26 -0700
commit     ef062c15992b0d08554495b8ea837bef3fabf6e9 (patch)
tree       617fad193916589da5194f829cff96b0800e5b4e /core
parent     c564b27447ed99e55b359b3df1d586d5766b85ea (diff)
[SPARK-9731] Standalone scheduling assigns incorrect cores if spark.executor.cores is not set
The issue only happens if `spark.executor.cores` is not set and executor memory is set to a high value. For example, if we have a worker with 4G of memory and 10 cores and we set `spark.executor.memory` to 3G, then only 1 core is assigned to the executor. The correct number should be 10 cores. I've added a unit test to illustrate the issue.

Author: Carson Wang <carson.wang@intel.com>

Closes #8017 from carsonwang/SPARK-9731 and squashes the following commits:

d09ec48 [Carson Wang] Fix code style
86b651f [Carson Wang] Simplify the code
943cc4c [Carson Wang] fix scheduling correct cores to executors
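For reference, the reported scenario can be set up with a driver configuration along these lines against a standalone cluster whose workers each offer 4G of memory and 10 cores. This is a minimal sketch only: the master URL and app name are placeholders, not part of the patch.

import org.apache.spark.{SparkConf, SparkContext}

object Spark9731Repro {
  def main(args: Array[String]): Unit = {
    // Hypothetical reproduction: executor memory is set high (3g against a 4G/10-core
    // worker) while spark.executor.cores is deliberately left unset. Before this fix
    // the standalone master granted only 1 core to the executor; with it, all 10 free
    // cores on the worker are granted.
    val conf = new SparkConf()
      .setAppName("spark-9731-repro")          // placeholder app name
      .setMaster("spark://master-host:7077")   // placeholder master URL
      .set("spark.executor.memory", "3g")      // no spark.executor.cores setting
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).count()            // any job; check the assigned cores in the master UI
    sc.stop()
  }
}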
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/deploy/master/Master.scala       26
-rw-r--r--  core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala  15
2 files changed, 29 insertions(+), 12 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index e38e437fe1..9217202b69 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -581,20 +581,22 @@ private[deploy] class Master(
     /** Return whether the specified worker can launch an executor for this app. */
     def canLaunchExecutor(pos: Int): Boolean = {
+      val keepScheduling = coresToAssign >= minCoresPerExecutor
+      val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
+
       // If we allow multiple executors per worker, then we can always launch new executors.
-      // Otherwise, we may have already started assigning cores to the executor on this worker.
+      // Otherwise, if there is already an executor on this worker, just give it more cores.
       val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
-      val underLimit =
-        if (launchingNewExecutor) {
-          assignedExecutors.sum + app.executors.size < app.executorLimit
-        } else {
-          true
-        }
-      val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
-      usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor &&
-        usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor &&
-        coresToAssign >= minCoresPerExecutor &&
-        underLimit
+      if (launchingNewExecutor) {
+        val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
+        val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
+        val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
+        keepScheduling && enoughCores && enoughMemory && underLimit
+      } else {
+        // We're adding cores to an existing executor, so no need
+        // to check memory and executor limits
+        keepScheduling && enoughCores
+      }
     }
 
     // Keep launching executors until no more workers can accommodate any
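To make the effect of the restructured predicate concrete, here is a minimal, self-contained sketch (not Spark code; the names mirror the variables above) that replays the scenario from the description on a single worker: 10 free cores, 4096 MB free memory, 3072 MB per executor, a 1-core minimum, and oneExecutorPerWorker = true because spark.executor.cores is unset. The executor-limit check is omitted for brevity.

// Minimal sketch, assuming the values from the report (10 cores / 4096 MB free,
// spark.executor.memory = 3g, spark.executor.cores unset). Not Spark code.
object Spark9731Sketch extends App {
  val coresFree = 10
  val memoryFree = 4096
  val memoryPerExecutor = 3072
  val minCoresPerExecutor = 1      // spark.executor.cores unset => minimum of 1 core
  val oneExecutorPerWorker = true  // also a consequence of the unset core count

  var assignedCores = 0
  var assignedExecutors = 0
  var coresToAssign = coresFree

  def canLaunchExecutor: Boolean = {
    val keepScheduling = coresToAssign >= minCoresPerExecutor
    val enoughCores = coresFree - assignedCores >= minCoresPerExecutor
    val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors == 0
    if (launchingNewExecutor) {
      val assignedMemory = assignedExecutors * memoryPerExecutor
      val enoughMemory = memoryFree - assignedMemory >= memoryPerExecutor
      keepScheduling && enoughCores && enoughMemory   // executor limit omitted in this sketch
    } else {
      // Adding cores to the worker's existing executor: no memory re-check.
      // The old combined condition re-applied the memory test on every pass,
      // which is why scheduling stopped after the first core.
      keepScheduling && enoughCores
    }
  }

  while (canLaunchExecutor) {
    if (assignedExecutors == 0) assignedExecutors += 1
    assignedCores += minCoresPerExecutor
    coresToAssign -= minCoresPerExecutor
  }
  println(s"cores assigned to the executor: $assignedCores")  // 10 with the new check; 1 with the old
}

Swapping the else branch back to the old all-in-one condition makes the loop stop after the first pass (4096 - 3072 < 3072), reproducing the 1-core result from the bug report.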
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index ae0e037d82..20d0201a36 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -151,6 +151,14 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     basicScheduling(spreadOut = false)
   }
 
+  test("basic scheduling with more memory - spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = true)
+  }
+
+  test("basic scheduling with more memory - no spread out") {
+    basicSchedulingWithMoreMemory(spreadOut = false)
+  }
+
   test("scheduling with max cores - spread out") {
     schedulingWithMaxCores(spreadOut = true)
   }
@@ -214,6 +222,13 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     assert(scheduledCores === Array(10, 10, 10))
   }
 
+  private def basicSchedulingWithMoreMemory(spreadOut: Boolean): Unit = {
+    val master = makeMaster()
+    val appInfo = makeAppInfo(3072)
+    val scheduledCores = scheduleExecutorsOnWorkers(master, appInfo, workerInfos, spreadOut)
+    assert(scheduledCores === Array(10, 10, 10))
+  }
+
   private def schedulingWithMaxCores(spreadOut: Boolean): Unit = {
     val master = makeMaster()
     val appInfo1 = makeAppInfo(1024, maxCores = Some(8))
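The expected Array(10, 10, 10) suggests the suite's three stock workers each expose 10 cores and more than 3072 MB of memory. Assuming that (it is inferred from the existing basicScheduling expectations, not shown in this hunk), a back-of-the-envelope check of the per-worker outcome under the new rule, pasted into a Scala REPL, looks like this:

// Hedged sanity check; the per-worker figures (10 cores, 4096 MB) are an assumption.
val workers = Seq.fill(3)((10, 4096))   // (coresFree, memoryFree) per worker
val memoryPerExecutor = 3072
val coresPerWorker = workers.map { case (cores, mem) =>
  // With no core limit, one executor launches per worker if memory suffices,
  // and it then absorbs every remaining free core on that worker.
  if (mem >= memoryPerExecutor) cores else 0
}
assert(coresPerWorker == Seq(10, 10, 10))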