aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorIulian Dragos <jaguarul@gmail.com>2015-09-10 12:00:21 -0700
committerAndrew Or <andrew@databricks.com>2015-09-10 12:00:21 -0700
commitf0562e8cdbab7ce40f3186da98595312252f8b5c (patch)
tree4fe13052a9b7deec4cf66a98a0da67d79bca5c9a /core
parentaf3bc59d1f5d9d952c2d7ad1af599c49f1dbdaf0 (diff)
downloadspark-f0562e8cdbab7ce40f3186da98595312252f8b5c.tar.gz
spark-f0562e8cdbab7ce40f3186da98595312252f8b5c.tar.bz2
spark-f0562e8cdbab7ce40f3186da98595312252f8b5c.zip
[SPARK-6350] [MESOS] Fine-grained mode scheduler respects mesosExecutor.cores
This is a regression introduced in #4960, this commit fixes it and adds a test. tnachen andrewor14 please review, this should be an easy one. Author: Iulian Dragos <jaguarul@gmail.com> Closes #8653 from dragos/issue/mesos/fine-grained-maxExecutorCores.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala33
2 files changed, 33 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 18da6d2491..8edf7007a5 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -32,7 +32,6 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils
-
/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -127,7 +126,7 @@ private[spark] class MesosSchedulerBackend(
}
val builder = MesosExecutorInfo.newBuilder()
val (resourcesAfterCpu, usedCpuResources) =
- partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
+ partitionResources(availableResources, "cpus", mesosExecutorCores)
val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
index 319b3173e7..c4dc560031 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala
@@ -42,6 +42,38 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui
class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
+ test("Use configured mesosExecutor.cores for ExecutorInfo") {
+ val mesosExecutorCores = 3
+ val conf = new SparkConf
+ conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
+
+ val listenerBus = mock[LiveListenerBus]
+ listenerBus.post(
+ SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+ val sc = mock[SparkContext]
+ when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
+
+ when(sc.conf).thenReturn(conf)
+ when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+ when(sc.executorMemory).thenReturn(100)
+ when(sc.listenerBus).thenReturn(listenerBus)
+ val taskScheduler = mock[TaskSchedulerImpl]
+ when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
+
+ val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+ val resources = Arrays.asList(
+ mesosSchedulerBackend.createResource("cpus", 4),
+ mesosSchedulerBackend.createResource("mem", 1024))
+ // uri is null.
+ val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
+ val executorResources = executorInfo.getResourcesList
+ val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
+
+ assert(cpus === mesosExecutorCores)
+ }
+
test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")
@@ -263,7 +295,6 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi
.setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
.setHostname(s"host${id.toString}").build()
-
val mesosOffers = new java.util.ArrayList[Offer]
mesosOffers.add(offer)