author     WangTaoTheTonic <wangtao111@huawei.com>   2015-04-07 08:36:25 -0500
committer  Thomas Graves <tgraves@apache.org>        2015-04-07 08:36:25 -0500
commit     b65bad65c3500475b974ca0219f218eef296db2c (patch)
tree       b71870446ab8931b39c11d2bcbb4c54eaf776d77 /yarn
parent     ae980eb41c00b5f1f64c650f267b884e864693f0 (diff)
[SPARK-3591][YARN] fire and forget for YARN cluster mode
https://issues.apache.org/jira/browse/SPARK-3591

The output after this patch:

>doggie153:/opt/oss/spark-1.3.0-bin-hadoop2.4/bin # ./spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster ../lib/spark-examples*.jar
15/03/31 21:15:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
15/03/31 21:15:25 INFO RMProxy: Connecting to ResourceManager at doggie153/10.177.112.153:8032
15/03/31 21:15:25 INFO Client: Requesting a new application from cluster with 4 NodeManagers
15/03/31 21:15:25 INFO Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
15/03/31 21:15:25 INFO Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
15/03/31 21:15:25 INFO Client: Setting up container launch context for our AM
15/03/31 21:15:25 INFO Client: Preparing resources for our AM container
15/03/31 21:15:26 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-assembly-1.4.0-SNAPSHOT-hadoop2.4.1.jar
15/03/31 21:15:27 INFO Client: Uploading resource file:/opt/oss/spark-1.3.0-bin-hadoop2.4/lib/spark-examples-1.3.0-hadoop2.4.0.jar -> hdfs://doggie153:9000/user/root/.sparkStaging/application_1427257505534_0016/spark-examples-1.3.0-hadoop2.4.0.jar
15/03/31 21:15:28 INFO Client: Setting up the launch environment for our AM container
15/03/31 21:15:28 INFO SecurityManager: Changing view acls to: root
15/03/31 21:15:28 INFO SecurityManager: Changing modify acls to: root
15/03/31 21:15:28 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
15/03/31 21:15:28 INFO Client: Submitting application 16 to ResourceManager
15/03/31 21:15:28 INFO YarnClientImpl: Submitted application application_1427257505534_0016
15/03/31 21:15:28 INFO Client: ... waiting before polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: ... polling ResourceManager for application state
15/03/31 21:15:33 INFO Client: Application report for application_1427257505534_0016 (state: RUNNING)
15/03/31 21:15:33 INFO Client:
	 client token: N/A
	 diagnostics: N/A
	 ApplicationMaster host: doggie157
	 ApplicationMaster RPC port: 0
	 queue: default
	 start time: 1427807728307
	 final status: UNDEFINED
	 tracking URL: http://doggie153:8088/proxy/application_1427257505534_0016/
	 user: root

/cc andrewor14

Author: WangTaoTheTonic <wangtao111@huawei.com>

Closes #5297 from WangTaoTheTonic/SPARK-3591 and squashes the following commits:

c76d232 [WangTaoTheTonic] wrap lines
16c90a8 [WangTaoTheTonic] move up lines to avoid duplicate
fea390d [WangTaoTheTonic] log failed/killed report, style and comment
be1cc2e [WangTaoTheTonic] reword
f0bc54f [WangTaoTheTonic] minor: expose appid in exception messages
ba9b22b [WangTaoTheTonic] wrong config name
e1a4013 [WangTaoTheTonic] revert to the old version and do some robust
19706c0 [WangTaoTheTonic] add a config to control whether to forget
0cbdce8 [WangTaoTheTonic] fire and forget for YARN cluster mode
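Editor's note: the knob this patch adds is the boolean spark.yarn.submit.waitAppCompletion configuration. Below is a minimal sketch, not part of the patch, of setting it programmatically; SparkConf is the standard Spark configuration class, and the app name is ours for illustration.

import org.apache.spark.SparkConf

object FireAndForgetConf {
  // spark.yarn.submit.waitAppCompletion defaults to true, meaning the client
  // stays alive and polls the ResourceManager until the application exits.
  // Setting it to false in yarn-cluster mode makes the client return right
  // after a successful submission.
  val conf: SparkConf = new SparkConf()
    .setAppName("SparkPi")
    .set("spark.yarn.submit.waitAppCompletion", "false")
}

From the command line, the equivalent is passing --conf spark.yarn.submit.waitAppCompletion=false to the spark-submit invocation shown in the log above.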
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 83
1 file changed, 50 insertions(+), 33 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 61f8fc3f5a..79d55a09eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -66,6 +66,8 @@ private[spark] class Client(
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
private val distCacheMgr = new ClientDistributedCacheManager()
private val isClusterMode = args.isClusterMode
+ private val fireAndForget = isClusterMode &&
+ !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
def stop(): Unit = yarnClient.stop()
@@ -564,31 +566,13 @@ private[spark] class Client(
if (logApplicationReport) {
logInfo(s"Application report for $appId (state: $state)")
- val details = Seq[(String, String)](
- ("client token", getClientToken(report)),
- ("diagnostics", report.getDiagnostics),
- ("ApplicationMaster host", report.getHost),
- ("ApplicationMaster RPC port", report.getRpcPort.toString),
- ("queue", report.getQueue),
- ("start time", report.getStartTime.toString),
- ("final status", report.getFinalApplicationStatus.toString),
- ("tracking URL", report.getTrackingUrl),
- ("user", report.getUser)
- )
-
- // Use more loggable format if value is null or empty
- val formattedDetails = details
- .map { case (k, v) =>
- val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
- s"\n\t $k: $newValue" }
- .mkString("")
// If DEBUG is enabled, log report details every iteration
// Otherwise, log them every time the application changes state
if (log.isDebugEnabled) {
- logDebug(formattedDetails)
+ logDebug(formatReportDetails(report))
} else if (lastState != state) {
- logInfo(formattedDetails)
+ logInfo(formatReportDetails(report))
}
}
@@ -609,24 +593,57 @@ private[spark] class Client(
throw new SparkException("While loop is depleted! This should never happen...")
}
+ private def formatReportDetails(report: ApplicationReport): String = {
+ val details = Seq[(String, String)](
+ ("client token", getClientToken(report)),
+ ("diagnostics", report.getDiagnostics),
+ ("ApplicationMaster host", report.getHost),
+ ("ApplicationMaster RPC port", report.getRpcPort.toString),
+ ("queue", report.getQueue),
+ ("start time", report.getStartTime.toString),
+ ("final status", report.getFinalApplicationStatus.toString),
+ ("tracking URL", report.getTrackingUrl),
+ ("user", report.getUser)
+ )
+
+ // Use more loggable format if value is null or empty
+ details.map { case (k, v) =>
+ val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A")
+ s"\n\t $k: $newValue"
+ }.mkString("")
+ }
+
/**
- * Submit an application to the ResourceManager and monitor its state.
- * This continues until the application has exited for any reason.
+ * Submit an application to the ResourceManager.
+ * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
+ * reporting the application's status until the application has exited for any reason.
+ * Otherwise, the client process will exit after submission.
* If the application finishes with a failed, killed, or undefined status,
* throw an appropriate SparkException.
*/
def run(): Unit = {
- val (yarnApplicationState, finalApplicationStatus) = monitorApplication(submitApplication())
- if (yarnApplicationState == YarnApplicationState.FAILED ||
- finalApplicationStatus == FinalApplicationStatus.FAILED) {
- throw new SparkException("Application finished with failed status")
- }
- if (yarnApplicationState == YarnApplicationState.KILLED ||
- finalApplicationStatus == FinalApplicationStatus.KILLED) {
- throw new SparkException("Application is killed")
- }
- if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
- throw new SparkException("The final status of application is undefined")
+ val appId = submitApplication()
+ if (fireAndForget) {
+ val report = getApplicationReport(appId)
+ val state = report.getYarnApplicationState
+ logInfo(s"Application report for $appId (state: $state)")
+ logInfo(formatReportDetails(report))
+ if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
+ throw new SparkException(s"Application $appId finished with status: $state")
+ }
+ } else {
+ val (yarnApplicationState, finalApplicationStatus) = monitorApplication(appId)
+ if (yarnApplicationState == YarnApplicationState.FAILED ||
+ finalApplicationStatus == FinalApplicationStatus.FAILED) {
+ throw new SparkException(s"Application $appId finished with failed status")
+ }
+ if (yarnApplicationState == YarnApplicationState.KILLED ||
+ finalApplicationStatus == FinalApplicationStatus.KILLED) {
+ throw new SparkException(s"Application $appId is killed")
+ }
+ if (finalApplicationStatus == FinalApplicationStatus.UNDEFINED) {
+ throw new SparkException(s"The final status of application $appId is undefined")
+ }
}
}
}
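Editor's note on the Option(v).filter(_.nonEmpty).getOrElse("N/A") idiom in formatReportDetails above: fields in a YARN application report can come back null or as empty strings, and both should print as "N/A". A self-contained sketch of just that behavior; the helper name orNA is ours, not Spark's.

object ReportFieldFormatting {
  // Isolates the null/empty handling used by formatReportDetails: a null or
  // empty report field is rendered as "N/A", anything else passes through.
  def orNA(v: String): String = Option(v).filter(_.nonEmpty).getOrElse("N/A")

  def main(args: Array[String]): Unit = {
    assert(orNA(null) == "N/A")          // field missing from the RM report
    assert(orNA("") == "N/A")            // e.g. empty diagnostics string
    assert(orNA("default") == "default") // real values are unchanged
  }
}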