author     Brenden Matthews <brenden@diddyinc.com>  2014-10-03 12:58:04 -0700
committer  Andrew Or <andrewor14@gmail.com>         2014-10-03 12:58:04 -0700
commit     a8c52d5343e19731909e73db5de151a324d31cd5 (patch)
tree       909977e4bcc39fafd43de8c60d327bd224cdee7d /core
parent     6a1d48f4f02c4498b64439c3dd5f671286a90e30 (diff)
[SPARK-3535][Mesos] Fix resource handling.
Author: Brenden Matthews <brenden@diddyinc.com>

Closes #2401 from brndnmtthws/master and squashes the following commits:

4abaa5d [Brenden Matthews] [SPARK-3535][Mesos] Fix resource handling.
Diffstat (limited to 'core')
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala |  7
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala                 | 35
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala       | 34
3 files changed, 68 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 64568409db..3161f1ee9f 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -198,7 +198,9 @@ private[spark] class CoarseMesosSchedulerBackend(
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
- if (totalCoresAcquired < maxCores && mem >= sc.executorMemory && cpus >= 1 &&
+ if (totalCoresAcquired < maxCores &&
+ mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ cpus >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
!slaveIdsWithExecutors.contains(slaveId)) {
// Launch an executor on the slave
@@ -214,7 +216,8 @@ private[spark] class CoarseMesosSchedulerBackend(
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
- .addResources(createResource("mem", sc.executorMemory))
+ .addResources(createResource("mem",
+ MemoryUtils.calculateTotalMemory(sc)))
.build()
d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
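The net effect of the two hunks above: the coarse-grained backend now both filters offers and sizes the launched task by an overhead-padded memory figure instead of the bare executor heap. A minimal standalone sketch of the acceptance test, with hypothetical parameter names (the real check lives inside the backend's resourceOffers callback):

// Sketch only: hypothetical standalone form of the offer-acceptance test.
// requiredMem should come from MemoryUtils.calculateTotalMemory(sc).
object CoarseOfferCheckSketch {
  val MAX_SLAVE_FAILURES = 2  // illustrative; the backend defines its own constant
  def acceptOffer(mem: Double, cpus: Int, requiredMem: Double,
                  totalCoresAcquired: Int, maxCores: Int,
                  failures: Int, hasExecutor: Boolean): Boolean =
    totalCoresAcquired < maxCores &&
      mem >= requiredMem &&  // was: mem >= the bare executor heap
      cpus >= 1 &&
      failures < MAX_SLAVE_FAILURES &&
      !hasExecutor
}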
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
new file mode 100644
index 0000000000..5101ec8352
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.apache.spark.SparkContext
+
+private[spark] object MemoryUtils {
+ // These defaults copied from YARN
+ val OVERHEAD_FRACTION = 1.07
+ val OVERHEAD_MINIMUM = 384
+
+ def calculateTotalMemory(sc: SparkContext) = {
+ math.max(
+ sc.conf.getOption("spark.mesos.executor.memoryOverhead")
+ .getOrElse(OVERHEAD_MINIMUM.toString)
+ .toInt + sc.executorMemory,
+ OVERHEAD_FRACTION * sc.executorMemory
+ )
+ }
+}
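A worked example of the formula above, as a standalone sketch: the 384 MB floor dominates for small heaps, the 7% fraction for large ones. (This sketch drops the spark.mesos.executor.memoryOverhead override that the real method honors.)

// Standalone sketch of calculateTotalMemory, with illustrative heap sizes.
object MemoryUtilsSketch {
  val OVERHEAD_FRACTION = 1.07
  val OVERHEAD_MINIMUM = 384
  def totalMemory(executorMemoryMB: Int): Double =
    math.max(OVERHEAD_MINIMUM + executorMemoryMB,
             OVERHEAD_FRACTION * executorMemoryMB)
  def main(args: Array[String]): Unit = {
    println(totalMemory(512))   // 896.0   -> max(512 + 384, 1.07 * 512)
    println(totalMemory(8192))  // 8765.44 -> max(8192 + 384, 1.07 * 8192)
  }
}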
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index a9ef126f5d..4c49aa074e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -124,15 +124,24 @@ private[spark] class MesosSchedulerBackend(
command.setValue("cd %s*; ./sbin/spark-executor".format(basename))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
+ val cpus = Resource.newBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Value.Scalar.newBuilder()
+ .setValue(scheduler.CPUS_PER_TASK).build())
+ .build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
- .setScalar(Value.Scalar.newBuilder().setValue(sc.executorMemory).build())
+ .setScalar(
+ Value.Scalar.newBuilder()
+ .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
ExecutorInfo.newBuilder()
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
+ .addResources(cpus)
.addResources(memory)
.build()
}
@@ -204,18 +213,31 @@ private[spark] class MesosSchedulerBackend(
val offerableWorkers = new ArrayBuffer[WorkerOffer]
val offerableIndices = new HashMap[String, Int]
- def enoughMemory(o: Offer) = {
+ def sufficientOffer(o: Offer) = {
val mem = getResource(o.getResourcesList, "mem")
+ val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- mem >= sc.executorMemory || slaveIdsWithExecutors.contains(slaveId)
+ (mem >= MemoryUtils.calculateTotalMemory(sc) &&
+ // need at least 1 for executor, 1 for task
+ cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ (slaveIdsWithExecutors.contains(slaveId) &&
+ cpus >= scheduler.CPUS_PER_TASK)
}
- for ((offer, index) <- offers.zipWithIndex if enoughMemory(offer)) {
- offerableIndices.put(offer.getSlaveId.getValue, index)
+ for ((offer, index) <- offers.zipWithIndex if sufficientOffer(offer)) {
+ val slaveId = offer.getSlaveId.getValue
+ offerableIndices.put(slaveId, index)
+ val cpus = if (slaveIdsWithExecutors.contains(slaveId)) {
+ getResource(offer.getResourcesList, "cpus").toInt
+ } else {
+ // If the executor doesn't exist yet, subtract CPU for executor
+ getResource(offer.getResourcesList, "cpus").toInt -
+ scheduler.CPUS_PER_TASK
+ }
offerableWorkers += new WorkerOffer(
offer.getSlaveId.getValue,
offer.getHostname,
- getResource(offer.getResourcesList, "cpus").toInt)
+ cpus)
}
// Call into the TaskSchedulerImpl
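The new sufficientOffer predicate above can be read as: a fresh slave must cover the padded memory plus two tasks' worth of CPU (one share is reserved for the executor itself), while a slave already running our executor only needs CPU for one more task. A standalone sketch, assuming an illustrative CPUS_PER_TASK and a simplified offer model rather than the Mesos protobufs:

// Sketch only: simplified offer model, not the Mesos protobuf types.
case class OfferSketch(mem: Double, cpus: Double, slaveId: String)

object FineGrainedFilterSketch {
  val CPUS_PER_TASK = 1.0  // illustrative; the scheduler supplies the real value
  def sufficientOffer(o: OfferSketch, requiredMem: Double,
                      slavesWithExecutors: Set[String]): Boolean =
    (o.mem >= requiredMem && o.cpus >= 2 * CPUS_PER_TASK) ||
      (slavesWithExecutors.contains(o.slaveId) && o.cpus >= CPUS_PER_TASK)
}

// With CPUS_PER_TASK = 1: a new slave needs >= 2 CPUs and the padded memory;
// a slave already hosting our executor is usable with just 1 CPU.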