diff options
author | Marcelo Vanzin <vanzin@cloudera.com> | 2015-10-09 15:28:09 -0500 |
---|---|---|
committer | Imran Rashid <irashid@cloudera.com> | 2015-10-09 15:28:09 -0500 |
commit | 015f7ef503d5544f79512b6333326749a1f0c48b (patch) | |
tree | 990942b40bd374f632c3954cd4aab3741dd17f63 /yarn/src/main/scala/org | |
parent | 70f44ad2d836236c74e1336a7368982d5fe3abff (diff) | |
download | spark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.gz spark-015f7ef503d5544f79512b6333326749a1f0c48b.tar.bz2 spark-015f7ef503d5544f79512b6333326749a1f0c48b.zip |
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app
launched using the library. It also creates a socket-based communication
layer for apps that are launched as child processes; the launching
application listens for connections from launched apps, and once
communication is established, the channel can be used to send updates
to the launching app, or to send commands to the child app.
The change also includes hooks for local, standalone/client and yarn
masters.
Author: Marcelo Vanzin <vanzin@cloudera.com>
Closes #7052 from vanzin/SPARK-8673.
Diffstat (limited to 'yarn/src/main/scala/org')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 43 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala | 10 |
2 files changed, 48 insertions, 5 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index eb3b7fb885..cec81b9406 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils private[spark] class Client( @@ -70,8 +70,6 @@ private[spark] class Client( def this(clientArgs: ClientArguments, spConf: SparkConf) = this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf) - def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf()) - private val yarnClient = YarnClient.createYarnClient private val yarnConf = new YarnConfiguration(hadoopConf) private var credentials: Credentials = null @@ -84,10 +82,27 @@ private[spark] class Client( private var principal: String = null private var keytab: String = null + private val launcherBackend = new LauncherBackend() { + override def onStopRequest(): Unit = { + if (isClusterMode && appId != null) { + yarnClient.killApplication(appId) + } else { + setState(SparkAppHandle.State.KILLED) + stop() + } + } + } private val fireAndForget = isClusterMode && !sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true) + private var appId: ApplicationId = null + + def reportLauncherState(state: SparkAppHandle.State): Unit = { + launcherBackend.setState(state) + } + def stop(): Unit = { + launcherBackend.close() yarnClient.stop() // Unset YARN mode system env variable, to allow switching between cluster types. System.clearProperty("SPARK_YARN_MODE") @@ -103,6 +118,7 @@ private[spark] class Client( def submitApplication(): ApplicationId = { var appId: ApplicationId = null try { + launcherBackend.connect() // Setup the credentials before doing anything else, // so we have don't have issues at any point. setupCredentials() @@ -116,6 +132,8 @@ private[spark] class Client( val newApp = yarnClient.createApplication() val newAppResponse = newApp.getNewApplicationResponse() appId = newAppResponse.getApplicationId() + reportLauncherState(SparkAppHandle.State.SUBMITTED) + launcherBackend.setAppId(appId.toString()) // Verify whether the cluster has enough resources for our AM verifyClusterResources(newAppResponse) @@ -881,6 +899,20 @@ private[spark] class Client( } } + if (lastState != state) { + state match { + case YarnApplicationState.RUNNING => + reportLauncherState(SparkAppHandle.State.RUNNING) + case YarnApplicationState.FINISHED => + reportLauncherState(SparkAppHandle.State.FINISHED) + case YarnApplicationState.FAILED => + reportLauncherState(SparkAppHandle.State.FAILED) + case YarnApplicationState.KILLED => + reportLauncherState(SparkAppHandle.State.KILLED) + case _ => + } + } + if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { @@ -928,8 +960,8 @@ private[spark] class Client( * throw an appropriate SparkException. */ def run(): Unit = { - val appId = submitApplication() - if (fireAndForget) { + this.appId = submitApplication() + if (!launcherBackend.isConnected() && fireAndForget) { val report = getApplicationReport(appId) val state = report.getYarnApplicationState logInfo(s"Application report for $appId (state: $state)") @@ -971,6 +1003,7 @@ private[spark] class Client( } object Client extends Logging { + def main(argStrings: Array[String]) { if (!sys.props.contains("SPARK_SUBMIT")) { logWarning("WARNING: This client is deprecated and will be removed in a " + diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 36d5759554..20771f6554 100644 --- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} +import org.apache.spark.launcher.SparkAppHandle import org.apache.spark.scheduler.TaskSchedulerImpl private[spark] class YarnClientSchedulerBackend( @@ -177,6 +178,15 @@ private[spark] class YarnClientSchedulerBackend( if (monitorThread != null) { monitorThread.stopMonitor() } + + // Report a final state to the launcher if one is connected. This is needed since in client + // mode this backend doesn't let the app monitor loop run to completion, so it does not report + // the final state itself. + // + // Note: there's not enough information at this point to provide a better final state, + // so assume the application was successful. + client.reportLauncherState(SparkAppHandle.State.FINISHED) + super.stop() YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer() client.stop() |