about summary refs log tree commit diff
diff options
context:
space:
mode:
authorBrenden Matthews <brenden@diddyinc.com>2014-10-03 12:58:04 -0700
committerAndrew Or <andrewor14@gmail.com>2014-10-03 12:58:04 -0700
commita8c52d5343e19731909e73db5de151a324d31cd5 (patch)
tree909977e4bcc39fafd43de8c60d327bd224cdee7d
parent6a1d48f4f02c4498b64439c3dd5f671286a90e30 (diff)
downloadspark-a8c52d5343e19731909e73db5de151a324d31cd5.tar.gz
spark-a8c52d5343e19731909e73db5de151a324d31cd5.tar.bz2
spark-a8c52d5343e19731909e73db5de151a324d31cd5.zip
[SPARK-3535][Mesos] Fix resource handling.
Author: Brenden Matthews <brenden@diddyinc.com>

Closes #2401 from brndnmtthws/master and squashes the following commits:

4abaa5d [Brenden Matthews] [SPARK-3535][Mesos] Fix resource handling.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala35
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala34
-rw-r--r--docs/configuration.md11
4 files changed, 79 insertions, 8 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 64568409db..3161f1ee9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores &&
+ mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
@@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", sc.executorMemory))
+ .addResources(createResource("mem",
+ MemoryUtils.calculateTotalMemory(sc)))
.build()
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
new file mode 100644
index 0000000000..5101ec8352
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkContext
+
+private[spark] object MemoryUtils {
+ // These defaults copied from YARN
+ val OVERHEAD_FRACTION = 1.07
+ val OVERHEAD_MINIMUM = 384
+
+ def calculateTotalMemory(sc: SparkContext) = {
+ math.max(
+ sc.conf.getOption("spark.mesos.executor.memoryOverhead")
+ .getOrElse(OVERHEAD_MINIMUM.toString)
+ .toInt + sc.executorMemory,
+ OVERHEAD_FRACTION * sc.executorMemory
+ )
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index a9ef126f5d..4c49aa074e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend(
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
+ val cpus = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(scheduler.CPUS_PER_TASK).build())
+ .build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
+ .setScalar(
+ Value.Scalar.newBuilder()
+ .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
+ .addResources(cpus)
.addResources(memory)
.build()
}
@@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend(
val offerableWorkers = new ArrayBuffer[WorkerOffer]
val offerableIndices = new HashMap[String, Int]
- def enoughMemory(o: Offer) = {
+ def sufficientOffer(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
+ val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ (mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ // need at least 1 for executor, 1 for task
+ cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ (slaveIdsWithExecutors.contains(slaveId) &&
+ cpus >= scheduler.CPUS_PER_TASK)
}
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices.put(offer.getSlaveId.getValue, index)
+ for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
+ val slaveId = offer.getSlaveId.getValue
+ offerableIndices.put(slaveId, index)
+ val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
+ getResource(offer.getResourcesList, "cpus").toInt
+ } else {
+ // If the executor doesn't exist yet, subtract CPU for executor
+ getResource(offer.getResourcesList, "cpus").toInt -
+ scheduler.CPUS_PER_TASK
+ }
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
+ cpus)
}
// Call into the TaskSchedulerImpl
diff --git a/docs/configuration.md b/docs/configuration.md
index a782809a55..1c33855365 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -253,6 +253,17 @@ Apart from these, the following properties are also available, and may be useful
<code>spark.executor.uri</code>.
</td>
</tr>
+<tr>
+ <td><code>spark.mesos.executor.memoryOverhead</code></td>
+ <td>executor memory * 0.07, with minimum of 384</td>
+ <td>
+    The amount of memory, in MiB, to add to <code>spark.executor.memory</code>
+    when calculating the total Mesos task memory. A value of <code>384</code>
+    implies a 384MiB overhead. Additionally, there is a hard-coded 7% minimum
+    overhead. The final overhead will be the larger of either
+    <code>spark.mesos.executor.memoryOverhead</code> or 7% of <code>spark.executor.memory</code>.
+ </td>
+</tr>
</table>
#### Shuffle Behavior