aboutsummaryrefslogtreecommitdiff
path: root/yarn/src
diff options
context:
space:
mode:
authorWeiqing Yang <yangweiqing001@gmail.com>2016-11-11 18:36:23 -0800
committerMridul Muralidharan <mridul@gmail.com>2016-11-11 18:36:23 -0800
commit3af894511be6fcc17731e28b284dba432fe911f5 (patch)
tree0d0911d6885d62ab63684c05123f1a8d1fd688b8 /yarn/src
parent46b2550bcd3690a260b995fd4d024a73b92a0299 (diff)
downloadspark-3af894511be6fcc17731e28b284dba432fe911f5.tar.gz
spark-3af894511be6fcc17731e28b284dba432fe911f5.tar.bz2
spark-3af894511be6fcc17731e28b284dba432fe911f5.zip
[SPARK-16759][CORE] Add a configuration property to pass caller contexts of upstream applications into Spark
## What changes were proposed in this pull request? Many applications take Spark as a computing engine and run on it. This PR adds a configuration property `spark.log.callerContext` that can be used by Spark's upstream applications (e.g. Oozie) to set up their caller contexts into Spark. In the end, Spark will combine its own caller context with the caller contexts of its upstream applications, and write them into Yarn RM log and HDFS audit log. The audit log has a config to truncate the caller contexts passed in (default 128). The caller contexts will be sent over rpc, so it should be concise. The call context written into HDFS log and Yarn log consists of two parts: the information `A` specified by Spark itself and the value `B` of `spark.log.callerContext` property. Currently `A` typically takes 64 to 74 characters, so `B` can have up to 50 characters (mentioned in the doc `running-on-yarn.md`) ## How was this patch tested? Manual tests. I have run some Spark applications with `spark.log.callerContext` configuration in Yarn client/cluster mode, and verified that the caller contexts were written into Yarn RM log and HDFS audit log correctly. The ways to configure `spark.log.callerContext` property: - In spark-defaults.conf: ``` spark.log.callerContext infoSpecifiedByUpstreamApp ``` - In app's source code: ``` val spark = SparkSession .builder .appName("SparkKMeans") .config("spark.log.callerContext", "infoSpecifiedByUpstreamApp") .getOrCreate() ``` When running on Spark Yarn cluster mode, the driver is unable to pass 'spark.log.callerContext' to Yarn client and AM since Yarn client and AM have already started before the driver performs `.config("spark.log.callerContext", "infoSpecifiedByUpstreamApp")`. The following example shows the command line used to submit a SparkKMeans application and the corresponding records in Yarn RM log and HDFS audit log. Command: ``` ./bin/spark-submit --verbose --executor-cores 3 --num-executors 1 --master yarn --deploy-mode client --class org.apache.spark.examples.SparkKMeans examples/target/original-spark-examples_2.11-2.1.0-SNAPSHOT.jar hdfs://localhost:9000/lr_big.txt 2 5 ``` Yarn RM log: <img width="1440" alt="screen shot 2016-10-19 at 9 12 03 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547050/7d2f278c-9649-11e6-9df8-8d5ff12609f0.png"> HDFS audit log: <img width="1400" alt="screen shot 2016-10-19 at 10 18 14 pm" src="https://cloud.githubusercontent.com/assets/8546874/19547102/096060ae-964a-11e6-981a-cb28efd5a058.png"> Author: Weiqing Yang <yangweiqing001@gmail.com> Closes #15563 from weiqingy/SPARK-16759.
Diffstat (limited to 'yarn/src')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala3
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala3
2 files changed, 4 insertions, 2 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f2b9dfb4d1..918cc2dd04 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -202,7 +202,8 @@ private[spark] class ApplicationMaster(
attemptID = Option(appAttemptId.getAttemptId.toString)
}
- new CallerContext("APPMASTER",
+ new CallerContext(
+ "APPMASTER", sparkConf.get(APP_CALLER_CONTEXT),
Option(appAttemptId.getApplicationId.toString), attemptID).setCurrentContext()
logInfo("ApplicationAttemptId: " + appAttemptId)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index e77fa386dc..1b75688b28 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -161,7 +161,8 @@ private[spark] class Client(
reportLauncherState(SparkAppHandle.State.SUBMITTED)
launcherBackend.setAppId(appId.toString)
- new CallerContext("CLIENT", Option(appId.toString)).setCurrentContext()
+ new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
+ Option(appId.toString)).setCurrentContext()
// Verify whether the cluster has enough resources for our AM
verifyClusterResources(newAppResponse)