diff options
author | unknown <l00251599@HGHY1L002515991.china.huawei.com> | 2015-04-08 13:56:42 -0700 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2015-04-08 13:56:42 -0700 |
commit | 55a92ef34c0b57b6e379523d5d79baa05392de37 (patch) | |
tree | 9e54dfeb9a75212489c7973bd366d564c635d6ec /yarn | |
parent | 86403f5525782bc9656ab11790f7020baa6b2c1f (diff) | |
download | spark-55a92ef34c0b57b6e379523d5d79baa05392de37.tar.gz spark-55a92ef34c0b57b6e379523d5d79baa05392de37.tar.bz2 spark-55a92ef34c0b57b6e379523d5d79baa05392de37.zip |
[SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic
1. YarnClientSchedulerBack.asyncMonitorApplication use Client.monitorApplication so that commonize the monitor logic
2. Support changing the yarn client monitor interval, see #5292
3. More details see discussion on https://github.com/apache/spark/pull/3143
Author: unknown <l00251599@HGHY1L002515991.china.huawei.com>
Author: Sephiroth-Lin <linwzhong@gmail.com>
Closes #5305 from Sephiroth-Lin/SPARK-4346_3596 and squashes the following commits:
47c0014 [unknown] Edit conflicts
52b29fe [unknown] Interrupt thread when we call stop()
d4298a1 [unknown] Unused, don't push
aaacb42 [Sephiroth-Lin] don't wrap the entire block in the try
ee2b2fd [Sephiroth-Lin] update
6483a2a [unknown] Catch exception
6b47ff7 [unknown] Update code
568f46f [unknown] YarnClientSchedulerBack.asyncMonitorApplication should be common with Client.monitorApplication
Diffstat (limited to 'yarn')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 10 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 32 |
2 files changed, 18 insertions, 24 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 79d55a09eb..7219852c0a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication} import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} @@ -561,7 +562,14 @@ private[spark] class Client( var lastState: YarnApplicationState = null while (true) { Thread.sleep(interval) - val report = getApplicationReport(appId) + val report: ApplicationReport = + try { + getApplicationReport(appId) + } catch { + case e: ApplicationNotFoundException => + logError(s"Application $appId not found.") + return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED) + } val state = report.getYarnApplicationState if (logApplicationReport) { diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 8abdc26b43..407dc1ac4d 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend( private var client: Client = null private var appId: ApplicationId = null - @volatile private var stopping: Boolean = false + private var monitorThread: Thread = null /** * Create a Yarn client to submit an application to the ResourceManager. @@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend( client = new Client(args, conf) appId = client.submitApplication() waitForApplication() - asyncMonitorApplication() + monitorThread = asyncMonitorApplication() + monitorThread.start() } /** @@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend( * If the application has exited for any reason, stop the SparkContext. * This assumes both `client` and `appId` have already been set. */ - private def asyncMonitorApplication(): Unit = { + private def asyncMonitorApplication(): Thread = { assert(client != null && appId != null, "Application has not been submitted yet!") val t = new Thread { override def run() { - while (!stopping) { - var state: YarnApplicationState = null - try { - val report = client.getApplicationReport(appId) - state = report.getYarnApplicationState() - } catch { - case e: ApplicationNotFoundException => - state = YarnApplicationState.KILLED - } - if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.KILLED || - state == YarnApplicationState.FAILED) { - logError(s"Yarn application has already exited with state $state!") - sc.stop() - stopping = true - } - Thread.sleep(1000L) - } + val (state, _) = client.monitorApplication(appId, logApplicationReport = false) + logError(s"Yarn application has already exited with state $state!") + sc.stop() Thread.currentThread().interrupt() } } t.setName("Yarn application state monitor") t.setDaemon(true) - t.start() + t } /** @@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend( */ override def stop() { assert(client != null, "Attempted to stop this scheduler before starting it!") - stopping = true + monitorThread.interrupt() super.stop() client.stop() logInfo("Stopped") |