aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSun Rui <sunrui2016@gmail.com>2016-08-09 09:39:45 +0100
committerSean Owen <sowen@cloudera.com>2016-08-09 09:39:45 +0100
commitaf710e5bdda9da04dbba615e219e7e496ca82acc (patch)
tree937445876c6f31ab8aaa09e5d36726450a5e10e6
parent801e4d097f45b269a9c6b25723d925f3e24ba498 (diff)
downloadspark-af710e5bdda9da04dbba615e219e7e496ca82acc.tar.gz
spark-af710e5bdda9da04dbba615e219e7e496ca82acc.tar.bz2
spark-af710e5bdda9da04dbba615e219e7e496ca82acc.zip
[SPARK-16522][MESOS] Spark application throws exception on exit.
## What changes were proposed in this pull request? Spark applications running on Mesos throw exception upon exit. For details, refer to https://issues.apache.org/jira/browse/SPARK-16522. I am not sure if there is any better fix, so wait for review comments. ## How was this patch tested? Manual test. Observed that the exception is gone upon application exit. Author: Sun Rui <sunrui2016@gmail.com> Closes #14175 from sun-rui/SPARK-16522.
-rw-r--r--core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala7
-rw-r--r--core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala33
2 files changed, 39 insertions, 1 deletions
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index 263e6197a6..5177557132 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -553,7 +553,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
taskId: String,
reason: String): Unit = {
stateLock.synchronized {
- removeExecutor(taskId, SlaveLost(reason))
+ // Do not call removeExecutor() after this scheduler backend was stopped because
+ // removeExecutor() internally will send a message to the driver endpoint but
+ // the driver endpoint is not available now, otherwise an exception will be thrown.
+ if (!stopCalled) {
+ removeExecutor(taskId, SlaveLost(reason))
+ }
slaves(slaveId).taskIDs.remove(taskId)
}
}
diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
index a74fdf79a1..0e66979901 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.util.Collections
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
+import scala.reflect.ClassTag
import org.apache.mesos.{Protos, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos._
@@ -33,6 +34,7 @@ import org.scalatest.BeforeAndAfter
import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcEndpointRef
+import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RemoveExecutor
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.mesos.Utils._
@@ -47,6 +49,7 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
private var backend: MesosCoarseGrainedSchedulerBackend = _
private var externalShuffleClient: MesosExternalShuffleClient = _
private var driverEndpoint: RpcEndpointRef = _
+ @volatile private var stopCalled = false
test("mesos supports killing and limiting executors") {
setBackend()
@@ -341,6 +344,32 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
assert(!dockerInfo.getForcePullImage)
}
+ test("Do not call removeExecutor() after backend is stopped") {
+ setBackend()
+
+ // launches a task on a valid offer
+ val offers = List((backend.executorMemory(sc), 1))
+ offerResources(offers)
+ verifyTaskLaunched(driver, "o1")
+
+ // launches a thread simulating status update
+ val statusUpdateThread = new Thread {
+ override def run(): Unit = {
+ while (!stopCalled) {
+ Thread.sleep(100)
+ }
+
+ val status = createTaskStatus("0", "s1", TaskState.TASK_FINISHED)
+ backend.statusUpdate(driver, status)
+ }
+ }.start
+
+ backend.stop()
+ // Any method of the backend involving sending messages to the driver endpoint should not
+ // be called after the backend is stopped.
+ verify(driverEndpoint, never()).askWithRetry(isA(classOf[RemoveExecutor]))(any[ClassTag[_]])
+ }
+
private def verifyDeclinedOffer(driver: SchedulerDriver,
offerId: OfferID,
filter: Boolean = false): Unit = {
@@ -396,6 +425,10 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
mesosDriver = newDriver
}
+ override def stopExecutors(): Unit = {
+ stopCalled = true
+ }
+
markRegistered()
}
backend.start()