aboutsummaryrefslogtreecommitdiff
path: root/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala')
-rw-r--r--resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala19
1 files changed, 13 insertions, 6 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
index 7e561916a7..215271302e 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}
import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _}
+import org.apache.mesos.SchedulerDriver
import org.apache.mesos.protobuf.ByteString
import org.apache.spark.{SparkContext, SparkException, TaskState}
@@ -65,7 +66,9 @@ private[spark] class MesosFineGrainedSchedulerBackend(
// reject offers with mismatched constraints in seconds
private val rejectOfferDurationForUnmetConstraints =
- getRejectOfferDurationForUnmetConstraints(sc)
+ getRejectOfferDurationForUnmetConstraints(sc.conf)
+
+ private var schedulerDriver: SchedulerDriver = _
@volatile var appId: String = _
@@ -89,6 +92,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
/**
* Creates a MesosExecutorInfo that is used to launch a Mesos executor.
+ *
* @param availableResources Available resources that is offered by Mesos
* @param execId The executor id to assign to this new executor.
* @return A tuple of the new mesos executor info and the remaining available resources.
@@ -178,10 +182,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {}
override def registered(
- d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) {
+ driver: org.apache.mesos.SchedulerDriver,
+ frameworkId: FrameworkID,
+ masterInfo: MasterInfo) {
inClassLoader() {
appId = frameworkId.getValue
logInfo("Registered as framework ID " + appId)
+ this.schedulerDriver = driver
markRegistered()
}
}
@@ -383,13 +390,13 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
override def stop() {
- if (mesosDriver != null) {
- mesosDriver.stop()
+ if (schedulerDriver != null) {
+ schedulerDriver.stop()
}
}
override def reviveOffers() {
- mesosDriver.reviveOffers()
+ schedulerDriver.reviveOffers()
}
override def frameworkMessage(
@@ -426,7 +433,7 @@ private[spark] class MesosFineGrainedSchedulerBackend(
}
override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = {
- mesosDriver.killTask(
+ schedulerDriver.killTask(
TaskID.newBuilder()
.setValue(taskId.toString).build()
)