From 1a028bdefa6312bf0eec46b89a1947da7e9d84af Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 15 Aug 2016 09:55:32 +0100 Subject: [SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode - Make mesos coarse grained scheduler accept port offers and pre-assign ports Previous attempt was for fine grained: https://github.com/apache/spark/pull/10808 Author: Stavros Kontopoulos Author: Stavros Kontopoulos Closes #11157 from skonto/honour_ports_coarse. --- .../src/main/scala/org/apache/spark/SparkEnv.scala | 1 + .../mesos/MesosCoarseGrainedSchedulerBackend.scala | 59 +++++++--- .../cluster/mesos/MesosSchedulerUtils.scala | 125 ++++++++++++++++++++- .../MesosCoarseGrainedSchedulerBackendSuite.scala | 42 ++++++- .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 114 ++++++++++++++++++- .../spark/scheduler/cluster/mesos/Utils.scala | 20 +++- 6 files changed, 336 insertions(+), 25 deletions(-) (limited to 'core') diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index af50a6dc2d..cc8e3fdc97 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -231,6 +231,7 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", rpcEnv.address.port.toString) } else if (rpcEnv.address != null) { conf.set("spark.executor.port", rpcEnv.address.port.toString) + logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}") } // Create an instance of the class with the given name, possibly initializing it with our conf diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 4a88824854..6b9313e5ed 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import scala.collection.mutable -import scala.collection.mutable.{Buffer, HashMap, HashSet} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} @@ -71,13 +70,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) // Cores we have acquired with each Mesos task ID - val coresByTaskId = new HashMap[String, Int] + val coresByTaskId = new mutable.HashMap[String, Int] var totalCoresAcquired = 0 // SlaveID -> Slave // This map accumulates entries for the duration of the job. Slaves are never deleted, because // we need to maintain e.g. failure state and connection state. - private val slaves = new HashMap[String, Slave] + private val slaves = new mutable.HashMap[String, Slave] /** * The total number of executors we aim to have. Undefined when not using dynamic allocation. @@ -285,7 +284,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private def declineUnmatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = { + d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { offers.foreach { offer => declineOffer(d, offer, Some("unmet constraints"), Some(rejectOfferDurationForUnmetConstraints)) @@ -302,9 +301,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( val offerAttributes = toAttributeMap(offer.getAttributesList) val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus") + val ports = getRangeResource(offer.getResourcesList, "ports") logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" + - s" cpu: $cpus for $refuseSeconds seconds" + + s" cpu: $cpus port: $ports for $refuseSeconds seconds" + reason.map(r => s" (reason: $r)").getOrElse("")) refuseSeconds match { @@ -323,26 +323,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( * @param offers Mesos offers that match attribute constraints */ private def handleMatchedOffers( - d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = { + d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = { val tasks = buildMesosTasks(offers) for (offer <- offers) { val offerAttributes = toAttributeMap(offer.getAttributesList) val offerMem = getResource(offer.getResourcesList, "mem") val offerCpus = getResource(offer.getResourcesList, "cpus") + val offerPorts = getRangeResource(offer.getResourcesList, "ports") val id = offer.getId.getValue if (tasks.contains(offer.getId)) { // accept val offerTasks = tasks(offer.getId) logDebug(s"Accepting offer: $id with attributes: $offerAttributes " + - s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.") + s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." + + s" Launching ${offerTasks.size} Mesos tasks.") for (task <- offerTasks) { val taskId = task.getTaskId val mem = getResource(task.getResourcesList, "mem") val cpus = getResource(task.getResourcesList, "cpus") + val ports = getRangeResource(task.getResourcesList, "ports").mkString(",") - logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.") + logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" + + s" ports: $ports") } d.launchTasks( @@ -365,9 +369,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( * @param offers Mesos offers that match attribute constraints * @return A map from OfferID to a list of Mesos tasks to launch on that offer */ - private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { + private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = { // offerID -> tasks - val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) + val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil) // offerID -> resources val remainingResources = mutable.Map(offers.map(offer => @@ -397,18 +401,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId) - val (afterCPUResources, cpuResourcesToUse) = - partitionResources(resources, "cpus", taskCPUs) - val (resourcesLeft, memResourcesToUse) = - partitionResources(afterCPUResources.asJava, "mem", taskMemory) + val (resourcesLeft, resourcesToUse) = + partitionTaskResources(resources, taskCPUs, taskMemory) val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId)) .setName("Task " + taskId) - .addAllResources(cpuResourcesToUse.asJava) - .addAllResources(memResourcesToUse.asJava) + + taskBuilder.addAllResources(resourcesToUse.asJava) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo( @@ -428,18 +430,39 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( tasks.toMap } + /** Extracts task needed resources from a list of available resources. */ + private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int) + : (List[Resource], List[Resource]) = { + + // partition cpus & mem + val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs) + val (afterMemResources, memResourcesToUse) = + partitionResources(afterCPUResources.asJava, "mem", taskMemory) + + // If user specifies port numbers in SparkConfig then consecutive tasks will not be launched + // on the same host. This essentially means one executor per host. + // TODO: handle network isolator case + val (nonPortResources, portResourcesToUse) = + partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources) + + (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse) + } + private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = { val offerMem = getResource(resources, "mem") val offerCPUs = getResource(resources, "cpus").toInt val cpus = executorCores(offerCPUs) val mem = executorMemory(sc) + val ports = getRangeResource(resources, "ports") + val meetsPortRequirements = checkPorts(sc.conf, ports) cpus > 0 && cpus <= offerCPUs && cpus + totalCoresAcquired <= maxCores && mem <= offerMem && numExecutors() < executorLimit && - slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES + slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES && + meetsPortRequirements } private def executorCores(offerCPUs: Int): Int = { @@ -613,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( } private class Slave(val hostname: String) { - val taskIDs = new HashSet[String]() + val taskIDs = new mutable.HashSet[String]() var taskFailures = 0 var shuffleRegistered = false } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 81db789166..1bbede1853 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -47,6 +47,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Creates a new MesosSchedulerDriver that communicates to the Mesos master. + * * @param masterUrl The url to connect to Mesos master * @param scheduler the scheduler class to receive scheduler callbacks * @param sparkUser User to impersonate with when running tasks @@ -147,6 +148,20 @@ private[mesos] trait MesosSchedulerUtils extends Logging { res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum } + /** + * Transforms a range resource to a list of ranges + * + * @param res the mesos resource list + * @param name the name of the resource + * @return the list of ranges returned + */ + protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = { + // A resource can have multiple values in the offer since it can either be from + // a specific role or wildcard. + res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala + .map(r => (r.getBegin, r.getEnd)).toList).toList + } + /** * Signal that the scheduler has registered with Mesos. */ @@ -172,6 +187,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Partition the existing set of resources into two groups, those remaining to be * scheduled and those requested to be used for a new task. + * * @param resources The full list of available resources * @param resourceName The name of the resource to take from the available resources * @param amountToUse The amount of resources to take from the available resources @@ -223,7 +239,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Converts the attributes from the resource offer into a Map of name -> Attribute Value * The attribute values are the mesos attribute types and they are - * @param offerAttributes + * + * @param offerAttributes the attributes offered * @return */ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { @@ -333,6 +350,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Return the amount of memory to allocate to each executor, taking into account * container overheads. + * * @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value * @return memory requirement as (0.1 * ) or MEMORY_OVERHEAD_MINIMUM * (whichever is larger) @@ -357,6 +375,111 @@ private[mesos] trait MesosSchedulerUtils extends Logging { sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") } + /** + * Checks executor ports if they are within some range of the offered list of ports ranges, + * + * @param conf the Spark Config + * @param ports the list of ports to check + * @return true if ports are within range false otherwise + */ + protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = { + + def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = { + ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port } + } + + val portsToCheck = nonZeroPortValuesFromConfig(conf) + val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports)) + // make sure we have enough ports to allocate per offer + val enoughPorts = + ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size + enoughPorts && withinRange + } + + /** + * Partitions port resources. + * + * @param requestedPorts non-zero ports to assign + * @param offeredResources the resources offered + * @return resources left, port resources to be used. + */ + def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource]) + : (List[Resource], List[Resource]) = { + if (requestedPorts.isEmpty) { + (offeredResources, List[Resource]()) + } else { + // partition port offers + val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources) + + val portsAndRoles = requestedPorts. + map(x => (x, findPortAndGetAssignedRangeRole(x, portResources))) + + val assignedPortResources = createResourcesFromPorts(portsAndRoles) + + // ignore non-assigned port resources, they will be declined implicitly by mesos + // no need for splitting port resources. + (resourcesWithoutPorts, assignedPortResources) + } + } + + val managedPortNames = List("spark.executor.port", "spark.blockManager.port") + + /** + * The values of the non-zero ports to be used by the executor process. + * @param conf the spark config to use + * @return the ono-zero values of the ports + */ + def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = { + managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0) + } + + /** Creates a mesos resource for a specific port number. */ + private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = { + portsAndRoles.flatMap{ case (port, role) => + createMesosPortResource(List((port, port)), Some(role))} + } + + /** Helper to create mesos resources for specific port ranges. */ + private def createMesosPortResource( + ranges: List[(Long, Long)], + role: Option[String] = None): List[Resource] = { + ranges.map { case (rangeStart, rangeEnd) => + val rangeValue = Value.Range.newBuilder() + .setBegin(rangeStart) + .setEnd(rangeEnd) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + role.foreach(r => builder.setRole(r)) + builder.build() + } + } + + /** + * Helper to assign a port to an offered range and get the latter's role + * info to use it later on. + */ + private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource]) + : String = { + + val ranges = portResources. + map(resource => + (resource.getRole, resource.getRanges.getRangeList.asScala + .map(r => (r.getBegin, r.getEnd)).toList)) + + val rangePortRole = ranges + .find { case (role, rangeList) => rangeList + .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}} + // this is safe since we have previously checked about the ranges (see checkPorts method) + rangePortRole.map{ case (role, rangeList) => role}.get + } + + /** Retrieves the port resources from a list of mesos offered resources */ + private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = { + resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") } + } + /** * spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver * submissions with frameworkIDs. However, this causes issues when a driver process launches diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 26a3ad49d0..c06379707a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.Collections - import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -212,6 +210,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) } + test("Port offer decline when there is no appropriate range") { + setBackend(Map("spark.blockManager.port" -> "30100")) + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verify(driver, times(1)).declineOffer(offer1.getId) + } + + test("Port offer accepted when ephemeral ports are used") { + setBackend() + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched(driver, "o1") + } + + test("Port offer accepted with user defined port numbers") { + val port = 30100 + setBackend(Map("spark.blockManager.port" -> s"$port")) + val offeredPorts = (30000L, 31000L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + val taskInfo = verifyTaskLaunched(driver, "o1") + + val taskPortResources = taskInfo.head.getResourcesList.asScala. + find(r => r.getType == Value.Type.RANGES && r.getName == "ports") + + val isPortInOffer = (r: Resource) => { + r.getRanges().getRangeList + .asScala.exists(range => range.getBegin == port && range.getEnd == port) + } + assert(taskPortResources.exists(isPortInOffer)) + } + test("mesos kills an executor when told") { setBackend() diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index ceb3a52983..e3d794931a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.scheduler.cluster.mesos +import scala.collection.JavaConverters._ import scala.language.reflectiveCalls -import org.apache.mesos.Protos.Value +import org.apache.mesos.Protos.{Resource, Value} import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -35,6 +36,41 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS val sc = mock[SparkContext] when(sc.conf).thenReturn(sparkConf) } + + private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = { + val rangeValue = Value.Range.newBuilder() + rangeValue.setBegin(range._1) + rangeValue.setEnd(range._2) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + + role.foreach { r => builder.setRole(r) } + builder.build() + } + + private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{resource => resource.getRanges.getRangeList + .asScala.map(range => (range.getBegin, range.getEnd))} + } + + def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def arePortsEqual(array1: Array[Long], array2: Array[Long]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{ resource => + resource.getRanges.getRangeList.asScala.toList.map{ + range => (range.getBegin, range.getEnd)}} + } + val utils = new MesosSchedulerUtils { } // scalastyle:on structural.type @@ -140,4 +176,80 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false } + test("Port reservation is done correctly with user specified ports only") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3000" ) + conf.set("spark.blockManager.port", "4000") + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3000, 4000), List(portResource)) + resourcesToBeUsed.length shouldBe 2 + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray + + portsToUse.length shouldBe 2 + arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true + + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) + + arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true + } + + test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3100" ) + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3100), List(portResource)) + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 1 + portsToUse.contains(3100) shouldBe true + } + + test("Port reservation is done correctly with all random ports") { + val conf = new SparkConf() + val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), List(portResource)) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.isEmpty shouldBe true + } + + test("Port reservation is done correctly with user specified ports only - multiple ranges") { + val conf = new SparkConf() + conf.set("spark.executor.port", "2100" ) + conf.set("spark.blockManager.port", "4000") + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(2100, 4000), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 2 + val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) + + arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true + arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true + } + + test("Port reservation is done correctly with all random ports - multiple ranges") { + val conf = new SparkConf() + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + portsToUse.isEmpty shouldBe true + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala index ff26d14ef5..fa9406f5f0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -19,15 +19,21 @@ package org.apache.spark.scheduler.cluster.mesos import java.util.Collections +import scala.collection.JavaConverters._ + import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Mockito._ -import scala.collection.JavaConverters._ object Utils { - def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { + def createOffer( + offerId: String, + slaveId: String, + mem: Int, + cpu: Int, + ports: Option[(Long, Long)] = None): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -37,6 +43,13 @@ object Utils { .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) + ports.foreach { resourcePorts => + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) + } builder.setId(createOfferId(offerId)) .setFrameworkId(FrameworkID.newBuilder() .setValue("f1")) @@ -69,3 +82,4 @@ object Utils { TaskID.newBuilder().setValue(taskId).build() } } + -- cgit v1.2.3