-rw-r--r--  core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala       7
-rw-r--r--  core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala  33
2 files changed, 39 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 263e6197a6..5177557132 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String,
reason: String): Unit = {
stateLock.synchronized {
- removeExecutor(taskId, SlaveLost(reason))
+ // Do not call removeExecutor() after this scheduler backend has been stopped:
+ // removeExecutor() sends a message to the driver endpoint, which is no longer
+ // available at that point, so the call would throw an exception.
+ if (!stopCalled) {
+ removeExecutor(taskId, SlaveLost(reason))
+ }
slaves(slaveId).taskIDs.remove(taskId)
}
}
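
The guard above keys off a stopCalled flag defined elsewhere in the backend. The following is a minimal, self-contained sketch of that pattern, assuming the flag is a @volatile field flipped under the same lock at the start of stop(); the names (Backend, executorTerminated, notifyDriver) are illustrative, not the actual Spark class.

// Sketch only: a scheduler-like component that drops notifications once
// stop() has run. Names are illustrative, not the real Spark implementation.
object StopFlagSketch {
  final class Backend {
    private val stateLock = new Object
    @volatile private var stopCalled = false   // assumed analogue of the real flag

    def stop(): Unit = stateLock.synchronized {
      stopCalled = true
      // ... shut down endpoints and release resources here ...
    }

    def executorTerminated(taskId: String, reason: String): Unit = stateLock.synchronized {
      // Skip the driver notification once stop() has run; the endpoint that
      // would receive it is already gone, so sending would only throw.
      if (!stopCalled) {
        notifyDriver(s"executor for task $taskId lost: $reason")
      }
    }

    private def notifyDriver(message: String): Unit = println(message)
  }

  def main(args: Array[String]): Unit = {
    val backend = new Backend
    backend.executorTerminated("0", "slave lost")   // notification goes out
    backend.stop()
    backend.executorTerminated("1", "slave lost")   // silently ignored
  }
}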
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a74fdf79a1..0e66979901 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
+ @volatile private var stopCalled = false
test("mesos supports killing and limiting executors") {
setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}
+ test("Do not call removeExecutor() after backend is stopped") {
+ setBackend()
+
+ // launch a task on a valid offer
+ val offers = List((backend.executorMemory(sc), 1))
+ offerResources(offers)
+ verifyTaskLaunched(driver, "o1")
+
+ // start a background thread that simulates a status update arriving after stop() is called
+ new Thread {
+ override def run(): Unit = {
+ while (!stopCalled) {
+ Thread.sleep(100)
+ }
+
+ val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+ backend.statusUpdate(driver, status)
+ }
+ }.start()
+
+ backend.stop()
+ // Any method of the backend involving sending messages to the driver endpoint should not
+ // be called after the backend is stopped.
+ verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver
}
+ override def stopExecutors(): Unit = {
+ stopCalled = true
+ }
+
markRegistered()
}
backend.start()
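
For readers less familiar with the Mockito idiom used in the new test, here is a small, self-contained sketch of the verify(mock, never()) pattern it relies on. Only the Mockito calls are real API; Notifier, Component, and the message strings are made up for illustration.

import org.mockito.Mockito.{mock, never, verify}

trait Notifier {
  def send(message: String): Unit
}

class Component(notifier: Notifier) {
  @volatile private var stopped = false
  def stop(): Unit = { stopped = true }
  def onEvent(message: String): Unit = {
    // Guarded, like the backend change above: no message once stopped.
    if (!stopped) notifier.send(message)
  }
}

object NeverVerificationSketch {
  def main(args: Array[String]): Unit = {
    val notifier = mock(classOf[Notifier])
    val component = new Component(notifier)

    component.stop()
    component.onEvent("late status update")   // arrives after stop(), should be dropped

    // Passes only if send() was never invoked with this message.
    verify(notifier, never()).send("late status update")
    println("send() was never called after stop()")
  }
}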