diff options
Diffstat (limited to 'resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala')
-rw-r--r-- | resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala | 60 |
1 files changed, 47 insertions, 13 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 1d742fefbb..3f25535cb5 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -46,9 +46,6 @@ trait MesosSchedulerUtils extends Logging { // Lock used to wait for scheduler to be registered private final val registerLatch = new CountDownLatch(1) - // Driver for talking to Mesos - protected var mesosDriver: SchedulerDriver = null - /** * Creates a new MesosSchedulerDriver that communicates to the Mesos master. * @@ -115,10 +112,6 @@ trait MesosSchedulerUtils extends Logging { */ def startScheduler(newDriver: SchedulerDriver): Unit = { synchronized { - if (mesosDriver != null) { - registerLatch.await() - return - } @volatile var error: Option[Exception] = None @@ -128,8 +121,7 @@ trait MesosSchedulerUtils extends Logging { setDaemon(true) override def run() { try { - mesosDriver = newDriver - val ret = mesosDriver.run() + val ret = newDriver.run() logInfo("driver.run() returned with code " + ret) if (ret != null && ret.equals(Status.DRIVER_ABORTED)) { error = Some(new SparkException("Error starting driver, DRIVER_ABORTED")) @@ -379,12 +371,24 @@ trait MesosSchedulerUtils extends Logging { } } - protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s") + private def getRejectOfferDurationStr(conf: SparkConf): String = { + conf.get("spark.mesos.rejectOfferDuration", "120s") + } + + protected def getRejectOfferDuration(conf: SparkConf): Long = { + Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf)) + } + + protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = { + conf.getTimeAsSeconds( + "spark.mesos.rejectOfferDurationForUnmetConstraints", + getRejectOfferDurationStr(conf)) } - protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = { - sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s") + protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = { + conf.getTimeAsSeconds( + "spark.mesos.rejectOfferDurationForReachedMaxCores", + getRejectOfferDurationStr(conf)) } /** @@ -438,6 +442,7 @@ trait MesosSchedulerUtils extends Logging { /** * The values of the non-zero ports to be used by the executor process. + * * @param conf the spark config to use * @return the ono-zero values of the ports */ @@ -521,4 +526,33 @@ trait MesosSchedulerUtils extends Logging { case TaskState.KILLED => MesosTaskState.TASK_KILLED case TaskState.LOST => MesosTaskState.TASK_LOST } + + protected def declineOffer( + driver: org.apache.mesos.SchedulerDriver, + offer: Offer, + reason: Option[String] = None, + refuseSeconds: Option[Long] = None): Unit = { + + val id = offer.getId.getValue + val offerAttributes = toAttributeMap(offer.getAttributesList) + val mem = getResource(offer.getResourcesList, "mem") + val cpus = getResource(offer.getResourcesList, "cpus") + val ports = getRangeResource(offer.getResourcesList, "ports") + + logDebug(s"Declining offer: $id with " + + s"attributes: $offerAttributes " + + s"mem: $mem " + + s"cpu: $cpus " + + s"port: $ports " + + refuseSeconds.map(s => s"for ${s} seconds ").getOrElse("") + + reason.map(r => s" (reason: $r)").getOrElse("")) + + refuseSeconds match { + case Some(seconds) => + val filters = Filters.newBuilder().setRefuseSeconds(seconds).build() + driver.declineOffer(offer.getId, filters) + case _ => + driver.declineOffer(offer.getId) + } + } } |