| field | value | date |
|---|---|---|
| author | Michael Gummelt <mgummelt@mesosphere.io> | 2016-08-26 12:25:22 -0700 |
| committer | Marcelo Vanzin <vanzin@cloudera.com> | 2016-08-26 12:25:22 -0700 |
| commit | 8e5475be3c9a620f18f6712631b093464a7d0ee7 | |
| tree | 417e25ea8798c0f9313285623a664fe7ac4fc003 /mesos/src/test/scala | |
| parent | c0949dc944b7e2fc8a4465acc68a8f2713b3fa13 | |
# [SPARK-16967] Move Mesos to module
## What changes were proposed in this pull request?
Move the Mesos code into its own Maven module (a hedged build sketch follows).
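Because Mesos support now builds as a separate module, enabling it works like the other optional cluster managers. A minimal sketch, assuming the module is selected by a `mesos` Maven profile (an assumption by analogy with `-Pyarn`; the profile definition is not part of this excerpt) and built with Spark's bundled `build/mvn` wrapper:

```sh
# Build and install Spark with Mesos support enabled; -Pmesos is assumed to
# be the profile introduced by this change, by analogy with -Pyarn.
./build/mvn -Pmesos -DskipTests clean install

# Then run only the new module's tests (module directory name "mesos" is
# taken from the diff paths below).
./build/mvn -Pmesos -pl mesos test
```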
## How was this patch tested?
- unit tests
- manually submitting a client-mode and a cluster-mode job (see the sketch after this list)
- the Spark/Mesos integration test suite
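A hedged sketch of the manual submissions mentioned above; the master host, dispatcher port, application class, and jar locations are placeholders, not values taken from this patch:

```sh
# Client mode: the driver runs in the submitting process and registers
# with the Mesos master directly.
./bin/spark-submit \
  --master mesos://mesos-master.example.com:5050 \
  --deploy-mode client \
  --class org.apache.spark.examples.SparkPi \
  examples/jars/spark-examples.jar 100

# Cluster mode: the driver is queued with the MesosClusterScheduler through
# the dispatcher, so the jar must be reachable from inside the cluster.
./bin/spark-submit \
  --master mesos://dispatcher.example.com:7077 \
  --deploy-mode cluster \
  --class org.apache.spark.examples.SparkPi \
  http://repo.example.com/spark-examples.jar 100
```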
Author: Michael Gummelt <mgummelt@mesosphere.io>
Closes #14637 from mgummelt/mesos-module.
Diffstat (limited to 'mesos/src/test/scala')

 org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala                |  47
 org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala              | 213
 org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 517
 org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala   | 385
 org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala                | 255
 org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala                |  36
 org/apache/spark/scheduler/cluster/mesos/Utils.scala                                   |  85

7 files changed, 1538 insertions, 0 deletions
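To reproduce this filtered view locally, standard git commands suffice (a sketch; the commit hash is the one listed in the header above):

```sh
# Summarize the commit, restricted to the test tree shown on this page.
git show --stat 8e5475be3c9a620f18f6712631b093464a7d0ee7 -- mesos/src/test/scala

# Full patch text for the same subset.
git show 8e5475be3c9a620f18f6712631b093464a7d0ee7 -- mesos/src/test/scala
```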
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala new file mode 100644 index 0000000000..6fce06632c --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManagerSuite.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} + +class MesosClusterManagerSuite extends SparkFunSuite with LocalSparkContext { + def testURL(masterURL: String, expectedClass: Class[_], coarse: Boolean) { + val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString) + sc = new SparkContext("local", "test", conf) + val clusterManager = new MesosClusterManager() + + assert(clusterManager.canCreate(masterURL)) + val taskScheduler = clusterManager.createTaskScheduler(sc, masterURL) + val sched = clusterManager.createSchedulerBackend(sc, masterURL, taskScheduler) + assert(sched.getClass === expectedClass) + } + + test("mesos fine-grained") { + testURL("mesos://localhost:1234", classOf[MesosFineGrainedSchedulerBackend], coarse = false) + } + + test("mesos coarse-grained") { + testURL("mesos://localhost:1234", classOf[MesosCoarseGrainedSchedulerBackend], coarse = true) + } + + test("mesos with zookeeper") { + testURL("mesos://zk://localhost:1234,localhost:2345", + classOf[MesosFineGrainedSchedulerBackend], + coarse = false) + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala new file mode 100644 index 0000000000..87d9080de5 --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.util.{Collection, Collections, Date} + +import scala.collection.JavaConverters._ + +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.{Scalar, Type} +import org.apache.mesos.SchedulerDriver +import org.mockito.{ArgumentCaptor, Matchers} +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkFunSuite} +import org.apache.spark.deploy.Command +import org.apache.spark.deploy.mesos.MesosDriverDescription + +class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + private val command = new Command("mainClass", Seq("arg"), Map(), Seq(), Seq(), Seq()) + private var driver: SchedulerDriver = _ + private var scheduler: MesosClusterScheduler = _ + + private def setScheduler(sparkConfVars: Map[String, String] = null): Unit = { + val conf = new SparkConf() + conf.setMaster("mesos://localhost:5050") + conf.setAppName("spark mesos") + + if (sparkConfVars != null) { + conf.setAll(sparkConfVars) + } + + driver = mock[SchedulerDriver] + scheduler = new MesosClusterScheduler( + new BlackHoleMesosClusterPersistenceEngineFactory, conf) { + override def start(): Unit = { ready = true } + } + scheduler.start() + } + + test("can queue drivers") { + setScheduler() + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", 1000, 1, true, + command, Map[String, String](), "s1", new Date())) + assert(response.success) + val response2 = + scheduler.submitDriver(new MesosDriverDescription( + "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date())) + assert(response2.success) + val state = scheduler.getSchedulerState() + val queuedDrivers = state.queuedDrivers.toList + assert(queuedDrivers(0).submissionId == response.submissionId) + assert(queuedDrivers(1).submissionId == response2.submissionId) + } + + test("can kill queued drivers") { + setScheduler() + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", 1000, 1, true, + command, Map[String, String](), "s1", new Date())) + assert(response.success) + val killResponse = scheduler.killDriver(response.submissionId) + assert(killResponse.success) + val state = scheduler.getSchedulerState() + assert(state.queuedDrivers.isEmpty) + } + + test("can handle multiple roles") { + setScheduler() + + val driver = mock[SchedulerDriver] + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", 1200, 1.5, true, + command, + Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), + "s1", + new Date())) + assert(response.success) + val offer = Offer.newBuilder() + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1000).build()) + .setName("mem") + .setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("role2") + .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("role2") + .setScalar(Scalar.newBuilder().setValue(500).build()).setName("mem").setType(Type.SCALAR)) + .setId(OfferID.newBuilder().setValue("o1").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build()) + .setSlaveId(SlaveID.newBuilder().setValue("s1").build()) + .setHostname("host1") + 
.build() + + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(offer.getId)), + capture.capture()) + ).thenReturn(Status.valueOf(1)) + + scheduler.resourceOffers(driver, Collections.singletonList(offer)) + + val taskInfos = capture.getValue + assert(taskInfos.size() == 1) + val taskInfo = taskInfos.iterator().next() + val resources = taskInfo.getResourcesList + assert(scheduler.getResource(resources, "cpus") == 1.5) + assert(scheduler.getResource(resources, "mem") == 1200) + val resourcesSeq: Seq[Resource] = resources.asScala + val cpus = resourcesSeq.filter(_.getName.equals("cpus")).toList + assert(cpus.size == 2) + assert(cpus.exists(_.getRole().equals("role2"))) + assert(cpus.exists(_.getRole().equals("*"))) + val mem = resourcesSeq.filter(_.getName.equals("mem")).toList + assert(mem.size == 2) + assert(mem.exists(_.getRole().equals("role2"))) + assert(mem.exists(_.getRole().equals("*"))) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(offer.getId)), + capture.capture() + ) + } + + test("escapes commandline args for the shell") { + setScheduler() + + val conf = new SparkConf() + conf.setMaster("mesos://localhost:5050") + conf.setAppName("spark mesos") + val scheduler = new MesosClusterScheduler( + new BlackHoleMesosClusterPersistenceEngineFactory, conf) { + override def start(): Unit = { ready = true } + } + val escape = scheduler.shellEscape _ + def wrapped(str: String): String = "\"" + str + "\"" + + // Wrapped in quotes + assert(escape("'should be left untouched'") === "'should be left untouched'") + assert(escape("\"should be left untouched\"") === "\"should be left untouched\"") + + // Harmless + assert(escape("") === "") + assert(escape("harmless") === "harmless") + assert(escape("har-m.l3ss") === "har-m.l3ss") + + // Special Chars escape + assert(escape("should escape this \" quote") === wrapped("should escape this \\\" quote")) + assert(escape("shouldescape\"quote") === wrapped("shouldescape\\\"quote")) + assert(escape("should escape this $ dollar") === wrapped("should escape this \\$ dollar")) + assert(escape("should escape this ` backtick") === wrapped("should escape this \\` backtick")) + assert(escape("""should escape this \ backslash""") + === wrapped("""should escape this \\ backslash""")) + assert(escape("""\"?""") === wrapped("""\\\"?""")) + + + // Special Chars no escape only wrap + List(" ", "'", "<", ">", "&", "|", "?", "*", ";", "!", "#", "(", ")").foreach(char => { + assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this")) + }) + } + + test("supports spark.mesos.driverEnv.*") { + setScheduler() + + val mem = 1000 + val cpu = 1 + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", mem, cpu, true, + command, + Map("spark.mesos.executor.home" -> "test", + "spark.app.name" -> "test", + "spark.mesos.driverEnv.TEST_ENV" -> "TEST_VAL"), + "s1", + new Date())) + assert(response.success) + + val offer = Utils.createOffer("o1", "s1", mem, cpu) + scheduler.resourceOffers(driver, List(offer).asJava) + val tasks = Utils.verifyTaskLaunched(driver, "o1") + val env = tasks.head.getCommand.getEnvironment.getVariablesList.asScala.map(v => + (v.getName, v.getValue)).toMap + assert(env.getOrElse("TEST_ENV", null) == "TEST_VAL") + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala 
b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala new file mode 100644 index 0000000000..c06379707a --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -0,0 +1,517 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + +import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} +import org.apache.mesos.Protos._ +import org.mockito.Matchers +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar +import org.scalatest.BeforeAndAfter + +import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.scheduler.cluster.mesos.Utils._ + +class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite + with LocalSparkContext + with MockitoSugar + with BeforeAndAfter { + + private var sparkConf: SparkConf = _ + private var driver: SchedulerDriver = _ + private var taskScheduler: TaskSchedulerImpl = _ + private var backend: MesosCoarseGrainedSchedulerBackend = _ + private var externalShuffleClient: MesosExternalShuffleClient = _ + private var driverEndpoint: RpcEndpointRef = _ + @volatile private var stopCalled = false + + test("mesos supports killing and limiting executors") { + setBackend() + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + + val minMem = backend.executorMemory(sc) + val minCpu = 4 + val offers = List((minMem, minCpu)) + + // launches a task on a valid offer + offerResources(offers) + verifyTaskLaunched(driver, "o1") + + // kills executors + backend.doRequestTotalExecutors(0) + assert(backend.doKillExecutors(Seq("0"))) + val taskID0 = createTaskId("0") + verify(driver, times(1)).killTask(taskID0) + + // doesn't launch a new task when requested executors == 0 + offerResources(offers, 2) + verifyDeclinedOffer(driver, createOfferId("o2")) + + // Launches a new task when requested executors is positive + backend.doRequestTotalExecutors(2) + offerResources(offers, 2) + verifyTaskLaunched(driver, "o2") + } + + test("mesos supports killing and relaunching tasks with executors") { + setBackend() + + // launches a task on a valid offer + val minMem = backend.executorMemory(sc) + 1024 + val minCpu = 4 + val offer1 = (minMem, 
minCpu) + val offer2 = (minMem, 1) + offerResources(List(offer1, offer2)) + verifyTaskLaunched(driver, "o1") + + // accounts for a killed task + val status = createTaskStatus("0", "s1", TaskState.TASK_KILLED) + backend.statusUpdate(driver, status) + verify(driver, times(1)).reviveOffers() + + // Launches a new task on a valid offer from the same slave + offerResources(List(offer2)) + verifyTaskLaunched(driver, "o2") + } + + test("mesos supports spark.executor.cores") { + val executorCores = 4 + setBackend(Map("spark.executor.cores" -> executorCores.toString)) + + val executorMemory = backend.executorMemory(sc) + val offers = List((executorMemory * 2, executorCores + 1)) + offerResources(offers) + + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.length == 1) + + val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") + assert(cpus == executorCores) + } + + test("mesos supports unset spark.executor.cores") { + setBackend() + + val executorMemory = backend.executorMemory(sc) + val offerCores = 10 + offerResources(List((executorMemory * 2, offerCores))) + + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.length == 1) + + val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") + assert(cpus == offerCores) + } + + test("mesos does not acquire more than spark.cores.max") { + val maxCores = 10 + setBackend(Map("spark.cores.max" -> maxCores.toString)) + + val executorMemory = backend.executorMemory(sc) + offerResources(List((executorMemory, maxCores + 1))) + + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.length == 1) + + val cpus = backend.getResource(taskInfos.head.getResourcesList, "cpus") + assert(cpus == maxCores) + } + + test("mesos declines offers that violate attribute constraints") { + setBackend(Map("spark.mesos.constraints" -> "x:true")) + offerResources(List((backend.executorMemory(sc), 4))) + verifyDeclinedOffer(driver, createOfferId("o1"), true) + } + + test("mesos declines offers with a filter when reached spark.cores.max") { + val maxCores = 3 + setBackend(Map("spark.cores.max" -> maxCores.toString)) + + val executorMemory = backend.executorMemory(sc) + offerResources(List( + (executorMemory, maxCores + 1), + (executorMemory, maxCores + 1))) + + verifyTaskLaunched(driver, "o1") + verifyDeclinedOffer(driver, createOfferId("o2"), true) + } + + test("mesos assigns tasks round-robin on offers") { + val executorCores = 4 + val maxCores = executorCores * 2 + setBackend(Map("spark.executor.cores" -> executorCores.toString, + "spark.cores.max" -> maxCores.toString)) + + val executorMemory = backend.executorMemory(sc) + offerResources(List( + (executorMemory * 2, executorCores * 2), + (executorMemory * 2, executorCores * 2))) + + verifyTaskLaunched(driver, "o1") + verifyTaskLaunched(driver, "o2") + } + + test("mesos creates multiple executors on a single slave") { + val executorCores = 4 + setBackend(Map("spark.executor.cores" -> executorCores.toString)) + + // offer with room for two executors + val executorMemory = backend.executorMemory(sc) + offerResources(List((executorMemory * 2, executorCores * 2))) + + // verify two executors were started on a single offer + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.length == 2) + } + + test("mesos doesn't register twice with the same shuffle service") { + setBackend(Map("spark.shuffle.service.enabled" -> "true")) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + 
backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched(driver, "o1") + + val offer2 = createOffer("o2", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer2).asJava) + verifyTaskLaunched(driver, "o2") + + val status1 = createTaskStatus("0", "s1", TaskState.TASK_RUNNING) + backend.statusUpdate(driver, status1) + + val status2 = createTaskStatus("1", "s1", TaskState.TASK_RUNNING) + backend.statusUpdate(driver, status2) + verify(externalShuffleClient, times(1)) + .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) + } + + test("Port offer decline when there is no appropriate range") { + setBackend(Map("spark.blockManager.port" -> "30100")) + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verify(driver, times(1)).declineOffer(offer1.getId) + } + + test("Port offer accepted when ephemeral ports are used") { + setBackend() + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched(driver, "o1") + } + + test("Port offer accepted with user defined port numbers") { + val port = 30100 + setBackend(Map("spark.blockManager.port" -> s"$port")) + val offeredPorts = (30000L, 31000L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + val taskInfo = verifyTaskLaunched(driver, "o1") + + val taskPortResources = taskInfo.head.getResourcesList.asScala. + find(r => r.getType == Value.Type.RANGES && r.getName == "ports") + + val isPortInOffer = (r: Resource) => { + r.getRanges().getRangeList + .asScala.exists(range => range.getBegin == port && range.getEnd == port) + } + assert(taskPortResources.exists(isPortInOffer)) + } + + test("mesos kills an executor when told") { + setBackend() + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched(driver, "o1") + + backend.doKillExecutors(List("0")) + verify(driver, times(1)).killTask(createTaskId("0")) + } + + test("weburi is set in created scheduler driver") { + setBackend() + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(webuiUrl.isDefined) + assert(webuiUrl.get.equals("http://webui")) + driver + } + } + + backend.start() + } + + test("honors unset spark.mesos.containerizer") { + setBackend(Map("spark.mesos.executor.docker.image" -> "test")) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val 
taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.DOCKER) + } + + test("honors spark.mesos.containerizer=\"mesos\"") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "test", + "spark.mesos.containerizer" -> "mesos")) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val taskInfos = verifyTaskLaunched(driver, "o1") + assert(taskInfos.head.getContainer.getType == ContainerInfo.Type.MESOS) + } + + test("docker settings are reflected in created tasks") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image", + "spark.mesos.executor.docker.forcePullImage" -> "true", + "spark.mesos.executor.docker.volumes" -> "/host_vol:/container_vol:ro", + "spark.mesos.executor.docker.portmaps" -> "8080:80:tcp" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched(driver, "o1") + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val volumes = containerInfo.getVolumesList.asScala + assert(volumes.size == 1) + + val volume = volumes.head + assert(volume.getHostPath == "/host_vol") + assert(volume.getContainerPath == "/container_vol") + assert(volume.getMode == Volume.Mode.RO) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(dockerInfo.getForcePullImage) + + val portMappings = dockerInfo.getPortMappingsList.asScala + assert(portMappings.size == 1) + + val portMapping = portMappings.head + assert(portMapping.getHostPort == 8080) + assert(portMapping.getContainerPort == 80) + assert(portMapping.getProtocol == "tcp") + } + + test("force-pull-image option is disabled by default") { + setBackend(Map( + "spark.mesos.executor.docker.image" -> "some_image" + )) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched(driver, "o1") + assert(launchedTasks.size == 1) + + val containerInfo = launchedTasks.head.getContainer + assert(containerInfo.getType == ContainerInfo.Type.DOCKER) + + val dockerInfo = containerInfo.getDocker + + assert(dockerInfo.getImage == "some_image") + assert(!dockerInfo.getForcePullImage) + } + + test("Do not call removeExecutor() after backend is stopped") { + setBackend() + + // launches a task on a valid offer + val offers = List((backend.executorMemory(sc), 1)) + offerResources(offers) + verifyTaskLaunched(driver, "o1") + + // launches a thread simulating status update + val statusUpdateThread = new Thread { + override def run(): Unit = { + while (!stopCalled) { + Thread.sleep(100) + } + + val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED) + backend.statusUpdate(driver, status) + } + }.start + + backend.stop() + // Any method of the backend involving sending messages to the driver endpoint should not + // be called after the backend is stopped. 
+ verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]]) + } + + test("mesos supports spark.executor.uri") { + val url = "spark.spark.spark.com" + setBackend(Map( + "spark.executor.uri" -> url + ), false) + + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu) + backend.resourceOffers(driver, List(offer1).asJava) + + val launchedTasks = verifyTaskLaunched(driver, "o1") + assert(launchedTasks.head.getCommand.getUrisList.asScala(0).getValue == url) + } + + private def verifyDeclinedOffer(driver: SchedulerDriver, + offerId: OfferID, + filter: Boolean = false): Unit = { + if (filter) { + verify(driver, times(1)).declineOffer(Matchers.eq(offerId), anyObject[Filters]) + } else { + verify(driver, times(1)).declineOffer(Matchers.eq(offerId)) + } + } + + private def offerResources(offers: List[(Int, Int)], startId: Int = 1): Unit = { + val mesosOffers = offers.zipWithIndex.map {case (offer, i) => + createOffer(s"o${i + startId}", s"s${i + startId}", offer._1, offer._2)} + + backend.resourceOffers(driver, mesosOffers.asJava) + } + + private def createTaskStatus(taskId: String, slaveId: String, state: TaskState): TaskStatus = { + TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(taskId).build()) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) + .setState(state) + .build + } + + private def createSchedulerBackend( + taskScheduler: TaskSchedulerImpl, + driver: SchedulerDriver, + shuffleClient: MesosExternalShuffleClient, + endpoint: RpcEndpointRef): MesosCoarseGrainedSchedulerBackend = { + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = driver + + override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient + + override protected def createDriverEndpointRef( + properties: ArrayBuffer[(String, String)]): RpcEndpointRef = endpoint + + // override to avoid race condition with the driver thread on `mesosDriver` + override def startScheduler(newDriver: SchedulerDriver): Unit = { + mesosDriver = newDriver + } + + override def stopExecutors(): Unit = { + stopCalled = true + } + + markRegistered() + } + backend.start() + backend + } + + private def setBackend(sparkConfVars: Map[String, String] = null, + setHome: Boolean = true) { + sparkConf = (new SparkConf) + .setMaster("local[*]") + .setAppName("test-mesos-dynamic-alloc") + .set("spark.mesos.driver.webui.url", "http://webui") + + if (setHome) { + sparkConf.setSparkHome("/path") + } + + if (sparkConfVars != null) { + sparkConf.setAll(sparkConfVars) + } + + sc = new SparkContext(sparkConf) + + driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + externalShuffleClient = mock[MesosExternalShuffleClient] + driverEndpoint = mock[RpcEndpointRef] + + backend = createSchedulerBackend(taskScheduler, driver, externalShuffleClient, driverEndpoint) + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala 
b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala new file mode 100644 index 0000000000..fcf39f6391 --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackendSuite.scala @@ -0,0 +1,385 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.nio.ByteBuffer +import java.util.Arrays +import java.util.Collection +import java.util.Collections + +import scala.collection.JavaConverters._ +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.mockito.{ArgumentCaptor, Matchers} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.executor.MesosExecutorBackend +import org.apache.spark.scheduler.{LiveListenerBus, SparkListenerExecutorAdded, + TaskDescription, TaskSchedulerImpl, WorkerOffer} +import org.apache.spark.scheduler.cluster.ExecutorInfo + +class MesosFineGrainedSchedulerBackendSuite + extends SparkFunSuite with LocalSparkContext with MockitoSugar { + + test("weburi is set in created scheduler driver") { + val conf = new SparkConf + conf.set("spark.mesos.driver.webui.url", "http://webui") + conf.set("spark.app.name", "name1") + + val sc = mock[SparkContext] + when(sc.conf).thenReturn(conf) + when(sc.sparkUser).thenReturn("sparkUser1") + when(sc.appName).thenReturn("appName1") + + val taskScheduler = mock[TaskSchedulerImpl] + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(webuiUrl.isDefined) + assert(webuiUrl.get.equals("http://webui")) + driver + } + } + + backend.start() + } + + test("Use configured mesosExecutor.cores for ExecutorInfo") { + val mesosExecutorCores = 3 + val conf = new SparkConf + conf.set("spark.mesos.mesosExecutor.cores", mesosExecutorCores.toString) + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + 
when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + + when(sc.conf).thenReturn(conf) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.executorMemory).thenReturn(100) + when(sc.listenerBus).thenReturn(listenerBus) + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val resources = Arrays.asList( + mesosSchedulerBackend.createResource("cpus", 4), + mesosSchedulerBackend.createResource("mem", 1024)) + // uri is null. + val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") + val executorResources = executorInfo.getResourcesList + val cpus = executorResources.asScala.find(_.getName.equals("cpus")).get.getScalar.getValue + + assert(cpus === mesosExecutorCores) + } + + test("check spark-class location correctly") { + val conf = new SparkConf + conf.set("spark.mesos.executor.home", "/mesos-home") + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + + when(sc.conf).thenReturn(conf) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.executorMemory).thenReturn(100) + when(sc.listenerBus).thenReturn(listenerBus) + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val mesosSchedulerBackend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val resources = Arrays.asList( + mesosSchedulerBackend.createResource("cpus", 4), + mesosSchedulerBackend.createResource("mem", 1024)) + // uri is null. + val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") + assert(executorInfo.getCommand.getValue === + s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") + + // uri exists. 
+ conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") + val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") + assert(executorInfo1.getCommand.getValue === + s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") + } + + test("spark docker properties correctly populate the DockerInfo message") { + val taskScheduler = mock[TaskSchedulerImpl] + + val conf = new SparkConf() + .set("spark.mesos.executor.docker.image", "spark/mock") + .set("spark.mesos.executor.docker.forcePullImage", "true") + .set("spark.mesos.executor.docker.volumes", "/a,/b:/b,/c:/c:rw,/d:ro,/e:/e:ro") + .set("spark.mesos.executor.docker.portmaps", "80:8080,53:53:tcp") + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/spark-home")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(conf) + when(sc.listenerBus).thenReturn(listenerBus) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val (execInfo, _) = backend.createExecutorInfo( + Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") + assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) + assert(execInfo.getContainer.getDocker.getForcePullImage.equals(true)) + val portmaps = execInfo.getContainer.getDocker.getPortMappingsList + assert(portmaps.get(0).getHostPort.equals(80)) + assert(portmaps.get(0).getContainerPort.equals(8080)) + assert(portmaps.get(0).getProtocol.equals("tcp")) + assert(portmaps.get(1).getHostPort.equals(53)) + assert(portmaps.get(1).getContainerPort.equals(53)) + assert(portmaps.get(1).getProtocol.equals("tcp")) + val volumes = execInfo.getContainer.getVolumesList + assert(volumes.get(0).getContainerPath.equals("/a")) + assert(volumes.get(0).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(1).getContainerPath.equals("/b")) + assert(volumes.get(1).getHostPath.equals("/b")) + assert(volumes.get(1).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(2).getContainerPath.equals("/c")) + assert(volumes.get(2).getHostPath.equals("/c")) + assert(volumes.get(2).getMode.equals(Volume.Mode.RW)) + assert(volumes.get(3).getContainerPath.equals("/d")) + assert(volumes.get(3).getMode.equals(Volume.Mode.RO)) + assert(volumes.get(4).getContainerPath.equals("/e")) + assert(volumes.get(4).getHostPath.equals("/e")) + assert(volumes.get(4).getMode.equals(Volume.Mode.RO)) + } + + test("mesos resource offers result in launching tasks") { + def createOffer(id: Int, mem: Int, cpu: Int): Offer = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) + .setHostname(s"host${id.toString}").build() + } + + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = 
mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(new SparkConf) + when(sc.listenerBus).thenReturn(listenerBus) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val minMem = backend.executorMemory(sc) + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer(1, minMem, minCpu)) + mesosOffers.add(createOffer(2, minMem - 1, minCpu)) + mesosOffers.add(createOffer(3, minMem, minCpu)) + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](2) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + (minCpu - backend.mesosExecutorCores).toInt + )) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(2).getSlaveId.getValue, + mesosOffers.get(2).getHostname, + (minCpu - backend.mesosExecutorCores).toInt + )) + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(mesosOffers.get(1).getId)).thenReturn(Status.valueOf(1)) + when(driver.declineOffer(mesosOffers.get(2).getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) + verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) + assert(capture.getValue.size() === 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + val cpus = taskInfo.getResourcesList.get(0) + assert(cpus.getName.equals("cpus")) + assert(cpus.getScalar.getValue.equals(2.0)) + assert(taskInfo.getSlaveId.getValue.equals("s1")) + + // Unwanted resources offered on an existing node. 
Make sure they are declined + val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer(1, minMem, minCpu)) + reset(taskScheduler) + reset(driver) + when(taskScheduler.resourceOffers(any(classOf[Seq[WorkerOffer]]))).thenReturn(Seq(Seq())) + when(taskScheduler.CPUS_PER_TASK).thenReturn(2) + when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers2) + verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId) + } + + test("can handle multiple roles") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(new SparkConf) + when(sc.listenerBus).thenReturn(listenerBus) + + val id = 1 + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole("prod") + .setScalar(Scalar.newBuilder().setValue(500)) + builder.addResourcesBuilder() + .setName("cpus") + .setRole("prod") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(1)) + builder.addResourcesBuilder() + .setName("mem") + .setRole("dev") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(600)) + builder.addResourcesBuilder() + .setName("cpus") + .setRole("dev") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(2)) + val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) + .setHostname(s"host${id.toString}").build() + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(offer) + + val backend = new MesosFineGrainedSchedulerBackend(taskScheduler, sc, "master") + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 2 // Deducting 1 for executor + )) + + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(1) + + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + + assert(capture.getValue.size() === 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + assert(taskInfo.getResourcesCount === 1) + val cpusDev = taskInfo.getResourcesList.get(0) + assert(cpusDev.getName.equals("cpus")) + assert(cpusDev.getScalar.getValue.equals(1.0)) + assert(cpusDev.getRole.equals("dev")) + val executorResources = taskInfo.getExecutor.getResourcesList.asScala + assert(executorResources.exists { r => + r.getName.equals("mem") && 
r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod") + }) + assert(executorResources.exists { r => + r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") + }) + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala new file mode 100644 index 0000000000..e3d794931a --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import scala.collection.JavaConverters._ +import scala.language.reflectiveCalls + +import org.apache.mesos.Protos.{Resource, Value} +import org.mockito.Mockito._ +import org.scalatest._ +import org.scalatest.mock.MockitoSugar + +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} + +class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoSugar { + + // scalastyle:off structural.type + // this is the documented way of generating fixtures in scalatest + def fixture: Object {val sc: SparkContext; val sparkConf: SparkConf} = new { + val sparkConf = new SparkConf + val sc = mock[SparkContext] + when(sc.conf).thenReturn(sparkConf) + } + + private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = { + val rangeValue = Value.Range.newBuilder() + rangeValue.setBegin(range._1) + rangeValue.setEnd(range._2) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + + role.foreach { r => builder.setRole(r) } + builder.build() + } + + private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{resource => resource.getRanges.getRangeList + .asScala.map(range => (range.getBegin, range.getEnd))} + } + + def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def arePortsEqual(array1: Array[Long], array2: Array[Long]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{ resource => + resource.getRanges.getRangeList.asScala.toList.map{ + range => (range.getBegin, range.getEnd)}} + } + + val utils = new MesosSchedulerUtils { } + // scalastyle:on structural.type + + test("use at-least minimum overhead") { + val f = fixture + when(f.sc.executorMemory).thenReturn(512) + utils.executorMemory(f.sc) shouldBe 896 + } + + test("use overhead 
if it is greater than minimum value") { + val f = fixture + when(f.sc.executorMemory).thenReturn(4096) + utils.executorMemory(f.sc) shouldBe 4505 + } + + test("use spark.mesos.executor.memoryOverhead (if set)") { + val f = fixture + when(f.sc.executorMemory).thenReturn(1024) + f.sparkConf.set("spark.mesos.executor.memoryOverhead", "512") + utils.executorMemory(f.sc) shouldBe 1536 + } + + test("parse a non-empty constraint string correctly") { + val expectedMap = Map( + "os" -> Set("centos7"), + "zone" -> Set("us-east-1a", "us-east-1b") + ) + utils.parseConstraintString("os:centos7;zone:us-east-1a,us-east-1b") should be (expectedMap) + } + + test("parse an empty constraint string correctly") { + utils.parseConstraintString("") shouldBe Map() + } + + test("throw an exception when the input is malformed") { + an[IllegalArgumentException] should be thrownBy + utils.parseConstraintString("os;zone:us-east") + } + + test("empty values for attributes' constraints matches all values") { + val constraintsStr = "os:" + val parsedConstraints = utils.parseConstraintString(constraintsStr) + + parsedConstraints shouldBe Map("os" -> Set()) + + val zoneSet = Value.Set.newBuilder().addItem("us-east-1a").addItem("us-east-1b").build() + val noOsOffer = Map("zone" -> zoneSet) + val centosOffer = Map("os" -> Value.Text.newBuilder().setValue("centos").build()) + val ubuntuOffer = Map("os" -> Value.Text.newBuilder().setValue("ubuntu").build()) + + utils.matchesAttributeRequirements(parsedConstraints, noOsOffer) shouldBe false + utils.matchesAttributeRequirements(parsedConstraints, centosOffer) shouldBe true + utils.matchesAttributeRequirements(parsedConstraints, ubuntuOffer) shouldBe true + } + + test("subset match is performed for set attributes") { + val supersetConstraint = Map( + "os" -> Value.Text.newBuilder().setValue("ubuntu").build(), + "zone" -> Value.Set.newBuilder() + .addItem("us-east-1a") + .addItem("us-east-1b") + .addItem("us-east-1c") + .build()) + + val zoneConstraintStr = "os:;zone:us-east-1a,us-east-1c" + val parsedConstraints = utils.parseConstraintString(zoneConstraintStr) + + utils.matchesAttributeRequirements(parsedConstraints, supersetConstraint) shouldBe true + } + + test("less than equal match is performed on scalar attributes") { + val offerAttribs = Map("gpus" -> Value.Scalar.newBuilder().setValue(3).build()) + + val ltConstraint = utils.parseConstraintString("gpus:2") + val eqConstraint = utils.parseConstraintString("gpus:3") + val gtConstraint = utils.parseConstraintString("gpus:4") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false + } + + test("contains match is performed for range attributes") { + val offerAttribs = Map("ports" -> Value.Range.newBuilder().setBegin(7000).setEnd(8000).build()) + val ltConstraint = utils.parseConstraintString("ports:6000") + val eqConstraint = utils.parseConstraintString("ports:7500") + val gtConstraint = utils.parseConstraintString("ports:8002") + val multiConstraint = utils.parseConstraintString("ports:5000,7500,8300") + + utils.matchesAttributeRequirements(ltConstraint, offerAttribs) shouldBe false + utils.matchesAttributeRequirements(eqConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(gtConstraint, offerAttribs) shouldBe false + utils.matchesAttributeRequirements(multiConstraint, offerAttribs) shouldBe true + } + + test("equality 
match is performed for text attributes") { + val offerAttribs = Map("os" -> Value.Text.newBuilder().setValue("centos7").build()) + + val trueConstraint = utils.parseConstraintString("os:centos7") + val falseConstraint = utils.parseConstraintString("os:ubuntu") + + utils.matchesAttributeRequirements(trueConstraint, offerAttribs) shouldBe true + utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false + } + + test("Port reservation is done correctly with user specified ports only") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3000" ) + conf.set("spark.blockManager.port", "4000") + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3000, 4000), List(portResource)) + resourcesToBeUsed.length shouldBe 2 + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray + + portsToUse.length shouldBe 2 + arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true + + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) + + arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true + } + + test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3100" ) + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3100), List(portResource)) + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 1 + portsToUse.contains(3100) shouldBe true + } + + test("Port reservation is done correctly with all random ports") { + val conf = new SparkConf() + val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), List(portResource)) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.isEmpty shouldBe true + } + + test("Port reservation is done correctly with user specified ports only - multiple ranges") { + val conf = new SparkConf() + conf.set("spark.executor.port", "2100" ) + conf.set("spark.blockManager.port", "4000") + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(2100, 4000), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 2 + val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) + + arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true + arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true + } + + test("Port reservation is done correctly with all random ports - multiple ranges") { + val conf = new SparkConf() + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + portsToUse.isEmpty shouldBe 
true + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala new file mode 100644 index 0000000000..5a81bb335f --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosTaskLaunchDataSuite.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.nio.ByteBuffer + +import org.apache.spark.SparkFunSuite + +class MesosTaskLaunchDataSuite extends SparkFunSuite { + test("serialize and deserialize data must be same") { + val serializedTask = ByteBuffer.allocate(40) + (Range(100, 110).map(serializedTask.putInt(_))) + serializedTask.rewind + val attemptNumber = 100 + val byteString = MesosTaskLaunchData(serializedTask, attemptNumber).toByteString + serializedTask.rewind + val mesosTaskLaunchData = MesosTaskLaunchData.fromByteString(byteString) + assert(mesosTaskLaunchData.attemptNumber == attemptNumber) + assert(mesosTaskLaunchData.serializedTask.equals(serializedTask)) + } +} diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala new file mode 100644 index 0000000000..fa9406f5f0 --- /dev/null +++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.scheduler.cluster.mesos + +import java.util.Collections + +import scala.collection.JavaConverters._ + +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} +import org.apache.mesos.SchedulerDriver +import org.mockito.{ArgumentCaptor, Matchers} +import org.mockito.Mockito._ + +object Utils { + def createOffer( + offerId: String, + slaveId: String, + mem: Int, + cpu: Int, + ports: Option[(Long, Long)] = None): Offer = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + ports.foreach { resourcePorts => + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) + } + builder.setId(createOfferId(offerId)) + .setFrameworkId(FrameworkID.newBuilder() + .setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)) + .setHostname(s"host${slaveId}") + .build() + } + + def verifyTaskLaunched(driver: SchedulerDriver, offerId: String): List[TaskInfo] = { + val captor = ArgumentCaptor.forClass(classOf[java.util.Collection[TaskInfo]]) + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(createOfferId(offerId))), + captor.capture()) + captor.getValue.asScala.toList + } + + def createOfferId(offerId: String): OfferID = { + OfferID.newBuilder().setValue(offerId).build() + } + + def createSlaveId(slaveId: String): SlaveID = { + SlaveID.newBuilder().setValue(slaveId).build() + } + + def createExecutorId(executorId: String): ExecutorID = { + ExecutorID.newBuilder().setValue(executorId).build() + } + + def createTaskId(taskId: String): TaskID = { + TaskID.newBuilder().setValue(taskId).build() + } +} + |