From 6fbeb82e13db7117d8f216e6148632490a4bc5be Mon Sep 17 00:00:00 2001 From: Jongyoul Lee Date: Fri, 17 Apr 2015 18:30:55 -0700 Subject: [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md Author: Jongyoul Lee Closes #5063 from jongyoul/SPARK-6350 and squashes the following commits: 9238d6e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs - Changed configuration name - Made mesosExecutorCores private 2d41241 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 89edb4f [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 8ba7694 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Fixed docs 7549314 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed docs 4ae7b0c [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Removed TODO c27efce [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Fixed Mesos*Suite for supporting integer WorkerOffers - Fixed Documentation 1fe4c03 [Jongyoul Lee] [SPARK-6453][Mesos] Some Mesos*Suite have a different package with their classes - Change available resources of cpus to integer value beacuse WorkerOffer support the amount cpus as integer value 5f3767e [Jongyoul Lee] Revert "[SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode" 4b7c69e [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Changed configruation name and description from "spark.mesos.executor.cores" to "spark.executor.frameworkCores" 0556792 [Jongyoul Lee] [SPARK-6350][Mesos] Make mesosExecutorCores configurable in mesos "fine-grained" mode - Defined executorCores from "spark.mesos.executor.cores" - Changed the amount of mesosExecutor's cores to executorCores. - Added new configuration option on running-on-mesos.md --- .../scheduler/cluster/mesos/MesosSchedulerBackend.scala | 14 +++++++------- .../cluster/mesos/MesosSchedulerBackendSuite.scala | 4 ++-- 2 files changed, 9 insertions(+), 9 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index b381436839..d9d62b0e28 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -67,6 +67,8 @@ private[spark] class MesosSchedulerBackend( // The listener bus to publish executor added/removed events. val listenerBus = sc.listenerBus + + private[mesos] val mesosExecutorCores = sc.conf.getDouble("spark.mesos.mesosExecutor.cores", 1) @volatile var appId: String = _ @@ -139,7 +141,7 @@ private[spark] class MesosSchedulerBackend( .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Value.Scalar.newBuilder() - .setValue(scheduler.CPUS_PER_TASK).build()) + .setValue(mesosExecutorCores).build()) .build() val memory = Resource.newBuilder() .setName("mem") @@ -220,10 +222,9 @@ private[spark] class MesosSchedulerBackend( val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue - // TODO(pwendell): Should below be 1 + scheduler.CPUS_PER_TASK? (mem >= MemoryUtils.calculateTotalMemory(sc) && // need at least 1 for executor, 1 for task - cpus >= 2 * scheduler.CPUS_PER_TASK) || + cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)) || (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) } @@ -232,10 +233,9 @@ private[spark] class MesosSchedulerBackend( val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { getResource(o.getResourcesList, "cpus").toInt } else { - // If the executor doesn't exist yet, subtract CPU for executor - // TODO(pwendell): Should below just subtract "1"? - getResource(o.getResourcesList, "cpus").toInt - - scheduler.CPUS_PER_TASK + // If the Mesos executor has not been started on this slave yet, set aside a few + // cores for the Mesos executor by offering fewer cores to the Spark executor + (getResource(o.getResourcesList, "cpus") - mesosExecutorCores).toInt } new WorkerOffer( o.getSlaveId.getValue, diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index a311512e82..cdd7be0fbe 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -118,12 +118,12 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(0).getSlaveId.getValue, mesosOffers.get(0).getHostname, - 2 + (minCpu - backend.mesosExecutorCores).toInt )) expectedWorkerOffers.append(new WorkerOffer( mesosOffers.get(2).getSlaveId.getValue, mesosOffers.get(2).getHostname, - 2 + (minCpu - backend.mesosExecutorCores).toInt )) val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) -- cgit v1.2.3