aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJongyoul Lee <jongyoul@gmail.com>2015-04-17 18:30:55 -0700
committerAndrew Or <andrew@databricks.com>2015-04-17 18:30:55 -0700
commit6fbeb82e13db7117d8f216e6148632490a4bc5be (patch)
treea1bfbb961c6e0ab112bf78ea743b219d4f10dacd /core
parentc5ed510135aee3a1a0402057b3b5229892aa6f3a (diff)
downloadspark-6fbeb82e13db7117d8f216e6148632490a4bc5be.tar.gz
spark-6fbeb82e13db7117d8f216e6148632490a4bc5be.tar.bz2
spark-6fbeb82e13db7117d8f216e6148632490a4bc5be.zip
[SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode
- Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md Author: Jongyoul Lee <jongyoul@gmail.com> Closes #5063 from jongyoul/SPARK-6350 and squashes the following commits: 9238d6e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs - Changed configuration name - Made mesosExecutorCores private 2d41241 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 89edb4f [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 8ba7694 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 7549314 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed docs 4ae7b0c [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Removed TODO c27efce [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed Mesos*Suite for supporting integer WorkerOffers - Fixed Documentation 1fe4c03 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Change available resources of cpus to integer value beacuse WorkerOffer support the amount cpus as integer value 5f3767e [Jongyoul Lee] Revert "[SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode" 4b7c69e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Changed configruation name and description from "spark.mesos.executor.cores" to "spark.executor.frameworkCores" 0556792 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala14
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala4
2 files changed, 9 insertions, 9 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index b381436839..d9d62b0e28 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -67,6 +67,8 @@ private[spark] class MesosSchedulerBackend(
// The listener bus to publish executor added/removed events.
val listenerBus = sc.listenerBus
+
+ private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1)
@volatile var appId: String = _
@@ -139,7 +141,7 @@ private[spark] class MesosSchedulerBackend(
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
- .setValue(scheduler.CPUS_PER_TASK).build())
+ .setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
@@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend(
val mem = getResource(o.getResourcesList, "mem")
val cpus = getResource(o.getResourcesList, "cpus")
val slaveId = o.getSlaveId.getValue
- // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK?
(mem >= MemoryUtils.calculateTotalMemory(sc) &&
// need at least 1 for executor, 1 for task
- cpus >= 2 * scheduler.CPUS_PER_TASK) ||
+ cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) ||
(slaveIdsWithExecutors.contains(slaveId) &&
cpus >= scheduler.CPUS_PER_TASK)
}
@@ -232,10 +233,9 @@ private[spark] class MesosSchedulerBackend(
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
- // If the executor doesn't exist yet, subtract CPU for executor
- // TODO(pwendell): Should below just subtract "1"?
- getResource(o.getResourcesList, "cpus").toInt -
- scheduler.CPUS_PER_TASK
+ // If the Mesos executor has not been started on this slave yet, set aside a few
+ // cores for the Mesos executor by offering fewer cores to the Spark executor
+ (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt
}
new WorkerOffer(
o.getSlaveId.getValue,
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index a311512e82..cdd7be0fbe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -118,12 +118,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(0).getSlaveId.getValue,
mesosOffers.get(0).getHostname,
- 2
+ (minCpu - backend.mesosExecutorCores).toInt
))
expectedWorkerOffers.append(new WorkerOffer(
mesosOffers.get(2).getSlaveId.getValue,
mesosOffers.get(2).getHostname,
- 2
+ (minCpu - backend.mesosExecutorCores).toInt
))
val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))