about summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--  docs/running-on-yarn.md                                          8
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala  21
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala  36
3 files changed, 65 insertions, 0 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index ec32c419b7..8ac26e98da 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
</td>
</tr>
<tr>
+ <td><code>spark.yarn.tags</code></td>
+ <td>(none)</td>
+ <td>
+ Comma-separated list of strings to pass through as YARN application tags appearing
+ in YARN ApplicationReports, which can be used for filtering when querying YARN apps.
+ </td>
+</tr>
+<tr>
<td><code>spark.yarn.keytab</code></td>
<td>(none)</td>
<td>
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6d63ddaf15..5c6a716863 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -163,6 +163,23 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
+ sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+ .map(StringUtils.getTrimmedStringCollection(_))
+ .filter(!_.isEmpty())
+ .foreach { tagCollection =>
+ try {
+ // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+ // reflection to set it, printing a warning if a tag was specified but the YARN version
+ // doesn't support it.
+ val method = appContext.getClass().getMethod(
+ "setApplicationTags", classOf[java.util.Set[String]])
+ method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
+ "YARN does not support it")
+ }
+ }
sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
case Some(v) => appContext.setMaxAppAttempts(v)
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
@@ -987,6 +1004,10 @@ object Client extends Logging {
// of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
+ // Comma-separated list of strings to pass through as YARN application tags appearing
+ // in YARN ApplicationReports, which can be used for filtering when querying YARN.
+ val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
+
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 837f8d3fa5..0a5402c89e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.Records
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
cp should contain ("/remotePath/my1.jar")
}
+ test("configuration and args propagate through createApplicationSubmissionContext") {
+ val conf = new Configuration()
+ // When parsing tags, duplicates and leading/trailing whitespace should be removed.
+ // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
+ // may not be removed depending on the version of Hadoop being used.
+ val sparkConf = new SparkConf()
+ .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
+ .set("spark.yarn.maxAppAttempts", "42")
+ val args = new ClientArguments(Array(
+ "--name", "foo-test-app",
+ "--queue", "staging-queue"), sparkConf)
+
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, conf, sparkConf)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getApplicationName should be ("foo-test-app")
+ appContext.getQueue should be ("staging-queue")
+ appContext.getAMContainerSpec should be (containerLaunchContext)
+ appContext.getApplicationType should be ("SPARK")
+ appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
+ val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
+ tags should contain allOf ("tag1", "dup", "tag2", "multi word")
+ tags.filter(!_.isEmpty).size should be (4)
+ }
+ appContext.getMaxAppAttempts should be (42)
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =