aboutsummaryrefslogtreecommitdiff
path: root/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala60
1 files changed, 47 insertions, 13 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1d742fefbb..3f25535cb5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -46,9 +46,6 @@ trait MesosSchedulerUtils extends Logging {
// Lock used to wait for scheduler to be registered
private final val registerLatch = new CountDownLatch(1)
- // Driver for talking to Mesos
- protected var mesosDriver: SchedulerDriver = null
-
/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*
@@ -115,10 +112,6 @@ trait MesosSchedulerUtils extends Logging {
*/
def startScheduler(newDriver: SchedulerDriver): Unit = {
synchronized {
- if (mesosDriver != null) {
- registerLatch.await()
- return
- }
@volatile
var error: Option[Exception] = None
@@ -128,8 +121,7 @@ trait MesosSchedulerUtils extends Logging {
setDaemon(true)
override def run() {
try {
- mesosDriver = newDriver
- val ret = mesosDriver.run()
+ val ret = newDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
@@ -379,12 +371,24 @@ trait MesosSchedulerUtils extends Logging {
}
}
- protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ private def getRejectOfferDurationStr(conf: SparkConf): String = {
+ conf.get("spark.mesos.rejectOfferDuration", "120s")
+ }
+
+ protected def getRejectOfferDuration(conf: SparkConf): Long = {
+ Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf))
+ }
+
+ protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForUnmetConstraints",
+ getRejectOfferDurationStr(conf))
}
- protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForReachedMaxCores",
+ getRejectOfferDurationStr(conf))
}
/**
@@ -438,6 +442,7 @@ trait MesosSchedulerUtils extends Logging {
/**
* The values of the non-zero ports to be used by the executor process.
+ *
* @param conf the spark config to use
* @return the ono-zero values of the ports
*/
@@ -521,4 +526,33 @@ trait MesosSchedulerUtils extends Logging {
case TaskState.KILLED => MesosTaskState.TASK_KILLED
case TaskState.LOST => MesosTaskState.TASK_LOST
}
+
+ protected def declineOffer(
+ driver: org.apache.mesos.SchedulerDriver,
+ offer: Offer,
+ reason: Option[String] = None,
+ refuseSeconds: Option[Long] = None): Unit = {
+
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+ val ports = getRangeResource(offer.getResourcesList, "ports")
+
+ logDebug(s"Declining offer: $id with " +
+ s"attributes: $offerAttributes " +
+ s"mem: $mem " +
+ s"cpu: $cpus " +
+ s"port: $ports " +
+ refuseSeconds.map(s => s"for ${s} seconds ").getOrElse("") +
+ reason.map(r => s" (reason: $r)").getOrElse(""))
+
+ refuseSeconds match {
+ case Some(seconds) =>
+ val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+ driver.declineOffer(offer.getId, filters)
+ case _ =>
+ driver.declineOffer(offer.getId)
+ }
+ }
}