author     Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>  2016-08-15 09:55:32 +0100
committer  Sean Owen <sowen@cloudera.com>  2016-08-15 09:55:32 +0100
commit     1a028bdefa6312bf0eec46b89a1947da7e9d84af (patch)
tree       b5232c29af1b3e3837a309e907d82bc90d4fdcce /core/src/main
parent     2a3d286f3421f6836b71afcbda3084222752e6b1 (diff)
[SPARK-11714][MESOS] Make Spark on Mesos honor port restrictions on coarse grain mode
- Make the Mesos coarse-grained scheduler accept port offers and pre-assign ports. The previous attempt was for fine-grained mode: https://github.com/apache/spark/pull/10808

Author: Stavros Kontopoulos <stavros.kontopoulos@lightbend.com>
Author: Stavros Kontopoulos <stavros.kontopoulos@typesafe.com>

Closes #11157 from skonto/honour_ports_coarse.
Diffstat (limited to 'core/src/main')
-rw-r--r--  core/src/main/scala/org/apache/spark/SparkEnv.scala  1
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala  59
-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala  125
3 files changed, 166 insertions, 19 deletions
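
For orientation, a minimal sketch of the configuration this change makes the coarse-grained backend honor. Only spark.executor.port and spark.blockManager.port are managed (see managedPortNames in MesosSchedulerUtils below); the concrete port values, master URL, and application name are illustrative assumptions, not part of the commit.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical job configuration: with this patch, an offer is accepted for an
// executor only if its "ports" resource ranges contain every non-zero port below.
val conf = new SparkConf()
  .setMaster("mesos://zk://host:2181/mesos")   // illustrative master URL
  .setAppName("port-restricted-app")           // illustrative name
  .set("spark.executor.port", "31000")         // must fall inside an offered range
  .set("spark.blockManager.port", "31010")     // must fall inside an offered range
val sc = new SparkContext(conf)
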
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index af50a6dc2d..cc8e3fdc97 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -231,6 +231,7 @@ object SparkEnv extends Logging {
conf.set("spark.driver.port", rpcEnv.address.port.toString)
} else if (rpcEnv.address != null) {
conf.set("spark.executor.port", rpcEnv.address.port.toString)
+ logInfo(s"Setting spark.executor.port to: ${rpcEnv.address.port.toString}")
}
// Create an instance of the class with the given name, possibly initializing it with our conf
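
A standalone sketch of the pattern in the hunk above, under the assumption (borne out by nonZeroPortValuesFromConfig further down) that a port left at its default of 0 is treated as unrestricted; the object and method names here are stand-ins, not Spark internals.

import org.apache.spark.SparkConf

object ExecutorPortExample {
  // Hypothetical illustration of the hunk above: record the port the executor's RpcEnv
  // actually bound to, then log it. A port left at 0 (the default) is later filtered out
  // by nonZeroPortValuesFromConfig, so only explicitly configured ports constrain offers.
  def recordExecutorPort(conf: SparkConf, boundPort: Int): Unit = {
    conf.set("spark.executor.port", boundPort.toString)
    println(s"Setting spark.executor.port to: $boundPort") // logInfo in SparkEnv
  }
}
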
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 4a88824854..6b9313e5ed 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
-import scala.collection.mutable.{Buffer, HashMap, HashSet}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -71,13 +70,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
// Cores we have acquired with each Mesos task ID
- val coresByTaskId = new HashMap[String, Int]
+ val coresByTaskId = new mutable.HashMap[String, Int]
var totalCoresAcquired = 0
// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
// we need to maintain e.g. failure state and connection state.
- private val slaves = new HashMap[String, Slave]
+ private val slaves = new mutable.HashMap[String, Slave]
/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation.
@@ -285,7 +284,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
private def declineUnmatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
offers.foreach { offer =>
declineOffer(d, offer, Some("unmet constraints"),
Some(rejectOfferDurationForUnmetConstraints))
@@ -302,9 +301,10 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
val offerAttributes = toAttributeMap(offer.getAttributesList)
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus")
+ val ports = getRangeResource(offer.getResourcesList, "ports")
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
- s" cpu: $cpus for $refuseSeconds seconds" +
+ s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
reason.map(r => s" (reason: $r)").getOrElse(""))
refuseSeconds match {
@@ -323,26 +323,30 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
* @param offers Mesos offers that match attribute constraints
*/
private def handleMatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: Buffer[Offer]): Unit = {
+ d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
val offerMem = getResource(offer.getResourcesList, "mem")
val offerCpus = getResource(offer.getResourcesList, "cpus")
+ val offerPorts = getRangeResource(offer.getResourcesList, "ports")
val id = offer.getId.getValue
if (tasks.contains(offer.getId)) { // accept
val offerTasks = tasks(offer.getId)
logDebug(s"Accepting offer: $id with attributes: $offerAttributes " +
- s"mem: $offerMem cpu: $offerCpus. Launching ${offerTasks.size} Mesos tasks.")
+ s"mem: $offerMem cpu: $offerCpus ports: $offerPorts." +
+ s" Launching ${offerTasks.size} Mesos tasks.")
for (task <- offerTasks) {
val taskId = task.getTaskId
val mem = getResource(task.getResourcesList, "mem")
val cpus = getResource(task.getResourcesList, "cpus")
+ val ports = getRangeResource(task.getResourcesList, "ports").mkString(",")
- logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus.")
+ logDebug(s"Launching Mesos task: ${taskId.getValue} with mem: $mem cpu: $cpus" +
+ s" ports: $ports")
}
d.launchTasks(
@@ -365,9 +369,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
* @param offers Mesos offers that match attribute constraints
* @return A map from OfferID to a list of Mesos tasks to launch on that offer
*/
- private def buildMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
+ private def buildMesosTasks(offers: mutable.Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
// offerID -> tasks
- val tasks = new HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
+ val tasks = new mutable.HashMap[OfferID, List[MesosTaskInfo]].withDefaultValue(Nil)
// offerID -> resources
val remainingResources = mutable.Map(offers.map(offer =>
@@ -397,18 +401,16 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
slaves.getOrElseUpdate(slaveId, new Slave(offer.getHostname)).taskIDs.add(taskId)
- val (afterCPUResources, cpuResourcesToUse) =
- partitionResources(resources, "cpus", taskCPUs)
- val (resourcesLeft, memResourcesToUse) =
- partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+ val (resourcesLeft, resourcesToUse) =
+ partitionTaskResources(resources, taskCPUs, taskMemory)
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, taskCPUs + extraCoresPerExecutor, taskId))
.setName("Task " + taskId)
- .addAllResources(cpuResourcesToUse.asJava)
- .addAllResources(memResourcesToUse.asJava)
+
+ taskBuilder.addAllResources(resourcesToUse.asJava)
sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil.setupContainerBuilderDockerInfo(
@@ -428,18 +430,39 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
tasks.toMap
}
+ /** Extracts the resources a task needs from the list of available resources. */
+ private def partitionTaskResources(resources: JList[Resource], taskCPUs: Int, taskMemory: Int)
+ : (List[Resource], List[Resource]) = {
+
+ // partition cpus & mem
+ val (afterCPUResources, cpuResourcesToUse) = partitionResources(resources, "cpus", taskCPUs)
+ val (afterMemResources, memResourcesToUse) =
+ partitionResources(afterCPUResources.asJava, "mem", taskMemory)
+
+ // If the user specifies port numbers in SparkConf then consecutive tasks will not be launched
+ // on the same host. This essentially means one executor per host.
+ // TODO: handle network isolator case
+ val (nonPortResources, portResourcesToUse) =
+ partitionPortResources(nonZeroPortValuesFromConfig(sc.conf), afterMemResources)
+
+ (nonPortResources, cpuResourcesToUse ++ memResourcesToUse ++ portResourcesToUse)
+ }
+
private def canLaunchTask(slaveId: String, resources: JList[Resource]): Boolean = {
val offerMem = getResource(resources, "mem")
val offerCPUs = getResource(resources, "cpus").toInt
val cpus = executorCores(offerCPUs)
val mem = executorMemory(sc)
+ val ports = getRangeResource(resources, "ports")
+ val meetsPortRequirements = checkPorts(sc.conf, ports)
cpus > 0 &&
cpus <= offerCPUs &&
cpus + totalCoresAcquired <= maxCores &&
mem <= offerMem &&
numExecutors() < executorLimit &&
- slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES
+ slaves.get(slaveId).map(_.taskFailures).getOrElse(0) < MAX_SLAVE_FAILURES &&
+ meetsPortRequirements
}
private def executorCores(offerCPUs: Int): Int = {
@@ -613,7 +636,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
private class Slave(val hostname: String) {
- val taskIDs = new HashSet[String]()
+ val taskIDs = new mutable.HashSet[String]()
var taskFailures = 0
var shuffleRegistered = false
}
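
Before the utilities that follow, a self-contained sketch of the check that canLaunchTask now performs via checkPorts: every non-zero configured port must fall inside one of the offered "ports" ranges, and the ranges must contain at least as many ports as are requested. The sample ranges and port values are assumptions for illustration.

object PortOfferCheckExample {
  // Offered "ports" ranges as (begin, end) pairs, in the shape produced by getRangeResource.
  val offeredRanges: List[(Long, Long)] = List((31000L, 31005L), (32000L, 32100L))
  // Non-zero ports requested via spark.executor.port / spark.blockManager.port (illustrative).
  val requestedPorts: List[Long] = List(31000L, 32050L)

  def withinRange(port: Long, ranges: List[(Long, Long)]): Boolean =
    ranges.exists { case (start, end) => start <= port && port <= end }

  // Mirrors the two conditions checkPorts combines: containment and enough total ports.
  val allWithinRange: Boolean = requestedPorts.forall(p => withinRange(p, offeredRanges))
  val enoughPorts: Boolean =
    offeredRanges.map { case (start, end) => end - start + 1 }.sum >= requestedPorts.size
  val canLaunch: Boolean = allWithinRange && enoughPorts // true for the sample values above
}
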
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 81db789166..1bbede1853 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -47,6 +47,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
+ *
* @param masterUrl The url to connect to Mesos master
* @param scheduler the scheduler class to receive scheduler callbacks
* @param sparkUser User to impersonate with when running tasks
@@ -148,6 +149,20 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
/**
+ * Transforms a range resource to a list of ranges
+ *
+ * @param res the mesos resource list
+ * @param name the name of the resource
+ * @return the list of ranges returned
+ */
+ protected def getRangeResource(res: JList[Resource], name: String): List[(Long, Long)] = {
+ // A resource can have multiple values in the offer since it can either be from
+ // a specific role or wildcard.
+ res.asScala.filter(_.getName == name).flatMap(_.getRanges.getRangeList.asScala
+ .map(r => (r.getBegin, r.getEnd)).toList).toList
+ }
+
+ /**
* Signal that the scheduler has registered with Mesos.
*/
protected def markRegistered(): Unit = {
@@ -172,6 +187,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Partition the existing set of resources into two groups, those remaining to be
* scheduled and those requested to be used for a new task.
+ *
* @param resources The full list of available resources
* @param resourceName The name of the resource to take from the available resources
* @param amountToUse The amount of resources to take from the available resources
@@ -223,7 +239,8 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Converts the attributes from the resource offer into a Map of name -> Attribute Value
* The attribute values are the mesos attribute types and they are
- * @param offerAttributes
+ *
+ * @param offerAttributes the attributes offered
* @return
*/
protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = {
@@ -333,6 +350,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
* Return the amount of memory to allocate to each executor, taking into account
* container overheads.
+ *
* @param sc SparkContext to use to get `spark.mesos.executor.memoryOverhead` value
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
* (whichever is larger)
@@ -358,6 +376,111 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
/**
+ * Checks whether the executor ports fall within the offered list of port ranges.
+ *
+ * @param conf the Spark Config
+ * @param ports the list of ports to check
+ * @return true if the ports are within range, false otherwise
+ */
+ protected def checkPorts(conf: SparkConf, ports: List[(Long, Long)]): Boolean = {
+
+ def checkIfInRange(port: Long, ps: List[(Long, Long)]): Boolean = {
+ ps.exists{case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port }
+ }
+
+ val portsToCheck = nonZeroPortValuesFromConfig(conf)
+ val withinRange = portsToCheck.forall(p => checkIfInRange(p, ports))
+ // make sure we have enough ports to allocate per offer
+ val enoughPorts =
+ ports.map{case (rangeStart, rangeEnd) => rangeEnd - rangeStart + 1}.sum >= portsToCheck.size
+ enoughPorts && withinRange
+ }
+
+ /**
+ * Partitions port resources.
+ *
+ * @param requestedPorts non-zero ports to assign
+ * @param offeredResources the resources offered
+ * @return resources left, port resources to be used.
+ */
+ def partitionPortResources(requestedPorts: List[Long], offeredResources: List[Resource])
+ : (List[Resource], List[Resource]) = {
+ if (requestedPorts.isEmpty) {
+ (offeredResources, List[Resource]())
+ } else {
+ // partition port offers
+ val (resourcesWithoutPorts, portResources) = filterPortResources(offeredResources)
+
+ val portsAndRoles = requestedPorts.
+ map(x => (x, findPortAndGetAssignedRangeRole(x, portResources)))
+
+ val assignedPortResources = createResourcesFromPorts(portsAndRoles)
+
+ // ignore non-assigned port resources, they will be declined implicitly by mesos
+ // no need for splitting port resources.
+ (resourcesWithoutPorts, assignedPortResources)
+ }
+ }
+
+ val managedPortNames = List("spark.executor.port", "spark.blockManager.port")
+
+ /**
+ * The values of the non-zero ports to be used by the executor process.
+ * @param conf the spark config to use
+ * @return the non-zero values of the ports
+ */
+ def nonZeroPortValuesFromConfig(conf: SparkConf): List[Long] = {
+ managedPortNames.map(conf.getLong(_, 0)).filter( _ != 0)
+ }
+
+ /** Creates a mesos resource for a specific port number. */
+ private def createResourcesFromPorts(portsAndRoles: List[(Long, String)]) : List[Resource] = {
+ portsAndRoles.flatMap{ case (port, role) =>
+ createMesosPortResource(List((port, port)), Some(role))}
+ }
+
+ /** Helper to create mesos resources for specific port ranges. */
+ private def createMesosPortResource(
+ ranges: List[(Long, Long)],
+ role: Option[String] = None): List[Resource] = {
+ ranges.map { case (rangeStart, rangeEnd) =>
+ val rangeValue = Value.Range.newBuilder()
+ .setBegin(rangeStart)
+ .setEnd(rangeEnd)
+ val builder = Resource.newBuilder()
+ .setName("ports")
+ .setType(Value.Type.RANGES)
+ .setRanges(Value.Ranges.newBuilder().addRange(rangeValue))
+ role.foreach(r => builder.setRole(r))
+ builder.build()
+ }
+ }
+
+ /**
+ * Helper to assign a port to an offered range and get the latter's role
+ * info to use it later on.
+ */
+ private def findPortAndGetAssignedRangeRole(port: Long, portResources: List[Resource])
+ : String = {
+
+ val ranges = portResources.
+ map(resource =>
+ (resource.getRole, resource.getRanges.getRangeList.asScala
+ .map(r => (r.getBegin, r.getEnd)).toList))
+
+ val rangePortRole = ranges
+ .find { case (role, rangeList) => rangeList
+ .exists{ case (rangeStart, rangeEnd) => rangeStart <= port & rangeEnd >= port}}
+ // this is safe since we have previously checked the ranges (see the checkPorts method)
+ rangePortRole.map{ case (role, rangeList) => role}.get
+ }
+
+ /** Retrieves the port resources from a list of mesos offered resources */
+ private def filterPortResources(resources: List[Resource]): (List[Resource], List[Resource]) = {
+ resources.partition { r => !(r.getType == Value.Type.RANGES && r.getName == "ports") }
+ }
+
+ /**
* spark.mesos.driver.frameworkId is set by the cluster dispatcher to correlate driver
* submissions with frameworkIDs. However, this causes issues when a driver process launches
more than one framework (more than one SparkContext), because they all try to register with