aboutsummaryrefslogtreecommitdiff
path: root/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
diff options
context:
space:
mode:
Diffstat (limited to 'resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala')
-rw-r--r--resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala57
1 files changed, 56 insertions, 1 deletions
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 74e5ce227d..b9d098486b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
@@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
}
+
+ test("can kill supervised drivers") {
+ val driver = mock[SchedulerDriver]
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = {
+ ready = true
+ mesosDriver = driver
+ }
+ }
+ scheduler.start()
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date()))
+ assert(response.success)
+ val slaveId = SlaveID.newBuilder().setValue("s1").build()
+ val offer = Offer.newBuilder()
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1000).build())
+ .setName("mem")
+ .setType(Type.SCALAR))
+ .setId(OfferID.newBuilder().setValue("o1").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+ .setSlaveId(slaveId)
+ .setHostname("host1")
+ .build()
+ // Offer the resource to launch the submitted driver
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ var state = scheduler.getSchedulerState()
+ assert(state.launchedDrivers.size == 1)
+ // Issue the request to kill the launched driver
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+
+ val taskStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+ .setSlaveId(slaveId)
+ .setState(MesosTaskState.TASK_KILLED)
+ .build()
+ // Update the status of the killed task
+ scheduler.statusUpdate(driver, taskStatus)
+ // Driver should be moved to finishedDrivers for kill
+ state = scheduler.getSchedulerState()
+ assert(state.pendingRetryDrivers.isEmpty)
+ assert(state.launchedDrivers.isEmpty)
+ assert(state.finishedDrivers.size == 1)
+ }
}