diff options
author | Charles Allen <charles@allen-net.com> | 2016-02-04 10:27:25 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2016-02-04 10:27:25 -0800 |
commit | 2eaeafe8a2aa31be9b230b8d53d3baccd32535b1 (patch) | |
tree | 5bc7e36e3f6ac2d3c6ff9ee2ca46723d1337ac0d /core | |
parent | dee801adb78d6abd0abbf76b4dfa71aa296b4f0b (diff) | |
download | spark-2eaeafe8a2aa31be9b230b8d53d3baccd32535b1.tar.gz spark-2eaeafe8a2aa31be9b230b8d53d3baccd32535b1.tar.bz2 spark-2eaeafe8a2aa31be9b230b8d53d3baccd32535b1.zip |
[SPARK-12330][MESOS] Fix mesos coarse mode cleanup
In the current implementation the mesos coarse scheduler does not wait for the mesos tasks to complete before ending the driver. This causes a race where the task has to finish cleaning up before the mesos driver terminates it with a SIGINT (and SIGKILL after 3 seconds if the SIGINT doesn't work).
This PR causes the mesos coarse scheduler to wait for the mesos tasks to finish (with a timeout defined by `spark.mesos.coarse.shutdown.ms`)
This PR also fixes a regression caused by [SPARK-10987] whereby submitting a shutdown causes a race between the local shutdown procedure and the notification of the scheduler driver disconnection. If the scheduler driver disconnection wins the race, the coarse executor incorrectly exits with status 1 (instead of the proper status 0)
With this patch the mesos coarse scheduler terminates properly, the executors clean up, and the tasks are reported as `FINISHED` in the Mesos console (as opposed to `KILLED` in < 1.6 or `FAILED` in 1.6 and later)
Author: Charles Allen <charles@allen-net.com>
Closes #10319 from drcrallen/SPARK-12330.
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala | 8 | ||||
-rw-r--r-- | core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala | 39 |
2 files changed, 45 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 136cf4a84d..3b5cb18da1 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -19,6 +19,7 @@ package org.apache.spark.executor import java.net.URL import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable import scala.util.{Failure, Success} @@ -42,6 +43,7 @@ private[spark] class CoarseGrainedExecutorBackend( env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { + private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None @@ -102,19 +104,23 @@ private[spark] class CoarseGrainedExecutorBackend( } case StopExecutor => + stopping.set(true) logInfo("Driver commanded a shutdown") // Cannot shutdown here because an ack may need to be sent back to the caller. So send // a message to self to actually do the shutdown. self.send(Shutdown) case Shutdown => + stopping.set(true) executor.stop() stop() rpcEnv.shutdown() } override def onDisconnected(remoteAddress: RpcAddress): Unit = { - if (driver.exists(_.address == remoteAddress)) { + if (stopping.get()) { + logInfo(s"Driver from $remoteAddress disconnected during shutdown") + } else if (driver.exists(_.address == remoteAddress)) { logError(s"Driver $remoteAddress disassociated! Shutting down.") System.exit(1) } else { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 2f095b86c6..722293bb7a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -19,11 +19,13 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{Collections, List => JList} +import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} +import com.google.common.base.Stopwatch import com.google.common.collect.HashBiMap import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver} import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} @@ -60,6 +62,12 @@ private[spark] class CoarseMesosSchedulerBackend( // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + private[this] val shutdownTimeoutMS = conf.getTimeAsMs("spark.mesos.coarse.shutdown.ms", "10s") + .ensuring(_ >= 0, "spark.mesos.coarse.shutdown.ms must be >= 0") + + // Synchronization protected by stateLock + private[this] var stopCalled: Boolean = false + // If shuffle service is enabled, the Spark driver will register with the shuffle service. // This is for cleaning up shuffle files reliably. private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) @@ -245,6 +253,13 @@ private[spark] class CoarseMesosSchedulerBackend( */ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { + if (stopCalled) { + logDebug("Ignoring offers during shutdown") + // Driver should simply return a stopped status on race + // condition between this.stop() and completing here + offers.asScala.map(_.getId).foreach(d.declineOffer) + return + } val filters = Filters.newBuilder().setRefuseSeconds(5).build() for (offer <- offers.asScala) { val offerAttributes = toAttributeMap(offer.getAttributesList) @@ -364,7 +379,29 @@ private[spark] class CoarseMesosSchedulerBackend( } override def stop() { - super.stop() + // Make sure we're not launching tasks during shutdown + stateLock.synchronized { + if (stopCalled) { + logWarning("Stop called multiple times, ignoring") + return + } + stopCalled = true + super.stop() + } + // Wait for executors to report done, or else mesosDriver.stop() will forcefully kill them. + // See SPARK-12330 + val stopwatch = new Stopwatch() + stopwatch.start() + // slaveIdsWithExecutors has no memory barrier, so this is eventually consistent + while (slaveIdsWithExecutors.nonEmpty && + stopwatch.elapsed(TimeUnit.MILLISECONDS) < shutdownTimeoutMS) { + Thread.sleep(100) + } + if (slaveIdsWithExecutors.nonEmpty) { + logWarning(s"Timed out waiting for ${slaveIdsWithExecutors.size} remaining executors " + + s"to terminate within $shutdownTimeoutMS ms. This may leave temporary files " + + "on the mesos nodes.") + } if (mesosDriver != null) { mesosDriver.stop() } |