aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
authorMichael Gummelt <mgummelt@mesosphere.io>2017-03-07 21:29:08 +0000
committerSean Owen <sowen@cloudera.com>2017-03-07 21:29:08 +0000
commit2e30c0b9bcaa6f7757bd85d1f1ec392d5f916f83 (patch)
treeb57ae2c410c9be8e12f8c23a191b4f00acc6e770 /resource-managers
parent6f4684622a951806bebe7652a14f7d1ce03e24c7 (diff)
downloadspark-2e30c0b9bcaa6f7757bd85d1f1ec392d5f916f83.tar.gz
spark-2e30c0b9bcaa6f7757bd85d1f1ec392d5f916f83.tar.bz2
spark-2e30c0b9bcaa6f7757bd85d1f1ec392d5f916f83.zip
[SPARK-19702][MESOS] Increase default refuse_seconds timeout in the Mesos Spark Dispatcher
## What changes were proposed in this pull request?

Increase the default refuse_seconds timeout, and make it configurable. See the JIRA ticket for details on how this reduces the risk of starvation.

## How was this patch tested?

Unit tests, manual testing, and the Mesos/Spark integration test suite.

cc susanxhuynh skonto jmlvanre

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #17031 from mgummelt/SPARK-19702-suppress-revive.
Diffstat (limited to 'resource-managers')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala75
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala69
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala19
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala60
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala51
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala7
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala11
7 files changed, 187 insertions, 105 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 2760f31b12..1bc6f71860 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -152,6 +152,7 @@ private[spark] class MesosClusterScheduler(
// is registered with Mesos master.
@volatile protected var ready = false
private var masterInfo: Option[MasterInfo] = None
+ private var schedulerDriver: SchedulerDriver = _
def submitDriver(desc: MesosDriverDescription): CreateSubmissionResponse = {
val c = new CreateSubmissionResponse
@@ -168,9 +169,8 @@ private[spark] class MesosClusterScheduler(
return c
}
c.submissionId = desc.submissionId
- queuedDriversState.persist(desc.submissionId, desc)
- queuedDrivers += desc
c.success = true
+ addDriverToQueue(desc)
}
c
}
@@ -191,7 +191,7 @@ private[spark] class MesosClusterScheduler(
// 4. Check if it has already completed.
if (launchedDrivers.contains(submissionId)) {
val task = launchedDrivers(submissionId)
- mesosDriver.killTask(task.taskId)
+ schedulerDriver.killTask(task.taskId)
k.success = true
k.message = "Killing running driver"
} else if (removeFromQueuedDrivers(submissionId)) {
@@ -324,7 +324,7 @@ private[spark] class MesosClusterScheduler(
ready = false
metricsSystem.report()
metricsSystem.stop()
- mesosDriver.stop(true)
+ schedulerDriver.stop(true)
}
override def registered(
@@ -340,6 +340,8 @@ private[spark] class MesosClusterScheduler(
stateLock.synchronized {
this.masterInfo = Some(masterInfo)
+ this.schedulerDriver = driver
+
if (!pendingRecover.isEmpty) {
// Start task reconciliation if we need to recover.
val statuses = pendingRecover.collect {
@@ -506,11 +508,10 @@ private[spark] class MesosClusterScheduler(
}
private class ResourceOffer(
- val offerId: OfferID,
- val slaveId: SlaveID,
- var resources: JList[Resource]) {
+ val offer: Offer,
+ var remainingResources: JList[Resource]) {
override def toString(): String = {
- s"Offer id: ${offerId}, resources: ${resources}"
+ s"Offer id: ${offer.getId}, resources: ${remainingResources}"
}
}
@@ -518,16 +519,16 @@ private[spark] class MesosClusterScheduler(
val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
val (remainingResources, cpuResourcesToUse) =
- partitionResources(offer.resources, "cpus", desc.cores)
+ partitionResources(offer.remainingResources, "cpus", desc.cores)
val (finalResources, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", desc.mem)
- offer.resources = finalResources.asJava
+ offer.remainingResources = finalResources.asJava
val appName = desc.conf.get("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
.setTaskId(taskId)
.setName(s"Driver for ${appName}")
- .setSlaveId(offer.slaveId)
+ .setSlaveId(offer.offer.getSlaveId)
.setCommand(buildDriverCommand(desc))
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
@@ -549,23 +550,29 @@ private[spark] class MesosClusterScheduler(
val driverCpu = submission.cores
val driverMem = submission.mem
logTrace(s"Finding offer to launch driver with cpu: $driverCpu, mem: $driverMem")
- val offerOption = currentOffers.find { o =>
- getResource(o.resources, "cpus") >= driverCpu &&
- getResource(o.resources, "mem") >= driverMem
+ val offerOption = currentOffers.find { offer =>
+ getResource(offer.remainingResources, "cpus") >= driverCpu &&
+ getResource(offer.remainingResources, "mem") >= driverMem
}
if (offerOption.isEmpty) {
logDebug(s"Unable to find offer to launch driver id: ${submission.submissionId}, " +
s"cpu: $driverCpu, mem: $driverMem")
} else {
val offer = offerOption.get
- val queuedTasks = tasks.getOrElseUpdate(offer.offerId, new ArrayBuffer[TaskInfo])
+ val queuedTasks = tasks.getOrElseUpdate(offer.offer.getId, new ArrayBuffer[TaskInfo])
try {
val task = createTaskInfo(submission, offer)
queuedTasks += task
- logTrace(s"Using offer ${offer.offerId.getValue} to launch driver " +
+ logTrace(s"Using offer ${offer.offer.getId.getValue} to launch driver " +
submission.submissionId)
- val newState = new MesosClusterSubmissionState(submission, task.getTaskId, offer.slaveId,
- None, new Date(), None, getDriverFrameworkID(submission))
+ val newState = new MesosClusterSubmissionState(
+ submission,
+ task.getTaskId,
+ offer.offer.getSlaveId,
+ None,
+ new Date(),
+ None,
+ getDriverFrameworkID(submission))
launchedDrivers(submission.submissionId) = newState
launchedDriversState.persist(submission.submissionId, newState)
afterLaunchCallback(submission.submissionId)
@@ -588,7 +595,7 @@ private[spark] class MesosClusterScheduler(
val currentTime = new Date()
val currentOffers = offers.asScala.map {
- o => new ResourceOffer(o.getId, o.getSlaveId, o.getResourcesList)
+ offer => new ResourceOffer(offer, offer.getResourcesList)
}.toList
stateLock.synchronized {
@@ -615,8 +622,8 @@ private[spark] class MesosClusterScheduler(
driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava)
}
- for (o <- currentOffers if !tasks.contains(o.offerId)) {
- driver.declineOffer(o.offerId)
+ for (offer <- currentOffers if !tasks.contains(offer.offer.getId)) {
+ declineOffer(driver, offer.offer, None, Some(getRejectOfferDuration(conf)))
}
}
@@ -662,6 +669,12 @@ private[spark] class MesosClusterScheduler(
override def statusUpdate(driver: SchedulerDriver, status: TaskStatus): Unit = {
val taskId = status.getTaskId.getValue
+
+ logInfo(s"Received status update: taskId=${taskId}" +
+ s" state=${status.getState}" +
+ s" message=${status.getMessage}" +
+ s" reason=${status.getReason}");
+
stateLock.synchronized {
if (launchedDrivers.contains(taskId)) {
if (status.getReason == Reason.REASON_RECONCILIATION &&
@@ -682,8 +695,7 @@ private[spark] class MesosClusterScheduler(
val newDriverDescription = state.driverDescription.copy(
retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
- pendingRetryDrivers += newDriverDescription
- pendingRetryDriversState.persist(taskId, newDriverDescription)
+ addDriverToPending(newDriverDescription, taskId);
} else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
removeFromLaunchedDrivers(taskId)
state.finishDate = Some(new Date())
@@ -746,4 +758,21 @@ private[spark] class MesosClusterScheduler(
def getQueuedDriversSize: Int = queuedDrivers.size
def getLaunchedDriversSize: Int = launchedDrivers.size
def getPendingRetryDriversSize: Int = pendingRetryDrivers.size
+
+ private def addDriverToQueue(desc: MesosDriverDescription): Unit = {
+ queuedDriversState.persist(desc.submissionId, desc)
+ queuedDrivers += desc
+ revive()
+ }
+
+ private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
+ pendingRetryDriversState.persist(taskId, desc)
+ pendingRetryDrivers += desc
+ revive()
+ }
+
+ private def revive(): Unit = {
+ logInfo("Reviving Offers.")
+ schedulerDriver.reviveOffers()
+ }
}
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index f69c223ab9..85c2e9c76f 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -26,6 +26,7 @@ import scala.collection.mutable
import scala.concurrent.Future
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
@@ -119,11 +120,11 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
+ getRejectOfferDurationForUnmetConstraints(sc.conf)
// Reject offers when we reached the maximum number of cores for this framework
private val rejectOfferDurationForReachedMaxCores =
- getRejectOfferDurationForReachedMaxCores(sc)
+ getRejectOfferDurationForReachedMaxCores(sc.conf)
// A client for talking to the external shuffle service
private val mesosExternalShuffleClient: Option[MesosExternalShuffleClient] = {
@@ -146,6 +147,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
@volatile var appId: String = _
+ private var schedulerDriver: SchedulerDriver = _
+
def newMesosTaskId(): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
@@ -252,9 +255,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
- appId = frameworkId.getValue
- mesosExternalShuffleClient.foreach(_.init(appId))
+ driver: org.apache.mesos.SchedulerDriver,
+ frameworkId: FrameworkID,
+ masterInfo: MasterInfo) {
+ this.appId = frameworkId.getValue
+ this.mesosExternalShuffleClient.foreach(_.init(appId))
+ this.schedulerDriver = driver
markRegistered()
}
@@ -293,46 +299,25 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
private def declineUnmatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+ driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
offers.foreach { offer =>
- declineOffer(d, offer, Some("unmet constraints"),
+ declineOffer(
+ driver,
+ offer,
+ Some("unmet constraints"),
Some(rejectOfferDurationForUnmetConstraints))
}
}
- private def declineOffer(
- d: org.apache.mesos.SchedulerDriver,
- offer: Offer,
- reason: Option[String] = None,
- refuseSeconds: Option[Long] = None): Unit = {
-
- val id = offer.getId.getValue
- val offerAttributes = toAttributeMap(offer.getAttributesList)
- val mem = getResource(offer.getResourcesList, "mem")
- val cpus = getResource(offer.getResourcesList, "cpus")
- val ports = getRangeResource(offer.getResourcesList, "ports")
-
- logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem" +
- s" cpu: $cpus port: $ports for $refuseSeconds seconds" +
- reason.map(r => s" (reason: $r)").getOrElse(""))
-
- refuseSeconds match {
- case Some(seconds) =>
- val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
- d.declineOffer(offer.getId, filters)
- case _ => d.declineOffer(offer.getId)
- }
- }
-
/**
* Launches executors on accepted offers, and declines unused offers. Executors are launched
* round-robin on offers.
*
- * @param d SchedulerDriver
+ * @param driver SchedulerDriver
* @param offers Mesos offers that match attribute constraints
*/
private def handleMatchedOffers(
- d: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
+ driver: org.apache.mesos.SchedulerDriver, offers: mutable.Buffer[Offer]): Unit = {
val tasks = buildMesosTasks(offers)
for (offer <- offers) {
val offerAttributes = toAttributeMap(offer.getAttributesList)
@@ -358,15 +343,19 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
s" ports: $ports")
}
- d.launchTasks(
+ driver.launchTasks(
Collections.singleton(offer.getId),
offerTasks.asJava)
} else if (totalCoresAcquired >= maxCores) {
// Reject an offer for a configurable amount of time to avoid starving other frameworks
- declineOffer(d, offer, Some("reached spark.cores.max"),
+ declineOffer(driver,
+ offer,
+ Some("reached spark.cores.max"),
Some(rejectOfferDurationForReachedMaxCores))
} else {
- declineOffer(d, offer)
+ declineOffer(
+ driver,
+ offer)
}
}
}
@@ -582,8 +571,8 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
// Close the mesos external shuffle client if used
mesosExternalShuffleClient.foreach(_.close())
- if (mesosDriver != null) {
- mesosDriver.stop()
+ if (schedulerDriver != null) {
+ schedulerDriver.stop()
}
}
@@ -634,13 +623,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
}
override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
- if (mesosDriver == null) {
+ if (schedulerDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
} else {
for (executorId <- executorIds) {
val taskId = TaskID.newBuilder().setValue(executorId).build()
- mesosDriver.killTask(taskId)
+ schedulerDriver.killTask(taskId)
}
// no need to adjust `executorLimitOption` since the AllocationManager already communicated
// the desired limit through a call to `doRequestTotalExecutors`.
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 7e561916a7..215271302e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SparkContext, SparkException, TaskState}
@@ -65,7 +66,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
// reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
+ getRejectOfferDurationForUnmetConstraints(sc.conf)
+
+ private var schedulerDriver: SchedulerDriver = _
@volatile var appId: String = _
@@ -89,6 +92,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
/**
* Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ *
* @param availableResources Available resources that is offered by Mesos
* @param execId The executor id to assign to this new executor.
* @return A tuple of the new mesos executor info and the remaining available resources.
@@ -178,10 +182,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ driver: org.apache.mesos.SchedulerDriver,
+ frameworkId: FrameworkID,
+ masterInfo: MasterInfo) {
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
+ this.schedulerDriver = driver
markRegistered()
}
}
@@ -383,13 +390,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
override def stop() {
- if (mesosDriver != null) {
- mesosDriver.stop()
+ if (schedulerDriver != null) {
+ schedulerDriver.stop()
}
}
override def reviveOffers() {
- mesosDriver.reviveOffers()
+ schedulerDriver.reviveOffers()
}
override def frameworkMessage(
@@ -426,7 +433,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
- mesosDriver.killTask(
+ schedulerDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1d742fefbb..3f25535cb5 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -46,9 +46,6 @@ trait MesosSchedulerUtils extends Logging {
// Lock used to wait for scheduler to be registered
private final val registerLatch = new CountDownLatch(1)
- // Driver for talking to Mesos
- protected var mesosDriver: SchedulerDriver = null
-
/**
* Creates a new MesosSchedulerDriver that communicates to the Mesos master.
*
@@ -115,10 +112,6 @@ trait MesosSchedulerUtils extends Logging {
*/
def startScheduler(newDriver: SchedulerDriver): Unit = {
synchronized {
- if (mesosDriver != null) {
- registerLatch.await()
- return
- }
@volatile
var error: Option[Exception] = None
@@ -128,8 +121,7 @@ trait MesosSchedulerUtils extends Logging {
setDaemon(true)
override def run() {
try {
- mesosDriver = newDriver
- val ret = mesosDriver.run()
+ val ret = newDriver.run()
logInfo("driver.run() returned with code " + ret)
if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
@@ -379,12 +371,24 @@ trait MesosSchedulerUtils extends Logging {
}
}
- protected def getRejectOfferDurationForUnmetConstraints(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForUnmetConstraints", "120s")
+ private def getRejectOfferDurationStr(conf: SparkConf): String = {
+ conf.get("spark.mesos.rejectOfferDuration", "120s")
+ }
+
+ protected def getRejectOfferDuration(conf: SparkConf): Long = {
+ Utils.timeStringAsSeconds(getRejectOfferDurationStr(conf))
+ }
+
+ protected def getRejectOfferDurationForUnmetConstraints(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForUnmetConstraints",
+ getRejectOfferDurationStr(conf))
}
- protected def getRejectOfferDurationForReachedMaxCores(sc: SparkContext): Long = {
- sc.conf.getTimeAsSeconds("spark.mesos.rejectOfferDurationForReachedMaxCores", "120s")
+ protected def getRejectOfferDurationForReachedMaxCores(conf: SparkConf): Long = {
+ conf.getTimeAsSeconds(
+ "spark.mesos.rejectOfferDurationForReachedMaxCores",
+ getRejectOfferDurationStr(conf))
}
/**
@@ -438,6 +442,7 @@ trait MesosSchedulerUtils extends Logging {
/**
* The values of the non-zero ports to be used by the executor process.
+ *
* @param conf the spark config to use
* @return the ono-zero values of the ports
*/
@@ -521,4 +526,33 @@ trait MesosSchedulerUtils extends Logging {
case TaskState.KILLED => MesosTaskState.TASK_KILLED
case TaskState.LOST => MesosTaskState.TASK_LOST
}
+
+ protected def declineOffer(
+ driver: org.apache.mesos.SchedulerDriver,
+ offer: Offer,
+ reason: Option[String] = None,
+ refuseSeconds: Option[Long] = None): Unit = {
+
+ val id = offer.getId.getValue
+ val offerAttributes = toAttributeMap(offer.getAttributesList)
+ val mem = getResource(offer.getResourcesList, "mem")
+ val cpus = getResource(offer.getResourcesList, "cpus")
+ val ports = getRangeResource(offer.getResourcesList, "ports")
+
+ logDebug(s"Declining offer: $id with " +
+ s"attributes: $offerAttributes " +
+ s"mem: $mem " +
+ s"cpu: $cpus " +
+ s"port: $ports " +
+ refuseSeconds.map(s => s"for ${s} seconds ").getOrElse("") +
+ reason.map(r => s" (reason: $r)").getOrElse(""))
+
+ refuseSeconds match {
+ case Some(seconds) =>
+ val filters = Filters.newBuilder().setRefuseSeconds(seconds).build()
+ driver.declineOffer(offer.getId, filters)
+ case _ =>
+ driver.declineOffer(offer.getId)
+ }
+ }
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index b9d098486b..32967b04cd 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -53,19 +53,32 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
override def start(): Unit = { ready = true }
}
scheduler.start()
+ scheduler.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
+ }
+
+ private def testDriverDescription(submissionId: String): MesosDriverDescription = {
+ new MesosDriverDescription(
+ "d1",
+ "jar",
+ 1000,
+ 1,
+ true,
+ command,
+ Map[String, String](),
+ submissionId,
+ new Date())
}
test("can queue drivers") {
setScheduler()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ val response = scheduler.submitDriver(testDriverDescription("s1"))
assert(response.success)
- val response2 =
- scheduler.submitDriver(new MesosDriverDescription(
- "d1", "jar", 1000, 1, true, command, Map[String, String](), "s2", new Date()))
+ verify(driver, times(1)).reviveOffers()
+
+ val response2 = scheduler.submitDriver(testDriverDescription("s2"))
assert(response2.success)
+
val state = scheduler.getSchedulerState()
val queuedDrivers = state.queuedDrivers.toList
assert(queuedDrivers(0).submissionId == response.submissionId)
@@ -75,9 +88,7 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
test("can kill queued drivers") {
setScheduler()
- val response = scheduler.submitDriver(
- new MesosDriverDescription("d1", "jar", 1000, 1, true,
- command, Map[String, String](), "s1", new Date()))
+ val response = scheduler.submitDriver(testDriverDescription("s1"))
assert(response.success)
val killResponse = scheduler.killDriver(response.submissionId)
assert(killResponse.success)
@@ -238,18 +249,10 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
}
test("can kill supervised drivers") {
- val driver = mock[SchedulerDriver]
val conf = new SparkConf()
conf.setMaster("mesos://localhost:5050")
conf.setAppName("spark mesos")
- scheduler = new MesosClusterScheduler(
- new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
- override def start(): Unit = {
- ready = true
- mesosDriver = driver
- }
- }
- scheduler.start()
+ setScheduler(conf.getAll.toMap)
val response = scheduler.submitDriver(
new MesosDriverDescription("d1", "jar", 100, 1, true, command,
@@ -291,4 +294,16 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(state.launchedDrivers.isEmpty)
assert(state.finishedDrivers.size == 1)
}
+
+ test("Declines offer with refuse seconds = 120.") {
+ setScheduler()
+
+ val filter = Filters.newBuilder().setRefuseSeconds(120).build()
+ val offerId = OfferID.newBuilder().setValue("o1").build()
+ val offer = Utils.createOffer(offerId.getValue, "s1", 1000, 1)
+
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+
+ verify(driver, times(1)).declineOffer(offerId, filter)
+ }
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index 78346e9744..98033bec6d 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -552,17 +552,14 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
override protected def getShuffleClient(): MesosExternalShuffleClient = shuffleClient
// override to avoid race condition with the driver thread on `mesosDriver`
- override def startScheduler(newDriver: SchedulerDriver): Unit = {
- mesosDriver = newDriver
- }
+ override def startScheduler(newDriver: SchedulerDriver): Unit = {}
override def stopExecutors(): Unit = {
stopCalled = true
}
-
- markRegistered()
}
backend.start()
+ backend.registered(driver, Utils.TEST_FRAMEWORK_ID, Utils.TEST_MASTER_INFO)
backend
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
index 7ebb294aa9..2a67cbc913 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/Utils.scala
@@ -28,6 +28,17 @@ import org.mockito.{ArgumentCaptor, Matchers}
import org.mockito.Mockito._
object Utils {
+
+ val TEST_FRAMEWORK_ID = FrameworkID.newBuilder()
+ .setValue("test-framework-id")
+ .build()
+
+ val TEST_MASTER_INFO = MasterInfo.newBuilder()
+ .setId("test-master")
+ .setIp(0)
+ .setPort(0)
+ .build()
+
def createOffer(
offerId: String,
slaveId: String,