From 235cb256d06653bcde4c3ed6b081503a94996321 Mon Sep 17 00:00:00 2001
From: Michael Gummelt
Date: Thu, 21 Jul 2016 18:29:00 +0100
Subject: [SPARK-16194] Mesos Driver env vars

## What changes were proposed in this pull request?

Added a new configuration namespace: spark.mesos.driverEnv.*

This allows a user submitting a job in cluster mode to set arbitrary environment variables on the driver. spark.mesos.driverEnv.KEY=VAL will result in the env var "KEY" being set to "VAL" in the driver's environment.
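For example, a minimal submission-side sketch (the key and value below are placeholders; in practice the property is typically passed at submit time, e.g. via `spark-submit --conf`):

```scala
import org.apache.spark.SparkConf

// Placeholder property: any key under the spark.mesos.driverEnv.* namespace works.
val conf = new SparkConf()
  .setMaster("mesos://localhost:5050") // assumed Mesos master URL, as in the tests below
  .set("spark.mesos.driverEnv.TEST_ENV", "TEST_VAL")

// The scheduler strips the prefix, so the launched driver sees TEST_ENV=TEST_VAL.
assert(conf.get("spark.mesos.driverEnv.TEST_ENV") == "TEST_VAL")
```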
I've also refactored the tests a bit so we can re-use helper code across the Mesos scheduler suites (see the new `Utils` object below).

And I've refactored the command building logic in `buildDriverCommand`. Command builder values were very intertwined before; now it's easier to determine exactly how each variable is set.
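As a rough sketch of that mechanism (illustrative only: the real `buildDriverCommand` lives in `MesosClusterScheduler`, which is outside this test-only excerpt, and the helper name here is hypothetical):

```scala
import org.apache.mesos.Protos.{CommandInfo, Environment}

// Copy each spark.mesos.driverEnv.* conf entry into the driver's CommandInfo
// environment, dropping the prefix so the driver sees the bare variable name.
def addDriverEnvVars(conf: Map[String, String], command: CommandInfo.Builder): Unit = {
  val prefix = "spark.mesos.driverEnv."
  val env = Environment.newBuilder()
  conf.foreach { case (key, value) =>
    if (key.startsWith(prefix)) {
      env.addVariables(
        Environment.Variable.newBuilder()
          .setName(key.stripPrefix(prefix))
          .setValue(value))
    }
  }
  command.setEnvironment(env)
}
```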
## How was this patch tested?

unit tests

Author: Michael Gummelt

Closes #14167 from mgummelt/driver-env-vars.
---
 .../cluster/mesos/MesosClusterSchedulerSuite.scala | 47 ++++++++++-
 .../MesosCoarseGrainedSchedulerBackendSuite.scala  | 93 ++++++----------------
 .../spark/scheduler/cluster/mesos/Utils.scala      | 71 +++++++++++++++++
 3 files changed, 138 insertions(+), 73 deletions(-)
 create mode 100644 core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala

diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index a32423dc4f..0260759027 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -31,17 +31,24 @@ import org.scalatest.mock.MockitoSugar
 import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.Command
 import org.apache.spark.deploy.mesos.MesosDriverDescription
-
+import org.apache.spark.scheduler.cluster.mesos.Utils
 
 class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
 
   private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+  private var driver: SchedulerDriver = _
   private var scheduler: MesosClusterScheduler = _
 
-  override def beforeEach(): Unit = {
+  private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
     conf.setAppName("spark mesos")
+
+    if (sparkConfVars != null) {
+      conf.setAll(sparkConfVars)
+    }
+
+    driver = mock[SchedulerDriver]
     scheduler = new MesosClusterScheduler(
       new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
       override def start(): Unit = { ready = true }
@@ -50,9 +57,11 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can queue drivers") {
+    setScheduler()
+
     val response = scheduler.submitDriver(
-      new MesosDriverDescription("d1", "jar", 1000, 1, true,
-        command, Map[String, String](), "s1", new Date()))
+        new MesosDriverDescription("d1", "jar", 1000, 1, true,
+          command, Map[String, String](), "s1", new Date()))
     assert(response.success)
     val response2 =
       scheduler.submitDriver(new MesosDriverDescription(
@@ -65,6 +74,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can kill queued drivers") {
+    setScheduler()
+
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 1000, 1, true,
         command, Map[String, String](), "s1", new Date()))
@@ -76,6 +87,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("can handle multiple roles") {
+    setScheduler()
+
     val driver = mock[SchedulerDriver]
     val response = scheduler.submitDriver(
       new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
@@ -138,6 +151,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
   }
 
   test("escapes commandline args for the shell") {
+    setScheduler()
+
     val conf = new SparkConf()
     conf.setMaster("mesos://localhost:5050")
     conf.setAppName("spark mesos")
@@ -172,4 +187,28 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
       assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
     })
   }
+
+  test("supports spark.mesos.driverEnv.*") {
+    setScheduler()
+
+    val mem = 1000
+    val cpu = 1
+
+    val response = scheduler.submitDriver(
+      new MesosDriverDescription("d1", "jar", mem, cpu, true,
+        command,
+        Map("spark.mesos.executor.home" -> "test",
+          "spark.app.name" -> "test",
+          "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
+        "s1",
+        new Date()))
+    assert(response.success)
+
+    val offer = Utils.createOffer("o1", "s1", mem, cpu)
+    scheduler.resourceOffers(driver, List(offer).asJava)
+    val tasks = Utils.verifyTaskLaunched(driver, "o1")
+    val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
+      (v.getName, v.getValue)).toMap
+    assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7f21d4c623..c2779d7b35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
 import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers
 import org.mockito.Matchers._
 import org.mockito.Mockito._
 import org.scalatest.mock.MockitoSugar
@@ -35,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
 import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
 import org.apache.spark.rpc.RpcEndpointRef
 import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
 
 class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     with LocalSparkContext
@@ -59,7 +59,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // launches a task on a valid offer
     offerResources(offers)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     // kills executors
     backend.doRequestTotalExecutors(0)
@@ -74,7 +74,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     // Launches a new task when requested executors is positive
     backend.doRequestTotalExecutors(2)
     offerResources(offers, 2)
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos supports killing and relaunching tasks with executors") {
@@ -86,7 +86,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offer1 = (minMem, minCpu)
     val offer2 = (minMem, 1)
     offerResources(List(offer1, offer2))
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     // accounts for a killed task
     val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
@@ -95,7 +95,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     // Launches a new task on a valid offer from the same slave
     offerResources(List(offer2))
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos supports spark.executor.cores") {
@@ -106,10 +106,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
 
     val offers = List((executorMemory * 2, executorCores + 1))
     offerResources(offers)
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == executorCores)
   }
 
@@ -120,10 +120,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offerCores = 10
     offerResources(List((executorMemory * 2, offerCores)))
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == offerCores)
   }
 
@@ -134,10 +134,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val executorMemory = backend.executorMemory(sc)
     offerResources(List((executorMemory, maxCores + 1)))
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 1)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 1)
 
-    val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+    val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
     assert(cpus == maxCores)
   }
 
@@ -156,7 +156,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       (executorMemory, maxCores + 1),
       (executorMemory, maxCores + 1)))
 
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
     verifyDeclinedOffer(driver, createOfferId("o2"), true)
   }
 
@@ -171,8 +171,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       (executorMemory * 2, executorCores * 2),
       (executorMemory * 2, executorCores * 2)))
 
-    verifyTaskLaunched("o1")
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o1")
+    verifyTaskLaunched(driver, "o2")
   }
 
   test("mesos creates multiple executors on a single slave") {
@@ -184,8 +184,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     offerResources(List((executorMemory * 2, executorCores * 2)))
 
     // verify two executors were started on a single offer
-    val taskInfos = verifyTaskLaunched("o1")
-    assert(taskInfos.size() == 2)
+    val taskInfos = verifyTaskLaunched(driver, "o1")
+    assert(taskInfos.length == 2)
   }
 
   test("mesos doesn't register twice with the same shuffle service") {
@@ -194,11 +194,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offer1 = createOffer("o1", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     val offer2 = createOffer("o2", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer2).asJava)
-    verifyTaskLaunched("o2")
+    verifyTaskLaunched(driver, "o2")
 
     val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
     backend.statusUpdate(driver, status1)
@@ -216,7 +216,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     val offer1 = createOffer("o1", "s1", mem, cpu)
     backend.resourceOffers(driver, List(offer1).asJava)
-    verifyTaskLaunched("o1")
+    verifyTaskLaunched(driver, "o1")
 
     backend.doKillExecutors(List("0"))
     verify(driver, times(1)).killTask(createTaskId("0"))
@@ -269,14 +269,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
     backend.resourceOffers(driver, mesosOffers.asJava)
   }
 
-  private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
-    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
-    verify(driver, times(1)).launchTasks(
-      Matchers.eq(Collections.singleton(createOfferId(offerId))),
-      captor.capture())
-    captor.getValue
-  }
-
   private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
     TaskStatus.newBuilder()
       .setTaskId(TaskID.newBuilder().setValue(taskId).build())
@@ -285,41 +277,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       .build
   }
 
-  private def createOfferId(offerId: String): OfferID = {
-    OfferID.newBuilder().setValue(offerId).build()
-  }
-
-  private def createSlaveId(slaveId: String): SlaveID = {
-    SlaveID.newBuilder().setValue(slaveId).build()
-  }
-
-  private def createExecutorId(executorId: String): ExecutorID = {
-    ExecutorID.newBuilder().setValue(executorId).build()
-  }
-
-  private def createTaskId(taskId: String): TaskID = {
-    TaskID.newBuilder().setValue(taskId).build()
-  }
-
-  private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
-    val builder = Offer.newBuilder()
-    builder.addResourcesBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(mem))
-    builder.addResourcesBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Scalar.newBuilder().setValue(cpu))
-    builder.setId(createOfferId(offerId))
-      .setFrameworkId(FrameworkID.newBuilder()
-        .setValue("f1"))
-      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
-      .setHostname(s"host${slaveId}")
-      .build()
-  }
-
   private def createSchedulerBackend(
       taskScheduler: TaskSchedulerImpl,
       driver: SchedulerDriver,
@@ -364,9 +321,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
       .set("spark.mesos.driver.webui.url", "http://webui")
 
     if (sparkConfVars != null) {
-      for (attr <- sparkConfVars) {
-        sparkConf.set(attr._1, attr._2)
-      }
+      sparkConf.setAll(sparkConfVars)
     }
 
     sc = new SparkContext(sparkConf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000000..ff26d14ef5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import scala.collection.JavaConverters._
+
+object Utils {
+  def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+    val builder = Offer.newBuilder()
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(mem))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(cpu))
+    builder.setId(createOfferId(offerId))
+      .setFrameworkId(FrameworkID.newBuilder()
+        .setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+      .setHostname(s"host${slaveId}")
+      .build()
+  }
+
+  def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+    val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+    verify(driver, times(1)).launchTasks(
+      Matchers.eq(Collections.singleton(createOfferId(offerId))),
+      captor.capture())
+    captor.getValue.asScala.toList
+  }
+
+  def createOfferId(offerId: String): OfferID = {
+    OfferID.newBuilder().setValue(offerId).build()
+  }
+
+  def createSlaveId(slaveId: String): SlaveID = {
+    SlaveID.newBuilder().setValue(slaveId).build()
+  }
+
+  def createExecutorId(executorId: String): ExecutorID = {
+    ExecutorID.newBuilder().setValue(executorId).build()
+  }
+
+  def createTaskId(taskId: String): TaskID = {
+    TaskID.newBuilder().setValue(taskId).build()
+  }
+}
--
cgit v1.2.3