diff options
author | Kalvin Chau <kalvin.chau@viasat.com> | 2017-04-06 09:14:31 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2017-04-06 09:14:31 +0100 |
commit | c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1 (patch) | |
tree | 155b0488f6716f6490710c6b4130fdddc0d8bf97 /resource-managers | |
parent | e156b5dd39dc1992077fe06e0f8be810c49c8255 (diff) | |
download | spark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.tar.gz spark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.tar.bz2 spark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.zip |
[SPARK-20085][MESOS] Configurable mesos labels for executors
## What changes were proposed in this pull request?
Add spark.mesos.task.labels configuration option to add mesos key:value labels to the executor.
"k1:v1,k2:v2" as the format, colons separating key-value and commas to list out more than one.
Discussion of labels with mgummelt at #17404
## How was this patch tested?
Added unit tests to verify labels were added correctly, with incorrect labels being ignored and added a test to test the name of the executor.
Tested with: `./build/sbt -Pmesos mesos/test`
Please review http://spark.apache.org/contributing.html before opening a pull request.
Author: Kalvin Chau <kalvin.chau@viasat.com>
Closes #17413 from kalvinnchau/mesos-labels.
Diffstat (limited to 'resource-managers')
2 files changed, 70 insertions, 0 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 5bdc2a2b84..2a36ec4fa8 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0) + private val taskLabels = conf.get("spark.mesos.task.labels", "") + private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s") .ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0") @@ -408,6 +410,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( taskBuilder.addAllResources(resourcesToUse.asJava) taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf)) + val labelsBuilder = taskBuilder.getLabelsBuilder + val labels = buildMesosLabels().asJava + + labelsBuilder.addAllLabels(labels) + + taskBuilder.setLabels(labelsBuilder) + tasks(offer.getId) ::= taskBuilder.build() remainingResources(offerId) = resourcesLeft.asJava totalCoresAcquired += taskCPUs @@ -422,6 +431,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( tasks.toMap } + private def buildMesosLabels(): List[Label] = { + taskLabels.split(",").flatMap(label => + label.split(":") match { + case Array(key, value) => + Some(Label.newBuilder() + .setKey(key) + .setValue(value) + .build()) + case _ => + logWarning(s"Unable to parse $label into a key:value label for the task.") + None + } + ).toList + } + /** Extracts task needed resources from a list of available resources. */ private def partitionTaskResources( resources: JList[Resource], diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index eb83926ae4..c040f05d93 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0") } + test("mesos sets configurable labels on tasks") { + val taskLabelsString = "mesos:test,label:test" + setBackend(Map( + "spark.mesos.task.labels" -> taskLabelsString + )) + + // Build up the labels + val taskLabels = Protos.Labels.newBuilder() + .addLabels(Protos.Label.newBuilder() + .setKey("mesos").setValue("test").build()) + .addLabels(Protos.Label.newBuilder() + .setKey("label").setValue("test").build()) + .build() + + val offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + + val labels = launchedTasks.head.getLabels + + assert(launchedTasks.head.getLabels.equals(taskLabels)) + } + + test("mesos ignored invalid labels and sets configurable labels on tasks") { + val taskLabelsString = "mesos:test,label:test,incorrect:label:here" + setBackend(Map( + "spark.mesos.task.labels" -> taskLabelsString + )) + + // Build up the labels + val taskLabels = Protos.Labels.newBuilder() + .addLabels(Protos.Label.newBuilder() + .setKey("mesos").setValue("test").build()) + .addLabels(Protos.Label.newBuilder() + .setKey("label").setValue("test").build()) + .build() + + val offers = List(Resources(backend.executorMemory(sc), 1)) + offerResources(offers) + val launchedTasks = verifyTaskLaunched(driver, "o1") + + val labels = launchedTasks.head.getLabels + + assert(launchedTasks.head.getLabels.equals(taskLabels)) + } + test("mesos supports spark.mesos.network.name") { setBackend(Map( "spark.mesos.network.name" -> "test-network-name" |