diff options
Diffstat (limited to 'resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala')
-rw-r--r-- | resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala | 19 |
1 files changed, 13 insertions, 6 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala index 7e561916a7..215271302e 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala @@ -24,6 +24,7 @@ import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.Protos.{ExecutorInfo => MesosExecutorInfo, TaskInfo => MesosTaskInfo, _} +import org.apache.mesos.SchedulerDriver import org.apache.mesos.protobuf.ByteString import org.apache.spark.{SparkContext, SparkException, TaskState} @@ -65,7 +66,9 @@ private[spark] class MesosFineGrainedSchedulerBackend( // reject offers with mismatched constraints in seconds private val rejectOfferDurationForUnmetConstraints = - getRejectOfferDurationForUnmetConstraints(sc) + getRejectOfferDurationForUnmetConstraints(sc.conf) + + private var schedulerDriver: SchedulerDriver = _ @volatile var appId: String = _ @@ -89,6 +92,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( /** * Creates a MesosExecutorInfo that is used to launch a Mesos executor. + * * @param availableResources Available resources that is offered by Mesos * @param execId The executor id to assign to this new executor. * @return A tuple of the new mesos executor info and the remaining available resources. @@ -178,10 +182,13 @@ private[spark] class MesosFineGrainedSchedulerBackend( override def offerRescinded(d: org.apache.mesos.SchedulerDriver, o: OfferID) {} override def registered( - d: org.apache.mesos.SchedulerDriver, frameworkId: FrameworkID, masterInfo: MasterInfo) { + driver: org.apache.mesos.SchedulerDriver, + frameworkId: FrameworkID, + masterInfo: MasterInfo) { inClassLoader() { appId = frameworkId.getValue logInfo("Registered as framework ID " + appId) + this.schedulerDriver = driver markRegistered() } } @@ -383,13 +390,13 @@ private[spark] class MesosFineGrainedSchedulerBackend( } override def stop() { - if (mesosDriver != null) { - mesosDriver.stop() + if (schedulerDriver != null) { + schedulerDriver.stop() } } override def reviveOffers() { - mesosDriver.reviveOffers() + schedulerDriver.reviveOffers() } override def frameworkMessage( @@ -426,7 +433,7 @@ private[spark] class MesosFineGrainedSchedulerBackend( } override def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = { - mesosDriver.killTask( + schedulerDriver.killTask( TaskID.newBuilder() .setValue(taskId.toString).build() ) |