aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala/org/apache
diff options
context:
space:
mode:
authorMichael Gummelt <mgummelt@mesosphere.io>2016-08-26 12:25:22 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-26 12:25:22 -0700
commit8e5475be3c9a620f18f6712631b093464a7d0ee7 (patch)
tree417e25ea8798c0f9313285623a664fe7ac4fc003 /core/src/test/scala/org/apache
parentc0949dc944b7e2fc8a4465acc68a8f2713b3fa13 (diff)
downloadspark-8e5475be3c9a620f18f6712631b093464a7d0ee7.tar.gz
spark-8e5475be3c9a620f18f6712631b093464a7d0ee7.tar.bz2
spark-8e5475be3c9a620f18f6712631b093464a7d0ee7.zip
[SPARK-16967] move mesos to module
## What changes were proposed in this pull request? Move Mesos code into a mvn module ## How was this patch tested? unit tests manually submitting a client mode and cluster mode job spark/mesos integration test suite Author: Michael Gummelt <mgummelt@mesosphere.io> Closes #14637 from mgummelt/mesos-module.
Diffstat (limited to 'core/src/test/scala/org/apache')
-rw-r--r--core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala28
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala213
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala517
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala385
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala255
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala36
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala85
7 files changed, 0 insertions, 1519 deletions
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
index 7d75a93ff6..f8938dfede 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSchedulerCreationSuite.scala
@@ -22,7 +22,6 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.StandaloneSchedulerBackend
-import org.apache.spark.scheduler.cluster.mesos.{MesosCoarseGrainedSchedulerBackend, MesosFineGrainedSchedulerBackend}
import org.apache.spark.scheduler.local.LocalSchedulerBackend
@@ -130,31 +129,4 @@ class SparkContextSchedulerCreationSuite
case _ => fail()
}
}
-
- def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
- val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
- try {
- val sched = createTaskScheduler(master, "client", conf)
- assert(sched.backend.getClass === expectedClass)
- } catch {
- case e: UnsatisfiedLinkError =>
- assert(e.getMessage.contains("mesos"))
- logWarning("Mesos not available, could not test actual Mesos scheduler creation")
- case e: Throwable => fail(e)
- }
- }
-
- test("mesos fine-grained") {
- testMesos("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false)
- }
-
- test("mesos coarse-grained") {
- testMesos("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true)
- }
-
- test("mesos with zookeeper") {
- testMesos("mesos://zk://localhost:1234,localhost:2345",
- classOf[MesosFineGrainedSchedulerBackend], coarse = false)
- }
-
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
deleted file mode 100644
index 87d9080de5..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.{Collection, Collections, Date}
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Scalar, Type}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite}
-import org.apache.spark.deploy.Command
-import org.apache.spark.deploy.mesos.MesosDriverDescription
-
-class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
- private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq())
- private var driver: SchedulerDriver = _
- private var scheduler: MesosClusterScheduler = _
-
- private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = {
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
-
- if (sparkConfVars != null) {
- conf.setAll(sparkConfVars)
- }
-
- driver = mock[SchedulerDriver]
- scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- scheduler.start()
- }
-
- test("can queue drivers") {
- setScheduler()
-
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val response2 =
- scheduler.submitDriver(new MesosDriverDescription(
- "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
- assert(response2.success)
- val state = scheduler.getSchedulerState()
- val queuedDrivers = state.queuedDrivers.toList
- assert(queuedDrivers(0).submissionId == response.submissionId)
- assert(queuedDrivers(1).submissionId == response2.submissionId)
- }
-
- test("can kill queued drivers") {
- setScheduler()
-
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
- assert(response.success)
- val killResponse = scheduler.killDriver(response.submissionId)
- assert(killResponse.success)
- val state = scheduler.getSchedulerState()
- assert(state.queuedDrivers.isEmpty)
- }
-
- test("can handle multiple roles") {
- setScheduler()
-
- val driver = mock[SchedulerDriver]
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1200, 1.5, true,
- command,
- Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")),
- "s1",
- new Date()))
- assert(response.success)
- val offer = Offer.newBuilder()
- .addResources(
- Resource.newBuilder().setRole("*")
- .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("*")
- .setScalar(Scalar.newBuilder().setValue(1000).build())
- .setName("mem")
- .setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("role2")
- .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
- .addResources(
- Resource.newBuilder().setRole("role2")
- .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR))
- .setId(OfferID.newBuilder().setValue("o1").build())
- .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
- .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
- .setHostname("host1")
- .build()
-
- val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
-
- when(
- driver.launchTasks(
- Matchers.eq(Collections.singleton(offer.getId)),
- capture.capture())
- ).thenReturn(Status.valueOf(1))
-
- scheduler.resourceOffers(driver, Collections.singletonList(offer))
-
- val taskInfos = capture.getValue
- assert(taskInfos.size() == 1)
- val taskInfo = taskInfos.iterator().next()
- val resources = taskInfo.getResourcesList
- assert(scheduler.getResource(resources, "cpus") == 1.5)
- assert(scheduler.getResource(resources, "mem") == 1200)
- val resourcesSeq: Seq[Resource] = resources.asScala
- val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList
- assert(cpus.size == 2)
- assert(cpus.exists(_.getRole().equals("role2")))
- assert(cpus.exists(_.getRole().equals("*")))
- val mem = resourcesSeq.filter(_.getName.equals("mem")).toList
- assert(mem.size == 2)
- assert(mem.exists(_.getRole().equals("role2")))
- assert(mem.exists(_.getRole().equals("*")))
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(offer.getId)),
- capture.capture()
- )
- }
-
- test("escapes commandline args for the shell") {
- setScheduler()
-
- val conf = new SparkConf()
- conf.setMaster("mesos://localhost:5050")
- conf.setAppName("spark mesos")
- val scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = { ready = true }
- }
- val escape = scheduler.shellEscape _
- def wrapped(str: String): String = "\"" + str + "\""
-
- // Wrapped in quotes
- assert(escape("'should be left untouched'") === "'should be left untouched'")
- assert(escape("\"should be left untouched\"") === "\"should be left untouched\"")
-
- // Harmless
- assert(escape("") === "")
- assert(escape("harmless") === "harmless")
- assert(escape("har-m.l3ss") === "har-m.l3ss")
-
- // Special Chars escape
- assert(escape("should escape this \" quote") === wrapped("should escape this \\\" quote"))
- assert(escape("shouldescape\"quote") === wrapped("shouldescape\\\"quote"))
- assert(escape("should escape this $ dollar") === wrapped("should escape this \\$ dollar"))
- assert(escape("should escape this ` backtick") === wrapped("should escape this \\` backtick"))
- assert(escape("""should escape this \ backslash""")
- === wrapped("""should escape this \\ backslash"""))
- assert(escape("""\"?""") === wrapped("""\\\"?"""))
-
-
- // Special Chars no escape only wrap
- List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", ")").foreach(char => {
- assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this"))
- })
- }
-
- test("supports spark.mesos.driverEnv.*") {
- setScheduler()
-
- val mem = 1000
- val cpu = 1
-
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", mem, cpu, true,
- command,
- Map("spark.mesos.executor.home" -> "test",
- "spark.app.name" -> "test",
- "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"),
- "s1",
- new Date()))
- assert(response.success)
-
- val offer = Utils.createOffer("o1", "s1", mem, cpu)
- scheduler.resourceOffers(driver, List(offer).asJava)
- val tasks = Utils.verifyTaskLaunched(driver, "o1")
- val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v =>
- (v.getName, v.getValue)).toMap
- assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL")
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
deleted file mode 100644
index c06379707a..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ /dev/null
@@ -1,517 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
-import scala.reflect.ClassTag
-
-import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
-import org.mockito.Matchers
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-import org.scalatest.BeforeAndAfter
-
-import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
-import org.apache.spark.rpc.RpcEndpointRef
-import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
-import org.apache.spark.scheduler.TaskSchedulerImpl
-import org.apache.spark.scheduler.cluster.mesos.Utils._
-
-class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
- with LocalSparkContext
- with MockitoSugar
- with BeforeAndAfter {
-
- private var sparkConf: SparkConf = _
- private var driver: SchedulerDriver = _
- private var taskScheduler: TaskSchedulerImpl = _
- private var backend: MesosCoarseGrainedSchedulerBackend = _
- private var externalShuffleClient: MesosExternalShuffleClient = _
- private var driverEndpoint: RpcEndpointRef = _
- @volatile private var stopCalled = false
-
- test("mesos supports killing and limiting executors") {
- setBackend()
- sparkConf.set("spark.driver.host", "driverHost")
- sparkConf.set("spark.driver.port", "1234")
-
- val minMem = backend.executorMemory(sc)
- val minCpu = 4
- val offers = List((minMem, minCpu))
-
- // launches a task on a valid offer
- offerResources(offers)
- verifyTaskLaunched(driver, "o1")
-
- // kills executors
- backend.doRequestTotalExecutors(0)
- assert(backend.doKillExecutors(Seq("0")))
- val taskID0 = createTaskId("0")
- verify(driver, times(1)).killTask(taskID0)
-
- // doesn't launch a new task when requested executors == 0
- offerResources(offers, 2)
- verifyDeclinedOffer(driver, createOfferId("o2"))
-
- // Launches a new task when requested executors is positive
- backend.doRequestTotalExecutors(2)
- offerResources(offers, 2)
- verifyTaskLaunched(driver, "o2")
- }
-
- test("mesos supports killing and relaunching tasks with executors") {
- setBackend()
-
- // launches a task on a valid offer
- val minMem = backend.executorMemory(sc) + 1024
- val minCpu = 4
- val offer1 = (minMem, minCpu)
- val offer2 = (minMem, 1)
- offerResources(List(offer1, offer2))
- verifyTaskLaunched(driver, "o1")
-
- // accounts for a killed task
- val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED)
- backend.statusUpdate(driver, status)
- verify(driver, times(1)).reviveOffers()
-
- // Launches a new task on a valid offer from the same slave
- offerResources(List(offer2))
- verifyTaskLaunched(driver, "o2")
- }
-
- test("mesos supports spark.executor.cores") {
- val executorCores = 4
- setBackend(Map("spark.executor.cores" -> executorCores.toString))
-
- val executorMemory = backend.executorMemory(sc)
- val offers = List((executorMemory * 2, executorCores + 1))
- offerResources(offers)
-
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.length == 1)
-
- val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
- assert(cpus == executorCores)
- }
-
- test("mesos supports unset spark.executor.cores") {
- setBackend()
-
- val executorMemory = backend.executorMemory(sc)
- val offerCores = 10
- offerResources(List((executorMemory * 2, offerCores)))
-
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.length == 1)
-
- val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
- assert(cpus == offerCores)
- }
-
- test("mesos does not acquire more than spark.cores.max") {
- val maxCores = 10
- setBackend(Map("spark.cores.max" -> maxCores.toString))
-
- val executorMemory = backend.executorMemory(sc)
- offerResources(List((executorMemory, maxCores + 1)))
-
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.length == 1)
-
- val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus")
- assert(cpus == maxCores)
- }
-
- test("mesos declines offers that violate attribute constraints") {
- setBackend(Map("spark.mesos.constraints" -> "x:true"))
- offerResources(List((backend.executorMemory(sc), 4)))
- verifyDeclinedOffer(driver, createOfferId("o1"), true)
- }
-
- test("mesos declines offers with a filter when reached spark.cores.max") {
- val maxCores = 3
- setBackend(Map("spark.cores.max" -> maxCores.toString))
-
- val executorMemory = backend.executorMemory(sc)
- offerResources(List(
- (executorMemory, maxCores + 1),
- (executorMemory, maxCores + 1)))
-
- verifyTaskLaunched(driver, "o1")
- verifyDeclinedOffer(driver, createOfferId("o2"), true)
- }
-
- test("mesos assigns tasks round-robin on offers") {
- val executorCores = 4
- val maxCores = executorCores * 2
- setBackend(Map("spark.executor.cores" -> executorCores.toString,
- "spark.cores.max" -> maxCores.toString))
-
- val executorMemory = backend.executorMemory(sc)
- offerResources(List(
- (executorMemory * 2, executorCores * 2),
- (executorMemory * 2, executorCores * 2)))
-
- verifyTaskLaunched(driver, "o1")
- verifyTaskLaunched(driver, "o2")
- }
-
- test("mesos creates multiple executors on a single slave") {
- val executorCores = 4
- setBackend(Map("spark.executor.cores" -> executorCores.toString))
-
- // offer with room for two executors
- val executorMemory = backend.executorMemory(sc)
- offerResources(List((executorMemory * 2, executorCores * 2)))
-
- // verify two executors were started on a single offer
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.length == 2)
- }
-
- test("mesos doesn't register twice with the same shuffle service") {
- setBackend(Map("spark.shuffle.service.enabled" -> "true"))
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched(driver, "o1")
-
- val offer2 = createOffer("o2", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer2).asJava)
- verifyTaskLaunched(driver, "o2")
-
- val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING)
- backend.statusUpdate(driver, status1)
-
- val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING)
- backend.statusUpdate(driver, status2)
- verify(externalShuffleClient, times(1))
- .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
- }
-
- test("Port offer decline when there is no appropriate range") {
- setBackend(Map("spark.blockManager.port" -> "30100"))
- val offeredPorts = (31100L, 31200L)
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
- backend.resourceOffers(driver, List(offer1).asJava)
- verify(driver, times(1)).declineOffer(offer1.getId)
- }
-
- test("Port offer accepted when ephemeral ports are used") {
- setBackend()
- val offeredPorts = (31100L, 31200L)
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
- backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched(driver, "o1")
- }
-
- test("Port offer accepted with user defined port numbers") {
- val port = 30100
- setBackend(Map("spark.blockManager.port" -> s"$port"))
- val offeredPorts = (30000L, 31000L)
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
- backend.resourceOffers(driver, List(offer1).asJava)
- val taskInfo = verifyTaskLaunched(driver, "o1")
-
- val taskPortResources = taskInfo.head.getResourcesList.asScala.
- find(r => r.getType == Value.Type.RANGES && r.getName == "ports")
-
- val isPortInOffer = (r: Resource) => {
- r.getRanges().getRangeList
- .asScala.exists(range => range.getBegin == port && range.getEnd == port)
- }
- assert(taskPortResources.exists(isPortInOffer))
- }
-
- test("mesos kills an executor when told") {
- setBackend()
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
- verifyTaskLaunched(driver, "o1")
-
- backend.doKillExecutors(List("0"))
- verify(driver, times(1)).killTask(createTaskId("0"))
- }
-
- test("weburi is set in created scheduler driver") {
- setBackend()
- val taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.sc).thenReturn(sc)
- val driver = mock[SchedulerDriver]
- when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
- val securityManager = mock[SecurityManager]
-
- val backend = new MesosCoarseGrainedSchedulerBackend(
- taskScheduler, sc, "master", securityManager) {
- override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = {
- markRegistered()
- assert(webuiUrl.isDefined)
- assert(webuiUrl.get.equals("http://webui"))
- driver
- }
- }
-
- backend.start()
- }
-
- test("honors unset spark.mesos.containerizer") {
- setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER)
- }
-
- test("honors spark.mesos.containerizer=\"mesos\"") {
- setBackend(Map(
- "spark.mesos.executor.docker.image" -> "test",
- "spark.mesos.containerizer" -> "mesos"))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val taskInfos = verifyTaskLaunched(driver, "o1")
- assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS)
- }
-
- test("docker settings are reflected in created tasks") {
- setBackend(Map(
- "spark.mesos.executor.docker.image" -> "some_image",
- "spark.mesos.executor.docker.forcePullImage" -> "true",
- "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro",
- "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp"
- ))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val launchedTasks = verifyTaskLaunched(driver, "o1")
- assert(launchedTasks.size == 1)
-
- val containerInfo = launchedTasks.head.getContainer
- assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
- val volumes = containerInfo.getVolumesList.asScala
- assert(volumes.size == 1)
-
- val volume = volumes.head
- assert(volume.getHostPath == "/host_vol")
- assert(volume.getContainerPath == "/container_vol")
- assert(volume.getMode == Volume.Mode.RO)
-
- val dockerInfo = containerInfo.getDocker
-
- assert(dockerInfo.getImage == "some_image")
- assert(dockerInfo.getForcePullImage)
-
- val portMappings = dockerInfo.getPortMappingsList.asScala
- assert(portMappings.size == 1)
-
- val portMapping = portMappings.head
- assert(portMapping.getHostPort == 8080)
- assert(portMapping.getContainerPort == 80)
- assert(portMapping.getProtocol == "tcp")
- }
-
- test("force-pull-image option is disabled by default") {
- setBackend(Map(
- "spark.mesos.executor.docker.image" -> "some_image"
- ))
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val launchedTasks = verifyTaskLaunched(driver, "o1")
- assert(launchedTasks.size == 1)
-
- val containerInfo = launchedTasks.head.getContainer
- assert(containerInfo.getType == ContainerInfo.Type.DOCKER)
-
- val dockerInfo = containerInfo.getDocker
-
- assert(dockerInfo.getImage == "some_image")
- assert(!dockerInfo.getForcePullImage)
- }
-
- test("Do not call removeExecutor() after backend is stopped") {
- setBackend()
-
- // launches a task on a valid offer
- val offers = List((backend.executorMemory(sc), 1))
- offerResources(offers)
- verifyTaskLaunched(driver, "o1")
-
- // launches a thread simulating status update
- val statusUpdateThread = new Thread {
- override def run(): Unit = {
- while (!stopCalled) {
- Thread.sleep(100)
- }
-
- val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
- backend.statusUpdate(driver, status)
- }
- }.start
-
- backend.stop()
- // Any method of the backend involving sending messages to the driver endpoint should not
- // be called after the backend is stopped.
- verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
- }
-
- test("mesos supports spark.executor.uri") {
- val url = "spark.spark.spark.com"
- setBackend(Map(
- "spark.executor.uri" -> url
- ), false)
-
- val (mem, cpu) = (backend.executorMemory(sc), 4)
-
- val offer1 = createOffer("o1", "s1", mem, cpu)
- backend.resourceOffers(driver, List(offer1).asJava)
-
- val launchedTasks = verifyTaskLaunched(driver, "o1")
- assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url)
- }
-
- private def verifyDeclinedOffer(driver: SchedulerDriver,
- offerId: OfferID,
- filter: Boolean = false): Unit = {
- if (filter) {
- verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters])
- } else {
- verify(driver, times(1)).declineOffer(Matchers.eq(offerId))
- }
- }
-
- private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = {
- val mesosOffers = offers.zipWithIndex.map {case (offer, i) =>
- createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)}
-
- backend.resourceOffers(driver, mesosOffers.asJava)
- }
-
- private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = {
- TaskStatus.newBuilder()
- .setTaskId(TaskID.newBuilder().setValue(taskId).build())
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
- .setState(state)
- .build
- }
-
- private def createSchedulerBackend(
- taskScheduler: TaskSchedulerImpl,
- driver: SchedulerDriver,
- shuffleClient: MesosExternalShuffleClient,
- endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = {
- val securityManager = mock[SecurityManager]
-
- val backend = new MesosCoarseGrainedSchedulerBackend(
- taskScheduler, sc, "master", securityManager) {
- override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = driver
-
- override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
-
- override protected def createDriverEndpointRef(
- properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint
-
- // override to avoid race condition with the driver thread on `mesosDriver`
- override def startScheduler(newDriver: SchedulerDriver): Unit = {
- mesosDriver = newDriver
- }
-
- override def stopExecutors(): Unit = {
- stopCalled = true
- }
-
- markRegistered()
- }
- backend.start()
- backend
- }
-
- private def setBackend(sparkConfVars: Map[String, String] = null,
- setHome: Boolean = true) {
- sparkConf = (new SparkConf)
- .setMaster("local[*]")
- .setAppName("test-mesos-dynamic-alloc")
- .set("spark.mesos.driver.webui.url", "http://webui")
-
- if (setHome) {
- sparkConf.setSparkHome("/path")
- }
-
- if (sparkConfVars != null) {
- sparkConf.setAll(sparkConfVars)
- }
-
- sc = new SparkContext(sparkConf)
-
- driver = mock[SchedulerDriver]
- when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
- taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.sc).thenReturn(sc)
- externalShuffleClient = mock[MesosExternalShuffleClient]
- driverEndpoint = mock[RpcEndpointRef]
-
- backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint)
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
deleted file mode 100644
index fcf39f6391..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-import java.util.Arrays
-import java.util.Collection
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-import scala.collection.mutable.ArrayBuffer
-
-import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Matchers._
-import org.mockito.Mockito._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite}
-import org.apache.spark.executor.MesosExecutorBackend
-import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded,
- TaskDescription, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.scheduler.cluster.ExecutorInfo
-
-class MesosFineGrainedSchedulerBackendSuite
- extends SparkFunSuite with LocalSparkContext with MockitoSugar {
-
- test("weburi is set in created scheduler driver") {
- val conf = new SparkConf
- conf.set("spark.mesos.driver.webui.url", "http://webui")
- conf.set("spark.app.name", "name1")
-
- val sc = mock[SparkContext]
- when(sc.conf).thenReturn(conf)
- when(sc.sparkUser).thenReturn("sparkUser1")
- when(sc.appName).thenReturn("appName1")
-
- val taskScheduler = mock[TaskSchedulerImpl]
- val driver = mock[SchedulerDriver]
- when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)
-
- val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") {
- override protected def createSchedulerDriver(
- masterUrl: String,
- scheduler: Scheduler,
- sparkUser: String,
- appName: String,
- conf: SparkConf,
- webuiUrl: Option[String] = None,
- checkpoint: Option[Boolean] = None,
- failoverTimeout: Option[Double] = None,
- frameworkId: Option[String] = None): SchedulerDriver = {
- markRegistered()
- assert(webuiUrl.isDefined)
- assert(webuiUrl.get.equals("http://webui"))
- driver
- }
- }
-
- backend.start()
- }
-
- test("Use configured mesosExecutor.cores for ExecutorInfo") {
- val mesosExecutorCores = 3
- val conf = new SparkConf
- conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString)
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-
- when(sc.conf).thenReturn(conf)
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.executorMemory).thenReturn(100)
- when(sc.listenerBus).thenReturn(listenerBus)
- val taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
- val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
- val resources = Arrays.asList(
- mesosSchedulerBackend.createResource("cpus", 4),
- mesosSchedulerBackend.createResource("mem", 1024))
- // uri is null.
- val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
- val executorResources = executorInfo.getResourcesList
- val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue
-
- assert(cpus === mesosExecutorCores)
- }
-
- test("check spark-class location correctly") {
- val conf = new SparkConf
- conf.set("spark.mesos.executor.home", "/mesos-home")
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
-
- when(sc.conf).thenReturn(conf)
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.executorMemory).thenReturn(100)
- when(sc.listenerBus).thenReturn(listenerBus)
- val taskScheduler = mock[TaskSchedulerImpl]
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
- val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
- val resources = Arrays.asList(
- mesosSchedulerBackend.createResource("cpus", 4),
- mesosSchedulerBackend.createResource("mem", 1024))
- // uri is null.
- val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
- assert(executorInfo.getCommand.getValue ===
- s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
-
- // uri exists.
- conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
- val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
- assert(executorInfo1.getCommand.getValue ===
- s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
- }
-
- test("spark docker properties correctly populate the DockerInfo message") {
- val taskScheduler = mock[TaskSchedulerImpl]
-
- val conf = new SparkConf()
- .set("spark.mesos.executor.docker.image", "spark/mock")
- .set("spark.mesos.executor.docker.forcePullImage", "true")
- .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro")
- .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp")
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.executorMemory).thenReturn(100)
- when(sc.getSparkHome()).thenReturn(Option("/spark-home"))
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.conf).thenReturn(conf)
- when(sc.listenerBus).thenReturn(listenerBus)
-
- val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
- val (execInfo, _) = backend.createExecutorInfo(
- Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor")
- assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock"))
- assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true))
- val portmaps = execInfo.getContainer.getDocker.getPortMappingsList
- assert(portmaps.get(0).getHostPort.equals(80))
- assert(portmaps.get(0).getContainerPort.equals(8080))
- assert(portmaps.get(0).getProtocol.equals("tcp"))
- assert(portmaps.get(1).getHostPort.equals(53))
- assert(portmaps.get(1).getContainerPort.equals(53))
- assert(portmaps.get(1).getProtocol.equals("tcp"))
- val volumes = execInfo.getContainer.getVolumesList
- assert(volumes.get(0).getContainerPath.equals("/a"))
- assert(volumes.get(0).getMode.equals(Volume.Mode.RW))
- assert(volumes.get(1).getContainerPath.equals("/b"))
- assert(volumes.get(1).getHostPath.equals("/b"))
- assert(volumes.get(1).getMode.equals(Volume.Mode.RW))
- assert(volumes.get(2).getContainerPath.equals("/c"))
- assert(volumes.get(2).getHostPath.equals("/c"))
- assert(volumes.get(2).getMode.equals(Volume.Mode.RW))
- assert(volumes.get(3).getContainerPath.equals("/d"))
- assert(volumes.get(3).getMode.equals(Volume.Mode.RO))
- assert(volumes.get(4).getContainerPath.equals("/e"))
- assert(volumes.get(4).getHostPath.equals("/e"))
- assert(volumes.get(4).getMode.equals(Volume.Mode.RO))
- }
-
- test("mesos resource offers result in launching tasks") {
- def createOffer(id: Int, mem: Int, cpu: Int): Offer = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
- builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
- .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
- .setHostname(s"host${id.toString}").build()
- }
-
- val driver = mock[SchedulerDriver]
- val taskScheduler = mock[TaskSchedulerImpl]
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.executorMemory).thenReturn(100)
- when(sc.getSparkHome()).thenReturn(Option("/path"))
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.conf).thenReturn(new SparkConf)
- when(sc.listenerBus).thenReturn(listenerBus)
-
- val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
- val minMem = backend.executorMemory(sc)
- val minCpu = 4
-
- val mesosOffers = new java.util.ArrayList[Offer]
- mesosOffers.add(createOffer(1, minMem, minCpu))
- mesosOffers.add(createOffer(2, minMem - 1, minCpu))
- mesosOffers.add(createOffer(3, minMem, minCpu))
-
- val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2)
- expectedWorkerOffers.append(new WorkerOffer(
- mesosOffers.get(0).getSlaveId.getValue,
- mesosOffers.get(0).getHostname,
- (minCpu - backend.mesosExecutorCores).toInt
- ))
- expectedWorkerOffers.append(new WorkerOffer(
- mesosOffers.get(2).getSlaveId.getValue,
- mesosOffers.get(2).getHostname,
- (minCpu - backend.mesosExecutorCores).toInt
- ))
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
- when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
-
- val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
- when(
- driver.launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
- ).thenReturn(Status.valueOf(1))
- when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1))
- when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1))
-
- backend.resourceOffers(driver, mesosOffers)
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
- verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId)
- verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId)
- assert(capture.getValue.size() === 1)
- val taskInfo = capture.getValue.iterator().next()
- assert(taskInfo.getName.equals("n1"))
- val cpus = taskInfo.getResourcesList.get(0)
- assert(cpus.getName.equals("cpus"))
- assert(cpus.getScalar.getValue.equals(2.0))
- assert(taskInfo.getSlaveId.getValue.equals("s1"))
-
- // Unwanted resources offered on an existing node. Make sure they are declined
- val mesosOffers2 = new java.util.ArrayList[Offer]
- mesosOffers2.add(createOffer(1, minMem, minCpu))
- reset(taskScheduler)
- reset(driver)
- when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq()))
- when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
- when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
-
- backend.resourceOffers(driver, mesosOffers2)
- verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
- }
-
- test("can handle multiple roles") {
- val driver = mock[SchedulerDriver]
- val taskScheduler = mock[TaskSchedulerImpl]
-
- val listenerBus = mock[LiveListenerBus]
- listenerBus.post(
- SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
-
- val sc = mock[SparkContext]
- when(sc.executorMemory).thenReturn(100)
- when(sc.getSparkHome()).thenReturn(Option("/path"))
- when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
- when(sc.conf).thenReturn(new SparkConf)
- when(sc.listenerBus).thenReturn(listenerBus)
-
- val id = 1
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setRole("prod")
- .setScalar(Scalar.newBuilder().setValue(500))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setRole("prod")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(1))
- builder.addResourcesBuilder()
- .setName("mem")
- .setRole("dev")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(600))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setRole("dev")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(2))
- val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build())
- .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}"))
- .setHostname(s"host${id.toString}").build()
-
- val mesosOffers = new java.util.ArrayList[Offer]
- mesosOffers.add(offer)
-
- val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master")
-
- val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
- expectedWorkerOffers.append(new WorkerOffer(
- mesosOffers.get(0).getSlaveId.getValue,
- mesosOffers.get(0).getHostname,
- 2 // Deducting 1 for executor
- ))
-
- val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
- when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
- when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
-
- val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]])
- when(
- driver.launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
- ).thenReturn(Status.valueOf(1))
-
- backend.resourceOffers(driver, mesosOffers)
-
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
- capture.capture(),
- any(classOf[Filters])
- )
-
- assert(capture.getValue.size() === 1)
- val taskInfo = capture.getValue.iterator().next()
- assert(taskInfo.getName.equals("n1"))
- assert(taskInfo.getResourcesCount === 1)
- val cpusDev = taskInfo.getResourcesList.get(0)
- assert(cpusDev.getName.equals("cpus"))
- assert(cpusDev.getScalar.getValue.equals(1.0))
- assert(cpusDev.getRole.equals("dev"))
- val executorResources = taskInfo.getExecutor.getResourcesList.asScala
- assert(executorResources.exists { r =>
- r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod")
- })
- assert(executorResources.exists { r =>
- r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod")
- })
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
deleted file mode 100644
index e3d794931a..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ /dev/null
@@ -1,255 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import scala.collection.JavaConverters._
-import scala.language.reflectiveCalls
-
-import org.apache.mesos.Protos.{Resource, Value}
-import org.mockito.Mockito._
-import org.scalatest._
-import org.scalatest.mock.MockitoSugar
-
-import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
-
-class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar {
-
- // scalastyle:off structural.type
- // this is the documented way of generating fixtures in scalatest
- def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new {
- val sparkConf = new SparkConf
- val sc = mock[SparkContext]
- when(sc.conf).thenReturn(sparkConf)
- }
-
- private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
- val rangeValue = Value.Range.newBuilder()
- rangeValue.setBegin(range._1)
- rangeValue.setEnd(range._2)
- val builder = Resource.newBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
-
- role.foreach { r => builder.setRole(r) }
- builder.build()
- }
-
- private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
- resources.flatMap{resource => resource.getRanges.getRangeList
- .asScala.map(range => (range.getBegin, range.getEnd))}
- }
-
- def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
- : Boolean = {
- array1.sortBy(identity).deep == array2.sortBy(identity).deep
- }
-
- def arePortsEqual(array1: Array[Long], array2: Array[Long])
- : Boolean = {
- array1.sortBy(identity).deep == array2.sortBy(identity).deep
- }
-
- def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
- resources.flatMap{ resource =>
- resource.getRanges.getRangeList.asScala.toList.map{
- range => (range.getBegin, range.getEnd)}}
- }
-
- val utils = new MesosSchedulerUtils { }
- // scalastyle:on structural.type
-
- test("use at-least minimum overhead") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(512)
- utils.executorMemory(f.sc) shouldBe 896
- }
-
- test("use overhead if it is greater than minimum value") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(4096)
- utils.executorMemory(f.sc) shouldBe 4505
- }
-
- test("use spark.mesos.executor.memoryOverhead (if set)") {
- val f = fixture
- when(f.sc.executorMemory).thenReturn(1024)
- f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512")
- utils.executorMemory(f.sc) shouldBe 1536
- }
-
- test("parse a non-empty constraint string correctly") {
- val expectedMap = Map(
- "os" -> Set("centos7"),
- "zone" -> Set("us-east-1a", "us-east-1b")
- )
- utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap)
- }
-
- test("parse an empty constraint string correctly") {
- utils.parseConstraintString("") shouldBe Map()
- }
-
- test("throw an exception when the input is malformed") {
- an[IllegalArgumentException] should be thrownBy
- utils.parseConstraintString("os;zone:us-east")
- }
-
- test("empty values for attributes' constraints matches all values") {
- val constraintsStr = "os:"
- val parsedConstraints = utils.parseConstraintString(constraintsStr)
-
- parsedConstraints shouldBe Map("os" -> Set())
-
- val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build()
- val noOsOffer = Map("zone" -> zoneSet)
- val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build())
- val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build())
-
- utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false
- utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true
- utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true
- }
-
- test("subset match is performed for set attributes") {
- val supersetConstraint = Map(
- "os" -> Value.Text.newBuilder().setValue("ubuntu").build(),
- "zone" -> Value.Set.newBuilder()
- .addItem("us-east-1a")
- .addItem("us-east-1b")
- .addItem("us-east-1c")
- .build())
-
- val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c"
- val parsedConstraints = utils.parseConstraintString(zoneConstraintStr)
-
- utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true
- }
-
- test("less than equal match is performed on scalar attributes") {
- val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build())
-
- val ltConstraint = utils.parseConstraintString("gpus:2")
- val eqConstraint = utils.parseConstraintString("gpus:3")
- val gtConstraint = utils.parseConstraintString("gpus:4")
-
- utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
- }
-
- test("contains match is performed for range attributes") {
- val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build())
- val ltConstraint = utils.parseConstraintString("ports:6000")
- val eqConstraint = utils.parseConstraintString("ports:7500")
- val gtConstraint = utils.parseConstraintString("ports:8002")
- val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300")
-
- utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false
- utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false
- utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true
- }
-
- test("equality match is performed for text attributes") {
- val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build())
-
- val trueConstraint = utils.parseConstraintString("os:centos7")
- val falseConstraint = utils.parseConstraintString("os:ubuntu")
-
- utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true
- utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
- }
-
- test("Port reservation is done correctly with user specified ports only") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "3000" )
- conf.set("spark.blockManager.port", "4000")
- val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3000, 4000), List(portResource))
- resourcesToBeUsed.length shouldBe 2
-
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
-
- portsToUse.length shouldBe 2
- arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
-
- val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
- val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
-
- arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
- }
-
- test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "3100" )
- val portResource = createTestPortResource((3000, 5000), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(3100), List(portResource))
-
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.length shouldBe 1
- portsToUse.contains(3100) shouldBe true
- }
-
- test("Port reservation is done correctly with all random ports") {
- val conf = new SparkConf()
- val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
-
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(), List(portResource))
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.isEmpty shouldBe true
- }
-
- test("Port reservation is done correctly with user specified ports only - multiple ranges") {
- val conf = new SparkConf()
- conf.set("spark.executor.port", "2100" )
- conf.set("spark.blockManager.port", "4000")
- val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
- createTestPortResource((2000, 2500), Some("other_role")))
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(2100, 4000), portResourceList)
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
-
- portsToUse.length shouldBe 2
- val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
- val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
-
- val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
-
- arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
- arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
- }
-
- test("Port reservation is done correctly with all random ports - multiple ranges") {
- val conf = new SparkConf()
- val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
- createTestPortResource((2000, 2500), Some("other_role")))
- val (resourcesLeft, resourcesToBeUsed) = utils
- .partitionPortResources(List(), portResourceList)
- val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
- portsToUse.isEmpty shouldBe true
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
deleted file mode 100644
index 5a81bb335f..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.nio.ByteBuffer
-
-import org.apache.spark.SparkFunSuite
-
-class MesosTaskLaunchDataSuite extends SparkFunSuite {
- test("serialize and deserialize data must be same") {
- val serializedTask = ByteBuffer.allocate(40)
- (Range(100, 110).map(serializedTask.putInt(_)))
- serializedTask.rewind
- val attemptNumber = 100
- val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString
- serializedTask.rewind
- val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString)
- assert(mesosTaskLaunchData.attemptNumber == attemptNumber)
- assert(mesosTaskLaunchData.serializedTask.equals(serializedTask))
- }
-}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
deleted file mode 100644
index fa9406f5f0..0000000000
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.scheduler.cluster.mesos
-
-import java.util.Collections
-
-import scala.collection.JavaConverters._
-
-import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
-import org.apache.mesos.SchedulerDriver
-import org.mockito.{ArgumentCaptor, Matchers}
-import org.mockito.Mockito._
-
-object Utils {
- def createOffer(
- offerId: String,
- slaveId: String,
- mem: Int,
- cpu: Int,
- ports: Option[(Long, Long)] = None): Offer = {
- val builder = Offer.newBuilder()
- builder.addResourcesBuilder()
- .setName("mem")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(mem))
- builder.addResourcesBuilder()
- .setName("cpus")
- .setType(Value.Type.SCALAR)
- .setScalar(Scalar.newBuilder().setValue(cpu))
- ports.foreach { resourcePorts =>
- builder.addResourcesBuilder()
- .setName("ports")
- .setType(Value.Type.RANGES)
- .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
- .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
- }
- builder.setId(createOfferId(offerId))
- .setFrameworkId(FrameworkID.newBuilder()
- .setValue("f1"))
- .setSlaveId(SlaveID.newBuilder().setValue(slaveId))
- .setHostname(s"host${slaveId}")
- .build()
- }
-
- def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = {
- val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]])
- verify(driver, times(1)).launchTasks(
- Matchers.eq(Collections.singleton(createOfferId(offerId))),
- captor.capture())
- captor.getValue.asScala.toList
- }
-
- def createOfferId(offerId: String): OfferID = {
- OfferID.newBuilder().setValue(offerId).build()
- }
-
- def createSlaveId(slaveId: String): SlaveID = {
- SlaveID.newBuilder().setValue(slaveId).build()
- }
-
- def createExecutorId(executorId: String): ExecutorID = {
- ExecutorID.newBuilder().setValue(executorId).build()
- }
-
- def createTaskId(taskId: String): TaskID = {
- TaskID.newBuilder().setValue(taskId).build()
- }
-}
-