Diffstat (limited to 'mesos')
-rw-r--r--  mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala      |  5
-rw-r--r--  mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala | 13
2 files changed, 13 insertions(+), 5 deletions(-)
diff --git a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index fde1fb3228..a64b5768c5 100644
--- a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -23,6 +23,7 @@ import java.util.concurrent.locks.ReentrantLock
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.concurrent.Future
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
@@ -606,7 +607,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
super.applicationId
}
- override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+ override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] = Future.successful {
// We don't truly know if we can fulfill the full amount of executors
// since at coarse grain it depends on the amount of slaves available.
logInfo("Capping the total amount of executors to " + requestedTotal)
@@ -614,7 +615,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
true
}
- override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+ override def doKillExecutors(executorIds: Seq[String]): Future[Boolean] = Future.successful {
if (mesosDriver == null) {
logWarning("Asked to kill executors before the Mesos driver was started.")
false
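
For context on the change above: both overrides keep their original synchronous bodies and only lift the result into an already-completed `Future` via `Future.successful`, so no execution context or extra thread is involved. Below is a minimal sketch of that pattern in isolation; the trait, class, and method body are hypothetical stand-ins and are not part of this patch.

```scala
import scala.concurrent.Future

// Hypothetical illustration of the pattern used in the patch: an API whose
// return type becomes Future[Boolean], implemented by wrapping the existing
// synchronous result with Future.successful (already completed, same thread).
trait ExecutorRequester {
  def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean]
}

class SynchronousRequester extends ExecutorRequester {
  override def doRequestTotalExecutors(requestedTotal: Int): Future[Boolean] =
    Future.successful {
      // The body still runs immediately on the caller's thread, exactly as
      // the old Boolean-returning version did; only the signature changes.
      requestedTotal >= 0
    }
}
```
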
diff --git a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index d98ddb2700..6948be0ead 100644
--- a/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler.cluster.mesos
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.duration._
import scala.concurrent.Promise
import scala.reflect.ClassTag
@@ -27,6 +28,7 @@ import org.apache.mesos.Protos._
import org.mockito.Matchers
import org.mockito.Matchers._
import org.mockito.Mockito._
+import org.scalatest.concurrent.ScalaFutures
import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter
@@ -40,7 +42,8 @@ import org.apache.spark.scheduler.cluster.mesos.Utils._
class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
with LocalSparkContext
with MockitoSugar
- with BeforeAndAfter {
+ with BeforeAndAfter
+ with ScalaFutures {
private var sparkConf: SparkConf = _
private var driver: SchedulerDriver = _
@@ -50,6 +53,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var driverEndpoint: RpcEndpointRef = _
@volatile private var stopCalled = false
+ // All 'requests' to the scheduler run immediately on the same thread, so
+ // demand that all futures have their value available immediately.
+ implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)
+
test("mesos supports killing and limiting executors") {
setBackend()
sparkConf.set("spark.driver.host", "driverHost")
@@ -64,8 +71,8 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
verifyTaskLaunched(driver, "o1")
// kills executors
- backend.doRequestTotalExecutors(0)
- assert(backend.doKillExecutors(Seq("0")))
+ assert(backend.doRequestTotalExecutors(0).futureValue)
+ assert(backend.doKillExecutors(Seq("0")).futureValue)
val taskID0 = createTaskId("0")
verify(driver, times(1)).killTask(taskID0)
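
For context on the test changes: `ScalaFutures.futureValue` blocks until a `Future` completes within the implicit `PatienceConfig` and then returns its value, so `backend.doKillExecutors(Seq("0")).futureValue` asserts on the wrapped `Boolean` just as the old synchronous call did. Since the backend completes its futures eagerly with `Future.successful`, the suite can demand a zero timeout. Below is a minimal, self-contained sketch of that assertion style; the suite name, test name, and future are illustrative only, not code from Spark.

```scala
import scala.concurrent.Future
import scala.concurrent.duration._

import org.scalatest.FunSuite
import org.scalatest.concurrent.ScalaFutures

// Hypothetical standalone suite showing the futureValue pattern used above:
// a future completed with Future.successful must yield its value even when
// the patience timeout is zero.
class EagerFutureSuite extends FunSuite with ScalaFutures {

  // Require futures to be already completed when futureValue is called.
  implicit override val patienceConfig = PatienceConfig(timeout = 0.seconds)

  test("an eagerly completed future is available immediately") {
    val done: Future[Boolean] = Future.successful(true)
    assert(done.futureValue)
  }
}
```
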