aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorMichael Gummelt <mgummelt@mesosphere.io>2016-07-21 18:29:00 +0100
committerSean Owen <sowen@cloudera.com>2016-07-21 18:29:00 +0100
commit235cb256d06653bcde4c3ed6b081503a94996321 (patch)
tree3c1f51a312cc92e120aad31607ddbb438aff5d1f /core/src/test/scala/org/apache
parent69626adddc0441a4834b70a32e2d95b11d69a219 (diff)
downloadspark-235cb256d06653bcde4c3ed6b081503a94996321.tar.gz
spark-235cb256d06653bcde4c3ed6b081503a94996321.tar.bz2
spark-235cb256d06653bcde4c3ed6b081503a94996321.zip
[SPARK-16194] Mesos Driver env vars
## What changes were proposed in this pull request? Added new configuration namespace: spark.mesos.env.* This allows a user submitting a job in cluster mode to set arbitrary environment variables on the driver. spark.mesos.driverEnv.KEY=VAL will result in the env var "KEY" being set to "VAL" I've also refactored the tests a bit so we can re-use code in MesosClusterScheduler. And I've refactored the command building logic in `buildDriverCommand`. Command builder values were very intertwined before, and now it's easier to determine exactly how each variable is set. ## How was this patch tested? unit tests Author: Michael Gummelt <mgummelt@mesosphere.io> Closes #14167 from mgummelt/driver-env-vars.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala47
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala93
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala71
3 files changed, 138 insertions, 73 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index a32423dc4f..0260759027 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -31,17 +31,24 @@ import org.scalatest.mock.MockitoSugar
import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
-
+import org.apache.spark.scheduler.cluster.mesos.Utils
class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
+ private var driver: SchedulerDriver = _
private var scheduler: MesosClusterScheduler = _
- override def beforeEach(): Unit = {
+ private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
+
+ if (sparkConfVars != null) {
+ conf.setAll(sparkConfVars)
+ }
+
+ driver = mock[SchedulerDriver]
scheduler = new MesosClusterScheduler(
new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
override def start(): Unit = { ready = true }
@@ -50,9 +57,11 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can queue drivers") {
+ setScheduler()
+
val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ new MesosDriverDescription("d1", "jar", 1000, 1, true,
+ command, Map[String, String](), "s1", new Date()))
assert(response.success)
val response2 =
scheduler.submitDriver(new MesosDriverDescription(
@@ -65,6 +74,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can kill queued drivers") {
+ setScheduler()
+
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1000, 1, true,
command, Map[String, String](), "s1", new Date()))
@@ -76,6 +87,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can handle multiple roles") {
+ setScheduler()
+
val driver = mock[SchedulerDriver]
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
@@ -138,6 +151,8 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("escapes commandline args for the shell") {
+ setScheduler()
+
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
@@ -172,4 +187,28 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
})
}
+
+ test("supports spark.mesos.driverEnv.*") {
+ setScheduler()
+
+ val mem = 1000
+ val cpu = 1
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", mem, cpu, true,
+ command,
+ Map("spark.mesos.executor.home" -> "test",
+ "spark.app.name" -> "test",
+ "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
+ "s1",
+ new Date()))
+ assert(response.success)
+
+ val offer = Utils.createOffer("o1", "s1", mem, cpu)
+ scheduler.resourceOffers(driver, List(offer).asJava)
+ val tasks = Utils.verifyTaskLaunched(driver, "o1")
+ val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
+ (v.getName, v.getValue)).toMap
+ assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 7f21d4c623..c2779d7b35 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -24,8 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.mock.MockitoSugar
@@ -35,6 +34,7 @@ import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkCon
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.TaskSchedulerImpl
+import org.apache.spark.scheduler.cluster.mesos.Utils._
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
@@ -59,7 +59,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// launches a task on a valid offer
offerResources(offers)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
// kills executors
backend.doRequestTotalExecutors(0)
@@ -74,7 +74,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// Launches a new task when requested executors is positive
backend.doRequestTotalExecutors(2)
offerResources(offers, 2)
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos supports killing and relaunching tasks with executors") {
@@ -86,7 +86,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = (minMem, minCpu)
val offer2 = (minMem, 1)
offerResources(List(offer1, offer2))
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
// accounts for a killed task
val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
@@ -95,7 +95,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
// Launches a new task on a valid offer from the same slave
offerResources(List(offer2))
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos supports spark.executor.cores") {
@@ -106,10 +106,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offers = List((executorMemory * 2, executorCores + 1))
offerResources(offers)
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == executorCores)
}
@@ -120,10 +120,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offerCores = 10
offerResources(List((executorMemory * 2, offerCores)))
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == offerCores)
}
@@ -134,10 +134,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val executorMemory = backend.executorMemory(sc)
offerResources(List((executorMemory, maxCores + 1)))
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 1)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 1)
- val cpus = backend.getResource(taskInfos.iterator().next().getResourcesList, "cpus")
+ val cpus = backend.getResource(taskInfos(0).getResourcesList, "cpus")
assert(cpus == maxCores)
}
@@ -156,7 +156,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
(executorMemory, maxCores + 1),
(executorMemory, maxCores + 1)))
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
verifyDeclinedOffer(driver, createOfferId("o2"), true)
}
@@ -171,8 +171,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
(executorMemory * 2, executorCores * 2),
(executorMemory * 2, executorCores * 2)))
- verifyTaskLaunched("o1")
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o1")
+ verifyTaskLaunched(driver, "o2")
}
test("mesos creates multiple executors on a single slave") {
@@ -184,8 +184,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
offerResources(List((executorMemory * 2, executorCores * 2)))
// verify two executors were started on a single offer
- val taskInfos = verifyTaskLaunched("o1")
- assert(taskInfos.size() == 2)
+ val taskInfos = verifyTaskLaunched(driver, "o1")
+ assert(taskInfos.length == 2)
}
test("mesos doesn't register twice with the same shuffle service") {
@@ -194,11 +194,11 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
val offer2 = createOffer("o2", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer2).asJava)
- verifyTaskLaunched("o2")
+ verifyTaskLaunched(driver, "o2")
val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
backend.statusUpdate(driver, status1)
@@ -216,7 +216,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
val offer1 = createOffer("o1", "s1", mem, cpu)
backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched("o1")
+ verifyTaskLaunched(driver, "o1")
backend.doKillExecutors(List("0"))
verify(driver, times(1)).killTask(createTaskId("0"))
@@ -269,14 +269,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.resourceOffers(driver, mesosOffers.asJava)
}
- private def verifyTaskLaunched(offerId: String): java.util.Collection[TaskInfo] = {
- val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(createOfferId(offerId))),
- captor.capture())
- captor.getValue
- }
-
private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
TaskStatus.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId).build())
@@ -285,41 +277,6 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.build
}
-
- private def createOfferId(offerId: String): OfferID = {
- OfferID.newBuilder().setValue(offerId).build()
- }
-
- private def createSlaveId(slaveId: String): SlaveID = {
- SlaveID.newBuilder().setValue(slaveId).build()
- }
-
- private def createExecutorId(executorId: String): ExecutorID = {
- ExecutorID.newBuilder().setValue(executorId).build()
- }
-
- private def createTaskId(taskId: String): TaskID = {
- TaskID.newBuilder().setValue(taskId).build()
- }
-
- private def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(createOfferId(offerId))
- .setFrameworkId(FrameworkID.newBuilder()
- .setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
- .setHostname(s"host${slaveId}")
- .build()
- }
-
private def createSchedulerBackend(
taskScheduler: TaskSchedulerImpl,
driver: SchedulerDriver,
@@ -364,9 +321,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.set("spark.mesos.driver.webui.url", "http://webui")
if (sparkConfVars != null) {
- for (attr <- sparkConfVars) {
- sparkConf.set(attr._1, attr._2)
- }
+ sparkConf.setAll(sparkConfVars)
}
sc = new SparkContext(sparkConf)
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
new file mode 100644
index 0000000000..ff26d14ef5
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import java.util.Collections
+
+import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.SchedulerDriver
+import org.mockito.{ArgumentCaptor, Matchers}
+import org.mockito.Mockito._
+import scala.collection.JavaConverters._
+
+object Utils {
+ def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+ val builder = Offer.newBuilder()
+ builder.addResourcesBuilder()
+ .setName("mem")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(mem))
+ builder.addResourcesBuilder()
+ .setName("cpus")
+ .setType(Value.Type.SCALAR)
+ .setScalar(Scalar.newBuilder().setValue(cpu))
+ builder.setId(createOfferId(offerId))
+ .setFrameworkId(FrameworkID.newBuilder()
+ .setValue("f1"))
+ .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
+ .setHostname(s"host${slaveId}")
+ .build()
+ }
+
+ def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
+ val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
+ verify(driver, times(1)).launchTasks(
+ Matchers.eq(Collections.singleton(createOfferId(offerId))),
+ captor.capture())
+ captor.getValue.asScala.toList
+ }
+
+ def createOfferId(offerId: String): OfferID = {
+ OfferID.newBuilder().setValue(offerId).build()
+ }
+
+ def createSlaveId(slaveId: String): SlaveID = {
+ SlaveID.newBuilder().setValue(slaveId).build()
+ }
+
+ def createExecutorId(executorId: String): ExecutorID = {
+ ExecutorID.newBuilder().setValue(executorId).build()
+ }
+
+ def createTaskId(taskId: String): TaskID = {
+ TaskID.newBuilder().setValue(taskId).build()
+ }
+}