aboutsummaryrefslogtreecommitdiff
path: root/resource-managers
diff options
context:
space:
mode:
author Devaraj K <devaraj@apache.org> 2017-01-03 11:02:42 -0800
committer Marcelo Vanzin <vanzin@cloudera.com> 2017-01-03 11:02:42 -0800
commit 89bf370e4f53c02b018b23adc653cd718869489e (patch)
tree b36dc855d30a5f95efb0ec8979dfa317d569671d /resource-managers
parent 7a2b5f93bc3d3224470837ed3323964ba7cb1dca (diff)
download spark-89bf370e4f53c02b018b23adc653cd718869489e.tar.gz
spark-89bf370e4f53c02b018b23adc653cd718869489e.tar.bz2
spark-89bf370e4f53c02b018b23adc653cd718869489e.zip
[SPARK-15555][MESOS] Driver with --supervise option cannot be killed in Mesos mode
## What changes were proposed in this pull request?

Killed applications are no longer added for retry.

## How was this patch tested?

I have verified manually in the Mesos cluster: with the changes, the killed applications move to the Finished Drivers section and are not retried.

Author: Devaraj K <devaraj@apache.org>

Closes #13323 from devaraj-kavali/SPARK-15555.
Diffstat (limited to 'resource-managers')
-rw-r--r-- resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala 1
-rw-r--r-- resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala 57
2 files changed, 56 insertions, 2 deletions
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index f384290a6f..c5bbcb9bef 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -647,7 +647,6 @@ private[spark] class MesosClusterScheduler(
*/
private def shouldRelaunch(state: MesosTaskState): Boolean = {
state == MesosTaskState.TASK_FAILED ||
- state == MesosTaskState.TASK_KILLED ||
state == MesosTaskState.TASK_LOST
}
diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
index 74e5ce227d..b9d098486b 100644
--- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
+++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala
@@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date}
import scala.collection.JavaConverters._
-import org.apache.mesos.Protos._
+import org.apache.mesos.Protos.{TaskState => MesosTaskState, _}
import org.apache.mesos.Protos.Value.{Scalar, Type}
import org.apache.mesos.SchedulerDriver
import org.mockito.{ArgumentCaptor, Matchers}
@@ -236,4 +236,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi
assert(networkInfos.size == 1)
assert(networkInfos.get(0).getName == "test-network-name")
}
+
+ test("can kill supervised drivers") {
+ val driver = mock[SchedulerDriver]
+ val conf = new SparkConf()
+ conf.setMaster("mesos://localhost:5050")
+ conf.setAppName("spark mesos")
+ scheduler = new MesosClusterScheduler(
+ new BlackHoleMesosClusterPersistenceEngineFactory, conf) {
+ override def start(): Unit = {
+ ready = true
+ mesosDriver = driver
+ }
+ }
+ scheduler.start()
+
+ val response = scheduler.submitDriver(
+ new MesosDriverDescription("d1", "jar", 100, 1, true, command,
+ Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date()))
+ assert(response.success)
+ val slaveId = SlaveID.newBuilder().setValue("s1").build()
+ val offer = Offer.newBuilder()
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR))
+ .addResources(
+ Resource.newBuilder().setRole("*")
+ .setScalar(Scalar.newBuilder().setValue(1000).build())
+ .setName("mem")
+ .setType(Type.SCALAR))
+ .setId(OfferID.newBuilder().setValue("o1").build())
+ .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build())
+ .setSlaveId(slaveId)
+ .setHostname("host1")
+ .build()
+ // Offer the resource to launch the submitted driver
+ scheduler.resourceOffers(driver, Collections.singletonList(offer))
+ var state = scheduler.getSchedulerState()
+ assert(state.launchedDrivers.size == 1)
+ // Issue the request to kill the launched driver
+ val killResponse = scheduler.killDriver(response.submissionId)
+ assert(killResponse.success)
+
+ val taskStatus = TaskStatus.newBuilder()
+ .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build())
+ .setSlaveId(slaveId)
+ .setState(MesosTaskState.TASK_KILLED)
+ .build()
+ // Update the status of the killed task
+ scheduler.statusUpdate(driver, taskStatus)
+ // Driver should be moved to finishedDrivers for kill
+ state = scheduler.getSchedulerState()
+ assert(state.pendingRetryDrivers.isEmpty)
+ assert(state.launchedDrivers.isEmpty)
+ assert(state.finishedDrivers.size == 1)
+ }
}