aboutsummaryrefslogtreecommitdiff
path: root/core/src/test/scala
diff options
context:
space:
mode:
authorStavros Kontopoulos <stavros.kontopoulos@lightbend.com>2016-08-15 09:55:32 +0100
committerSean Owen <sowen@cloudera.com>2016-08-15 09:55:32 +0100
commit1a028bdefa6312bf0eec46b89a1947da7e9d84af (patch)
treeb5232c29af1b3e3837a309e907d82bc90d4fdcce /core/src/test/scala
parent2a3d286f3421f6836b71afcbda3084222752e6b1 (diff)
downloadspark-1a028bdefa6312bf0eec46b89a1947da7e9d84af.tar.gz
spark-1a028bdefa6312bf0eec46b89a1947da7e9d84af.tar.bz2
spark-1a028bdefa6312bf0eec46b89a1947da7e9d84af.zip
[SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode
- Make mesos coarse grained scheduler accept port offers and pre-assign ports Previous attempt was for fine grained: https://github.com/apache/spark/pull/10808 Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com> Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com> Closes #11157 from skonto/honour_ports_coarse.
Diffstat (limited to 'core/src/test/scala')
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala42
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala114
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala20
3 files changed, 170 insertions, 6 deletions
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 26a3ad49d0..c06379707a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -17,8 +17,6 @@
package org.apache.spark.scheduler.cluster.mesos
-import java.util.Collections
-
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
@@ -212,6 +210,46 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
.registerDriverWithShuffleService(anyString, anyInt, anyLong, anyLong)
}
+ test("Port offer decline when there is no appropriate range") {
+ setBackend(Map("spark.blockManager.port" -> "30100"))
+ val offeredPorts = (31100L, 31200L)
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+ backend.resourceOffers(driver, List(offer1).asJava)
+ verify(driver, times(1)).declineOffer(offer1.getId)
+ }
+
+ test("Port offer accepted when ephemeral ports are used") {
+ setBackend()
+ val offeredPorts = (31100L, 31200L)
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+ backend.resourceOffers(driver, List(offer1).asJava)
+ verifyTaskLaunched(driver, "o1")
+ }
+
+ test("Port offer accepted with user defined port numbers") {
+ val port = 30100
+ setBackend(Map("spark.blockManager.port" -> s"$port"))
+ val offeredPorts = (30000L, 31000L)
+ val (mem, cpu) = (backend.executorMemory(sc), 4)
+
+ val offer1 = createOffer("o1", "s1", mem, cpu, Some(offeredPorts))
+ backend.resourceOffers(driver, List(offer1).asJava)
+ val taskInfo = verifyTaskLaunched(driver, "o1")
+
+ val taskPortResources = taskInfo.head.getResourcesList.asScala.
+ find(r => r.getType == Value.Type.RANGES && r.getName == "ports")
+
+ val isPortInOffer = (r: Resource) => {
+ r.getRanges().getRangeList
+ .asScala.exists(range => range.getBegin == port && range.getEnd == port)
+ }
+ assert(taskPortResources.exists(isPortInOffer))
+ }
+
test("mesos kills an executor when told") {
setBackend()
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
index ceb3a52983..e3d794931a 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtilsSuite.scala
@@ -17,9 +17,10 @@
package org.apache.spark.scheduler.cluster.mesos
+import scala.collection.JavaConverters._
import scala.language.reflectiveCalls
-import org.apache.mesos.Protos.Value
+import org.apache.mesos.Protos.{Resource, Value}
import org.mockito.Mockito._
import org.scalatest._
import org.scalatest.mock.MockitoSugar
@@ -35,6 +36,41 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
val sc = mock[SparkContext]
when(sc.conf).thenReturn(sparkConf)
}
+
+ private def createTestPortResource(range: (Long, Long), role: Option[String] = None): Resource = {
+ val rangeValue = Value.Range.newBuilder()
+ rangeValue.setBegin(range._1)
+ rangeValue.setEnd(range._2)
+ val builder = Resource.newBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+
+ role.foreach { r => builder.setRole(r) }
+ builder.build()
+ }
+
+ private def rangesResourcesToTuple(resources: List[Resource]): List[(Long, Long)] = {
+ resources.flatMap{resource => resource.getRanges.getRangeList
+ .asScala.map(range => (range.getBegin, range.getEnd))}
+ }
+
+ def arePortsEqual(array1: Array[(Long, Long)], array2: Array[(Long, Long)])
+ : Boolean = {
+ array1.sortBy(identity).deep == array2.sortBy(identity).deep
+ }
+
+ def arePortsEqual(array1: Array[Long], array2: Array[Long])
+ : Boolean = {
+ array1.sortBy(identity).deep == array2.sortBy(identity).deep
+ }
+
+ def getRangesFromResources(resources: List[Resource]): List[(Long, Long)] = {
+ resources.flatMap{ resource =>
+ resource.getRanges.getRangeList.asScala.toList.map{
+ range => (range.getBegin, range.getEnd)}}
+ }
+
val utils = new MesosSchedulerUtils { }
// scalastyle:on structural.type
@@ -140,4 +176,80 @@ class MesosSchedulerUtilsSuite extends SparkFunSuite with Matchers with MockitoS
utils.matchesAttributeRequirements(falseConstraint, offerAttribs) shouldBe false
}
+ test("Port reservation is done correctly with user specified ports only") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "3000" )
+ conf.set("spark.blockManager.port", "4000")
+ val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(3000, 4000), List(portResource))
+ resourcesToBeUsed.length shouldBe 2
+
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}.toArray
+
+ portsToUse.length shouldBe 2
+ arePortsEqual(portsToUse, Array(3000L, 4000L)) shouldBe true
+
+ val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+ val expectedUSed = Array((3000L, 3000L), (4000L, 4000L))
+
+ arePortsEqual(portRangesToBeUsed.toArray, expectedUSed) shouldBe true
+ }
+
+ test("Port reservation is done correctly with some user specified ports (spark.executor.port)") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "3100" )
+ val portResource = createTestPortResource((3000, 5000), Some("my_role"))
+
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(3100), List(portResource))
+
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.length shouldBe 1
+ portsToUse.contains(3100) shouldBe true
+ }
+
+ test("Port reservation is done correctly with all random ports") {
+ val conf = new SparkConf()
+ val portResource = createTestPortResource((3000L, 5000L), Some("my_role"))
+
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(), List(portResource))
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.isEmpty shouldBe true
+ }
+
+ test("Port reservation is done correctly with user specified ports only - multiple ranges") {
+ val conf = new SparkConf()
+ conf.set("spark.executor.port", "2100" )
+ conf.set("spark.blockManager.port", "4000")
+ val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+ createTestPortResource((2000, 2500), Some("other_role")))
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(2100, 4000), portResourceList)
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+
+ portsToUse.length shouldBe 2
+ val portsRangesLeft = rangesResourcesToTuple(resourcesLeft)
+ val portRangesToBeUsed = rangesResourcesToTuple(resourcesToBeUsed)
+
+ val expectedUsed = Array((2100L, 2100L), (4000L, 4000L))
+
+ arePortsEqual(portsToUse.toArray, Array(2100L, 4000L)) shouldBe true
+ arePortsEqual(portRangesToBeUsed.toArray, expectedUsed) shouldBe true
+ }
+
+ test("Port reservation is done correctly with all random ports - multiple ranges") {
+ val conf = new SparkConf()
+ val portResourceList = List(createTestPortResource((3000, 5000), Some("my_role")),
+ createTestPortResource((2000, 2500), Some("other_role")))
+ val (resourcesLeft, resourcesToBeUsed) = utils
+ .partitionPortResources(List(), portResourceList)
+ val portsToUse = getRangesFromResources(resourcesToBeUsed).map{r => r._1}
+ portsToUse.isEmpty shouldBe true
+ }
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index ff26d14ef5..fa9406f5f0 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -19,15 +19,21 @@ package org.apache.spark.scheduler.cluster.mesos
import java.util.Collections
+import scala.collection.JavaConverters._
+
import org.apache.mesos.Protos._
-import org.apache.mesos.Protos.Value.Scalar
+import org.apache.mesos.Protos.Value.{Range => MesosRange, Ranges, Scalar}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
-import scala.collection.JavaConverters._
object Utils {
- def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int): Offer = {
+ def createOffer(
+ offerId: String,
+ slaveId: String,
+ mem: Int,
+ cpu: Int,
+ ports: Option[(Long, Long)] = None): Offer = {
val builder = Offer.newBuilder()
builder.addResourcesBuilder()
.setName("mem")
@@ -37,6 +43,13 @@ object Utils {
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Scalar.newBuilder().setValue(cpu))
+ ports.foreach { resourcePorts =>
+ builder.addResourcesBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(Ranges.newBuilder().addRange(MesosRange.newBuilder()
+ .setBegin(resourcePorts._1).setEnd(resourcePorts._2).build()))
+ }
builder.setId(createOfferId(offerId))
.setFrameworkId(FrameworkID.newBuilder()
.setValue("f1"))
@@ -69,3 +82,4 @@ object Utils {
TaskID.newBuilder().setValue(taskId).build()
}
}
+