From 1a028bdefa6312bf0eec46b89a1947da7e9d84af Mon Sep 17 00:00:00 2001 From: Stavros Kontopoulos Date: Mon, 15 Aug 2016 09:55:32 +0100 Subject: [SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode - Make mesos coarse grained scheduler accept port offers and pre-assign ports Previous attempt was for fine grained: https://github.com/apache/spark/pull/10808 Author: Stavros Kontopoulos Author: Stavros Kontopoulos Closes #11157 from skonto/honour_ports_coarse. --- .../MesosCoarseGrainedSchedulerBackendSuite.scala | 42 +++++++- .../cluster/mesos/MesosSchedulerUtilsSuite.scala | 114 ++++++++++++++++++++- .../spark/scheduler/cluster/mesos/Utils.scala | 20 +++- 3 files changed, 170 insertions(+), 6 deletions(-) (limited to 'core/src/test/scala') diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 26a3ad49d0..c06379707a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -17,8 +17,6 @@ package org.apache.spark.scheduler.cluster.mesos -import java.util.Collections - import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -212,6 +210,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite .registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong) } + test("Port offer decline when there is no appropriate range") { + setBackend(Map("spark.blockManager.port" -> "30100")) + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verify(driver, times(1)).declineOffer(offer1.getId) + } + + test("Port offer accepted when ephemeral ports are used") { + setBackend() + val offeredPorts = (31100L, 31200L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + verifyTaskLaunched(driver, "o1") + } + + test("Port offer accepted with user defined port numbers") { + val port = 30100 + setBackend(Map("spark.blockManager.port" -> s"$port")) + val offeredPorts = (30000L, 31000L) + val (mem, cpu) = (backend.executorMemory(sc), 4) + + val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts)) + backend.resourceOffers(driver, List(offer1).asJava) + val taskInfo = verifyTaskLaunched(driver, "o1") + + val taskPortResources = taskInfo.head.getResourcesList.asScala. + find(r => r.getType == Value.Type.RANGES && r.getName == "ports") + + val isPortInOffer = (r: Resource) => { + r.getRanges().getRangeList + .asScala.exists(range => range.getBegin == port && range.getEnd == port) + } + assert(taskPortResources.exists(isPortInOffer)) + } + test("mesos kills an executor when told") { setBackend() diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala index ceb3a52983..e3d794931a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala @@ -17,9 +17,10 @@ package org.apache.spark.scheduler.cluster.mesos +import scala.collection.JavaConverters._ import scala.language.reflectiveCalls -import org.apache.mesos.Protos.Value +import org.apache.mesos.Protos.{Resource, Value} import org.mockito.Mockito._ import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -35,6 +36,41 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS val sc = mock[SparkContext] when(sc.conf).thenReturn(sparkConf) } + + private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = { + val rangeValue = Value.Range.newBuilder() + rangeValue.setBegin(range._1) + rangeValue.setEnd(range._2) + val builder = Resource.newBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Value.Ranges.newBuilder().addRange(rangeValue)) + + role.foreach { r => builder.setRole(r) } + builder.build() + } + + private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{resource => resource.getRanges.getRangeList + .asScala.map(range => (range.getBegin, range.getEnd))} + } + + def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def arePortsEqual(array1: Array[Long], array2: Array[Long]) + : Boolean = { + array1.sortBy(identity).deep == array2.sortBy(identity).deep + } + + def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = { + resources.flatMap{ resource => + resource.getRanges.getRangeList.asScala.toList.map{ + range => (range.getBegin, range.getEnd)}} + } + val utils = new MesosSchedulerUtils { } // scalastyle:on structural.type @@ -140,4 +176,80 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false } + test("Port reservation is done correctly with user specified ports only") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3000" ) + conf.set("spark.blockManager.port", "4000") + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3000, 4000), List(portResource)) + resourcesToBeUsed.length shouldBe 2 + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray + + portsToUse.length shouldBe 2 + arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true + + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUSed = Array((3000L, 3000L), (4000L, 4000L)) + + arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true + } + + test("Port reservation is done correctly with some user specified ports (spark.executor.port)") { + val conf = new SparkConf() + conf.set("spark.executor.port", "3100" ) + val portResource = createTestPortResource((3000, 5000), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(3100), List(portResource)) + + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 1 + portsToUse.contains(3100) shouldBe true + } + + test("Port reservation is done correctly with all random ports") { + val conf = new SparkConf() + val portResource = createTestPortResource((3000L, 5000L), Some("my_role")) + + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), List(portResource)) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.isEmpty shouldBe true + } + + test("Port reservation is done correctly with user specified ports only - multiple ranges") { + val conf = new SparkConf() + conf.set("spark.executor.port", "2100" ) + conf.set("spark.blockManager.port", "4000") + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(2100, 4000), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + + portsToUse.length shouldBe 2 + val portsRangesLeft = rangesResourcesToTuple(resourcesLeft) + val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed) + + val expectedUsed = Array((2100L, 2100L), (4000L, 4000L)) + + arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true + arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true + } + + test("Port reservation is done correctly with all random ports - multiple ranges") { + val conf = new SparkConf() + val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")), + createTestPortResource((2000, 2500), Some("other_role"))) + val (resourcesLeft, resourcesToBeUsed) = utils + .partitionPortResources(List(), portResourceList) + val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1} + portsToUse.isEmpty shouldBe true + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala index ff26d14ef5..fa9406f5f0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala @@ -19,15 +19,21 @@ package org.apache.spark.scheduler.cluster.mesos import java.util.Collections +import scala.collection.JavaConverters._ + import org.apache.mesos.Protos._ -import org.apache.mesos.Protos.Value.Scalar +import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} import org.mockito.Mockito._ -import scala.collection.JavaConverters._ object Utils { - def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = { + def createOffer( + offerId: String, + slaveId: String, + mem: Int, + cpu: Int, + ports: Option[(Long, Long)] = None): Offer = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -37,6 +43,13 @@ object Utils { .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) + ports.foreach { resourcePorts => + builder.addResourcesBuilder() + .setName("ports") + .setType(Value.Type.RANGES) + .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder() + .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build())) + } builder.setId(createOfferId(offerId)) .setFrameworkId(FrameworkID.newBuilder() .setValue("f1")) @@ -69,3 +82,4 @@ object Utils { TaskID.newBuilder().setValue(taskId).build() } } + -- cgit v1.2.3