aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
authorKalvin Chau <kalvin.chau@viasat.com>2017-04-06 09:14:31 +0100
committerSean Owen <sowen@cloudera.com>2017-04-06 09:14:31 +0100
commitc8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1 (patch)
tree155b0488f6716f6490710c6b4130fdddc0d8bf97 /resource-managers
parente156b5dd39dc1992077fe06e0f8be810c49c8255 (diff)
downloadspark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.tar.gz
spark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.tar.bz2
spark-c8fc1f3badf61bcfc4bd8eeeb61f73078ca068d1.zip
[SPARK-20085][MESOS] Configurable mesos labels for executors
## What changes were proposed in this pull request? Add spark.mesos.task.labels configuration option to add mesos key:value labels to the executor. "k1:v1,k2:v2" as the format, colons separating key-value and commas to list out more than one. Discussion of labels with mgummelt at #17404 ## How was this patch tested? Added unit tests to verify labels were added correctly, with incorrect labels being ignored and added a test to test the name of the executor. Tested with: `./build/sbt -Pmesos mesos/test` Please review http://spark.apache.org/contributing.html before opening a pull request. Author: Kalvin Chau <kalvin.chau@viasat.com> Closes #17413 from kalvinnchau/mesos-labels.
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala24
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala46
2 files changed, 70 insertions, 0 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 5bdc2a2b84..2a36ec4fa8 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -67,6 +67,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val maxGpus = conf.getInt("spark.mesos.gpus.max", 0)
+ private val taskLabels = conf.get("spark.mesos.task.labels", "")
+
private[this] val shutdownTimeoutMS =
conf.getTimeAsMs("spark.mesos.coarse.shutdownTimeout", "10s")
.ensuring(_ >= 0, "spark.mesos.coarse.shutdownTimeout must be >= 0")
@@ -408,6 +410,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskBuilder.addAllResources(resourcesToUse.asJava)
taskBuilder.setContainer(MesosSchedulerBackendUtil.containerInfo(sc.conf))
+ val labelsBuilder = taskBuilder.getLabelsBuilder
+ val labels = buildMesosLabels().asJava
+
+ labelsBuilder.addAllLabels(labels)
+
+ taskBuilder.setLabels(labelsBuilder)
+
tasks(offer.getId) ::= taskBuilder.build()
remainingResources(offerId) = resourcesLeft.asJava
totalCoresAcquired += taskCPUs
@@ -422,6 +431,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}
+ private def buildMesosLabels(): List[Label] = {
+ taskLabels.split(",").flatMap(label =>
+ label.split(":") match {
+ case Array(key, value) =>
+ Some(Label.newBuilder()
+ .setKey(key)
+ .setValue(value)
+ .build())
+ case _ =>
+ logWarning(s"Unable to parse $label into a key:value label for the task.")
+ None
+ }
+ ).toList
+ }
+
/** Extracts task needed resources from a list of available resources. */
private def partitionTaskResources(
resources: JList[Resource],
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index eb83926ae4..c040f05d93 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -475,6 +475,52 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(launchedTasks.head.getName == "test-mesos-dynamic-alloc 0")
}
+ test("mesos sets configurable labels on tasks") {
+ val taskLabelsString = "mesos:test,label:test"
+ setBackend(Map(
+ "spark.mesos.task.labels" -> taskLabelsString
+ ))
+
+ // Build up the labels
+ val taskLabels = Protos.Labels.newBuilder()
+ .addLabels(Protos.Label.newBuilder()
+ .setKey("mesos").setValue("test").build())
+ .addLabels(Protos.Label.newBuilder()
+ .setKey("label").setValue("test").build())
+ .build()
+
+ val offers = List(Resources(backend.executorMemory(sc), 1))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+
+ val labels = launchedTasks.head.getLabels
+
+ assert(launchedTasks.head.getLabels.equals(taskLabels))
+ }
+
+ test("mesos ignored invalid labels and sets configurable labels on tasks") {
+ val taskLabelsString = "mesos:test,label:test,incorrect:label:here"
+ setBackend(Map(
+ "spark.mesos.task.labels" -> taskLabelsString
+ ))
+
+ // Build up the labels
+ val taskLabels = Protos.Labels.newBuilder()
+ .addLabels(Protos.Label.newBuilder()
+ .setKey("mesos").setValue("test").build())
+ .addLabels(Protos.Label.newBuilder()
+ .setKey("label").setValue("test").build())
+ .build()
+
+ val offers = List(Resources(backend.executorMemory(sc), 1))
+ offerResources(offers)
+ val launchedTasks = verifyTaskLaunched(driver, "o1")
+
+ val labels = launchedTasks.head.getLabels
+
+ assert(launchedTasks.head.getLabels.equals(taskLabels))
+ }
+
test("mesos supports spark.mesos.network.name") {
setBackend(Map(
"spark.mesos.network.name" -> "test-network-name"