aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJongyoul Lee <jongyoul@gmail.com>2015-04-17 18:30:55 -0700
committerAndrew Or <andrew@databricks.com>2015-04-17 18:30:55 -0700
commit6fbeb82e13db7117d8f216e6148632490a4bc5be (patch)
treea1bfbb961c6e0ab112bf78ea743b219d4f10dacd
parentc5ed510135aee3a1a0402057b3b5229892aa6f3a (diff)
downloadspark-6fbeb82e13db7117d8f216e6148632490a4bc5be.tar.gz
spark-6fbeb82e13db7117d8f216e6148632490a4bc5be.tar.bz2
spark-6fbeb82e13db7117d8f216e6148632490a4bc5be.zip
[SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode
- Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md Author: Jongyoul Lee <jongyoul@gmail.com> Closes #5063 from jongyoul/SPARK-6350 and squashes the following commits: 9238d6e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs - Changed configuration name - Made mesosExecutorCores private 2d41241 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 89edb4f [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 8ba7694 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 7549314 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed docs 4ae7b0c [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Removed TODO c27efce [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed Mesos*Suite for supporting integer WorkerOffers - Fixed Documentation 1fe4c03 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Change available resources of cpus to integer value beacuse WorkerOffer support the amount cpus as integer value 5f3767e [Jongyoul Lee] Revert "[SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode" 4b7c69e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Changed configruation name and description from "spark.mesos.executor.cores" to "spark.executor.frameworkCores" 0556792 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala4
-rw-r--r--docs/running-on-mesos.md10
3 files changed, 19 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b381436839..d9d62b0e28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -67,6 +67,8 @@ private[spark] class MesosSchedulerBackend(
// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus
+
+ private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
@volatile var appId: String = _
@@ -139,7 +141,7 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
- .setValue(scheduler.CPUS_PER_TASK).build())
+ .setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
@@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
- cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
}
@@ -232,10 +233,9 @@ private[spark] class MesosSchedulerBackend(
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
- // If the executor doesn't exist yet, subtract CPU for executor
- // TODO(pwendell): Should below just subtract "1"?
- getResource(o.getResourcesList, "cpus").toInt -
- scheduler.CPUS_PER_TASK
+ // If the Mesos executor has not been started on this slave yet, set aside a few
+ // cores for the Mesos executor by offering fewer cores to the Spark executor
+ (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
}
new WorkerOffer(
o.getSlaveId.getValue,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index a311512e82..cdd7be0fbe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -118,12 +118,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
- 2
+ (minCpu - backend.mesosExecutorCores).toInt
))
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
- 2
+ (minCpu - backend.mesosExecutorCores).toInt
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index c984639bd3..594bf78b67 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -211,6 +211,16 @@ See the [configuration page](configuration.html) for information on Spark config
</td>
</tr>
<tr>
+ <td><code>spark.mesos.mesosExecutor.cores</code></td>
+ <td>1.0</td>
+ <td>
+ (Fine-grained mode only) Number of cores to give each Mesos executor. This does not
+ include the cores used to run the Spark tasks. In other words, even if no Spark task
+ is being run, each Mesos executor will occupy the number of cores configured here.
+ The value can be a floating point number.
+ </td>
+</tr>
+<tr>
<td><code>spark.mesos.executor.home</code></td>
<td>driver side <code>SPARK_HOME</code></td>
<td>