aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorunknown <l00251599@HGHY1L002515991.china.huawei.com>2015-04-08 13:56:42 -0700
committerAndrew Or <andrew@databricks.com>2015-04-08 13:56:42 -0700
commit55a92ef34c0b57b6e379523d5d79baa05392de37 (patch)
tree9e54dfeb9a75212489c7973bd366d564c635d6ec /yarn
parent86403f5525782bc9656ab11790f7020baa6b2c1f (diff)
downloadspark-55a92ef34c0b57b6e379523d5d79baa05392de37.tar.gz
spark-55a92ef34c0b57b6e379523d5d79baa05392de37.tar.bz2
spark-55a92ef34c0b57b6e379523d5d79baa05392de37.zip
[SPARK-4346][SPARK-3596][YARN] Commonize the monitor logic
1. YarnClientSchedulerBack.asyncMonitorApplication use Client.monitorApplication so that commonize the monitor logic 2. Support changing the yarn client monitor interval, see #5292 3. More details see discussion on https://github.com/apache/spark/pull/3143 Author: unknown <l00251599@HGHY1L002515991.china.huawei.com> Author: Sephiroth-Lin <linwzhong@gmail.com> Closes #5305 from Sephiroth-Lin/SPARK-4346_3596 and squashes the following commits: 47c0014 [unknown] Edit conflicts 52b29fe [unknown] Interrupt thread when we call stop() d4298a1 [unknown] Unused, don't push aaacb42 [Sephiroth-Lin] don't wrap the entire block in the try ee2b2fd [Sephiroth-Lin] update 6483a2a [unknown] Catch exception 6b47ff7 [unknown] Update code 568f46f [unknown] YarnClientSchedulerBack.asyncMonitorApplication should be common with Client.monitorApplication
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala10
-rw-r--r--yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala32
2 files changed, 18 insertions, 24 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 79d55a09eb..7219852c0a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.protocolrecords._
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.client.api.{YarnClient, YarnClientApplication}
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
@@ -561,7 +562,14 @@ private[spark] class Client(
var lastState: YarnApplicationState = null
while (true) {
Thread.sleep(interval)
- val report = getApplicationReport(appId)
+ val report: ApplicationReport =
+ try {
+ getApplicationReport(appId)
+ } catch {
+ case e: ApplicationNotFoundException =>
+ logError(s"Application $appId not found.")
+ return (YarnApplicationState.KILLED, FinalApplicationStatus.KILLED)
+ }
val state = report.getYarnApplicationState
if (logApplicationReport) {
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 8abdc26b43..407dc1ac4d 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -34,7 +34,7 @@ private[spark] class YarnClientSchedulerBackend(
private var client: Client = null
private var appId: ApplicationId = null
- @volatile private var stopping: Boolean = false
+ private var monitorThread: Thread = null
/**
* Create a Yarn client to submit an application to the ResourceManager.
@@ -57,7 +57,8 @@ private[spark] class YarnClientSchedulerBackend(
client = new Client(args, conf)
appId = client.submitApplication()
waitForApplication()
- asyncMonitorApplication()
+ monitorThread = asyncMonitorApplication()
+ monitorThread.start()
}
/**
@@ -123,34 +124,19 @@ private[spark] class YarnClientSchedulerBackend(
* If the application has exited for any reason, stop the SparkContext.
* This assumes both `client` and `appId` have already been set.
*/
- private def asyncMonitorApplication(): Unit = {
+ private def asyncMonitorApplication(): Thread = {
assert(client != null && appId != null, "Application has not been submitted yet!")
val t = new Thread {
override def run() {
- while (!stopping) {
- var state: YarnApplicationState = null
- try {
- val report = client.getApplicationReport(appId)
- state = report.getYarnApplicationState()
- } catch {
- case e: ApplicationNotFoundException =>
- state = YarnApplicationState.KILLED
- }
- if (state == YarnApplicationState.FINISHED ||
- state == YarnApplicationState.KILLED ||
- state == YarnApplicationState.FAILED) {
- logError(s"Yarn application has already exited with state $state!")
- sc.stop()
- stopping = true
- }
- Thread.sleep(1000L)
- }
+ val (state, _) = client.monitorApplication(appId, logApplicationReport = false)
+ logError(s"Yarn application has already exited with state $state!")
+ sc.stop()
Thread.currentThread().interrupt()
}
}
t.setName("Yarn application state monitor")
t.setDaemon(true)
- t.start()
+ t
}
/**
@@ -158,7 +144,7 @@ private[spark] class YarnClientSchedulerBackend(
*/
override def stop() {
assert(client != null, "Attempted to stop this scheduler before starting it!")
- stopping = true
+ monitorThread.interrupt()
super.stop()
client.stop()
logInfo("Stopped")