author     Marcelo Vanzin <vanzin@cloudera.com>    2015-10-09 15:28:09 -0500
committer  Imran Rashid <irashid@cloudera.com>     2015-10-09 15:28:09 -0500
commit     015f7ef503d5544f79512b6333326749a1f0c48b (patch)
tree       990942b40bd374f632c3954cd4aab3741dd17f63 /yarn/src/main/scala/org
parent     70f44ad2d836236c74e1336a7368982d5fe3abff (diff)
[SPARK-8673] [LAUNCHER] API and infrastructure for communicating with child apps.
This change adds an API that encapsulates information about an app launched using the library. It also creates a socket-based communication layer for apps that are launched as child processes; the launching application listens for connections from launched apps, and once communication is established, the channel can be used to send updates to the launching app, or to send commands to the child app.

The change also includes hooks for local, standalone/client and yarn masters.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #7052 from vanzin/SPARK-8673.
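To make the new API concrete, here is a minimal sketch of how a launching application could consume it (the jar path, main class, and listener bodies are illustrative placeholders, not part of this change):

import org.apache.spark.launcher.{SparkAppHandle, SparkLauncher}

object LaunchExample {
  def main(args: Array[String]): Unit = {
    // Start the app as a child process and obtain a handle to it. The
    // launcher listens on a local socket; the child connects back once it
    // starts, establishing the communication channel described above.
    val handle = new SparkLauncher()
      .setAppResource("/path/to/app.jar")    // placeholder
      .setMainClass("com.example.MyApp")     // placeholder
      .setMaster("yarn-client")
      .startApplication(new SparkAppHandle.Listener {
        override def stateChanged(h: SparkAppHandle): Unit =
          println(s"state changed: ${h.getState}")
        override def infoChanged(h: SparkAppHandle): Unit =
          println(s"application id: ${h.getAppId}")
      })
    // The channel also carries commands to the child: handle.stop() asks the
    // app to shut down, and handle.kill() terminates it forcibly.
  }
}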
Diffstat (limited to 'yarn/src/main/scala/org')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                             43
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala   10
2 files changed, 48 insertions, 5 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index eb3b7fb885..cec81b9406 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -55,8 +55,8 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException
import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException}
+import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils}
import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.util.Utils
private[spark] class Client(
@@ -70,8 +70,6 @@ private[spark] class Client(
def this(clientArgs: ClientArguments, spConf: SparkConf) =
this(clientArgs, SparkHadoopUtil.get.newConfiguration(spConf), spConf)
- def this(clientArgs: ClientArguments) = this(clientArgs, new SparkConf())
-
private val yarnClient = YarnClient.createYarnClient
private val yarnConf = new YarnConfiguration(hadoopConf)
private var credentials: Credentials = null
@@ -84,10 +82,27 @@ private[spark] class Client(
private var principal: String = null
private var keytab: String = null
+  private val launcherBackend = new LauncherBackend() {
+    override def onStopRequest(): Unit = {
+      if (isClusterMode && appId != null) {
+        yarnClient.killApplication(appId)
+      } else {
+        setState(SparkAppHandle.State.KILLED)
+        stop()
+      }
+    }
+  }
private val fireAndForget = isClusterMode &&
!sparkConf.getBoolean("spark.yarn.submit.waitAppCompletion", true)
+  private var appId: ApplicationId = null
+
+  def reportLauncherState(state: SparkAppHandle.State): Unit = {
+    launcherBackend.setState(state)
+  }
+
def stop(): Unit = {
+ launcherBackend.close()
yarnClient.stop()
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
@@ -103,6 +118,7 @@ private[spark] class Client(
def submitApplication(): ApplicationId = {
var appId: ApplicationId = null
try {
+ launcherBackend.connect()
// Set up the credentials before doing anything else,
// so we don't have issues at any point.
setupCredentials()
@@ -116,6 +132,8 @@ private[spark] class Client(
val newApp = yarnClient.createApplication()
val newAppResponse = newApp.getNewApplicationResponse()
appId = newAppResponse.getApplicationId()
+ reportLauncherState(SparkAppHandle.State.SUBMITTED)
+ launcherBackend.setAppId(appId.toString())
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)
@@ -881,6 +899,20 @@ private[spark] class Client(
}
}
+      if (lastState != state) {
+        state match {
+          case YarnApplicationState.RUNNING =>
+            reportLauncherState(SparkAppHandle.State.RUNNING)
+          case YarnApplicationState.FINISHED =>
+            reportLauncherState(SparkAppHandle.State.FINISHED)
+          case YarnApplicationState.FAILED =>
+            reportLauncherState(SparkAppHandle.State.FAILED)
+          case YarnApplicationState.KILLED =>
+            reportLauncherState(SparkAppHandle.State.KILLED)
+          case _ =>
+        }
+      }
+
if (state == YarnApplicationState.FINISHED ||
state == YarnApplicationState.FAILED ||
state == YarnApplicationState.KILLED) {
@@ -928,8 +960,8 @@ private[spark] class Client(
* throw an appropriate SparkException.
*/
def run(): Unit = {
- val appId = submitApplication()
- if (fireAndForget) {
+ this.appId = submitApplication()
+ if (!launcherBackend.isConnected() && fireAndForget) {
val report = getApplicationReport(appId)
val state = report.getYarnApplicationState
logInfo(s"Application report for $appId (state: $state)")
@@ -971,6 +1003,7 @@ private[spark] class Client(
}
object Client extends Logging {
+
def main(argStrings: Array[String]) {
if (!sys.props.contains("SPARK_SUBMIT")) {
logWarning("WARNING: This client is deprecated and will be removed in a " +
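The onStopRequest() hook added above is the child-side half of the handle's stop command: a stop request arriving over the launcher channel kills a cluster-mode YARN application through the YARN API, and otherwise marks the handle KILLED and stops the client locally. From the launcher side this is driven by the handle; a sketch, reusing the hypothetical handle from the earlier example:

// Ask the child app to stop; the request travels over the launcher channel
// and is serviced by LauncherBackend.onStopRequest() in the child process.
handle.stop()

// If the app does not exit, kill() tears down the child process directly.
handle.kill()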
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 36d5759554..20771f6554 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -23,6 +23,7 @@ import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.spark.{SparkException, Logging, SparkContext}
import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil}
+import org.apache.spark.launcher.SparkAppHandle
import org.apache.spark.scheduler.TaskSchedulerImpl
private[spark] class YarnClientSchedulerBackend(
@@ -177,6 +178,15 @@ private[spark] class YarnClientSchedulerBackend(
if (monitorThread != null) {
monitorThread.stopMonitor()
}
+
+ // Report a final state to the launcher if one is connected. This is needed since in client
+ // mode this backend doesn't let the app monitor loop run to completion, so it does not report
+ // the final state itself.
+ //
+ // Note: there's not enough information at this point to provide a better final state,
+ // so assume the application was successful.
+ client.reportLauncherState(SparkAppHandle.State.FINISHED)
+
super.stop()
YarnSparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
client.stop()
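Because YarnClientSchedulerBackend.stop() now reports a terminal state, a launching application can reliably wait for the handle to reach a final state before exiting. A minimal sketch of such a wait (the latch-based listener is illustrative, not part of this change):

import java.util.concurrent.CountDownLatch

import org.apache.spark.launcher.SparkAppHandle

// Illustrative listener that releases a latch once the handle reaches a
// terminal state (FINISHED, FAILED or KILLED).
class FinalStateListener(latch: CountDownLatch) extends SparkAppHandle.Listener {
  override def stateChanged(h: SparkAppHandle): Unit =
    if (h.getState.isFinal) latch.countDown()
  override def infoChanged(h: SparkAppHandle): Unit = ()
}

// Usage: val latch = new CountDownLatch(1)
//        launcher.startApplication(new FinalStateListener(latch))
//        latch.await()   // blocks until the app reaches a final state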