aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorlinweizhong <linweizhong@huawei.com>2015-08-05 10:16:12 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2015-08-05 10:16:12 -0700
commit7a969a6967c4ecc0f004b73bff27a75257a94e86 (patch)
treee0d39fe44043a667cfb852362971831ae4065b98 /yarn
parent23d982204bb9ef74d3b788a32ce6608116968719 (diff)
downloadspark-7a969a6967c4ecc0f004b73bff27a75257a94e86.tar.gz
spark-7a969a6967c4ecc0f004b73bff27a75257a94e86.tar.bz2
spark-7a969a6967c4ecc0f004b73bff27a75257a94e86.zip
[SPARK-9519] [YARN] Confirm stop sc successfully when application was killed
Currently, when we kill application on Yarn, then will call sc.stop() at Yarn application state monitor thread, then in YarnClientSchedulerBackend.stop() will call interrupt this will cause SparkContext not stop fully as we will wait executor to exit. Author: linweizhong <linweizhong@huawei.com> Closes #7846 from Sephiroth-Lin/SPARK-9519 and squashes the following commits: 1ae736d [linweizhong] Update comments 2e8e365 [linweizhong] Add comment explaining the code ad0e23b [linweizhong] Update 243d2c7 [linweizhong] Confirm stop sc successfully when application was killed
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala47
1 files changed, 32 insertions, 15 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index d97fa2e215..d225061fcd 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -33,7 +33,7 @@ private[spark] class YarnClientSchedulerBackend(
private var client: Client = null
private var appId: ApplicationId = null
- private var monitorThread: Thread = null
+ private var monitorThread: MonitorThread = null
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -132,23 +132,41 @@ private[spark] class YarnClientSchedulerBackend(
}
/**
+ * We create this class for SPARK-9519. Basically when we interrupt the monitor thread it's
+ * because the SparkContext is being shut down(sc.stop() called by user code), but if
+ * monitorApplication return, it means the Yarn application finished before sc.stop() was called,
+ * which means we should call sc.stop() here, and we don't allow the monitor to be interrupted
+ * before SparkContext stops successfully.
+ */
+ private class MonitorThread extends Thread {
+ private var allowInterrupt = true
+
+ override def run() {
+ try {
+ val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+ logError(s"Yarn application has already exited with state $state!")
+ allowInterrupt = false
+ sc.stop()
+ } catch {
+ case e: InterruptedException => logInfo("Interrupting monitor thread")
+ }
+ }
+
+ def stopMonitor(): Unit = {
+ if (allowInterrupt) {
+ this.interrupt()
+ }
+ }
+ }
+
+ /**
* Monitor the application state in a separate thread.
* If the application has exited for any reason, stop the SparkContext.
* This assumes both `client` and `appId` have already been set.
*/
- private def asyncMonitorApplication(): Thread = {
+ private def asyncMonitorApplication(): MonitorThread = {
assert(client != null && appId != null, "Application has not been submitted yet!")
- val t = new Thread {
- override def run() {
- try {
- val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
- logError(s"Yarn application has already exited with state $state!")
- sc.stop()
- } catch {
- case e: InterruptedException => logInfo("Interrupting monitor thread")
- }
- }
- }
+ val t = new MonitorThread
t.setName("Yarn application state monitor")
t.setDaemon(true)
t
@@ -160,7 +178,7 @@ private[spark] class YarnClientSchedulerBackend(
override def stop() {
assert(client != null, "Attempted to stop this scheduler before starting it!")
if (monitorThread != null) {
- monitorThread.interrupt()
+ monitorThread.stopMonitor()
}
super.stop()
client.stop()
@@ -174,5 +192,4 @@ private[spark] class YarnClientSchedulerBackend(
super.applicationId
}
}
-
}