aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorDennis Huo <dhuo@google.com>2015-08-18 14:34:20 -0700
committerSandy Ryza <sandy@cloudera.com>2015-08-18 14:34:20 -0700
commit9b731fad2b43ca18f3c5274062d4c7bc2622ab72 (patch)
tree3e524f86259164e4f5b3285aee764b9f25dcdd01 /yarn
parent80cb25b228e821a80256546a2f03f73a45cf7645 (diff)
downloadspark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.tar.gz
spark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.tar.bz2
spark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.zip
[SPARK-9782] [YARN] Support YARN application tags via SparkConf
Add a new test case in yarn/ClientSuite which checks how the various SparkConf and ClientArguments propagate into the ApplicationSubmissionContext. Author: Dennis Huo <dhuo@google.com> Closes #8072 from dennishuo/dhuo-yarn-application-tags.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala21
-rw-r--r--yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala36
2 files changed, 57 insertions, 0 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 6d63ddaf15..5c6a716863 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -163,6 +163,23 @@ private[spark] class Client(
appContext.setQueue(args.amQueue)
appContext.setAMContainerSpec(containerContext)
appContext.setApplicationType("SPARK")
+ sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+ .map(StringUtils.getTrimmedStringCollection(_))
+ .filter(!_.isEmpty())
+ .foreach { tagCollection =>
+ try {
+ // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+ // reflection to set it, printing a warning if a tag was specified but the YARN version
+ // doesn't support it.
+ val method = appContext.getClass().getMethod(
+ "setApplicationTags", classOf[java.util.Set[String]])
+ method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " +
+ "YARN does not support it")
+ }
+ }
sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match {
case Some(v) => appContext.setMaxAppAttempts(v)
case None => logDebug("spark.yarn.maxAppAttempts is not set. " +
@@ -987,6 +1004,10 @@ object Client extends Logging {
// of the executors
val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars"
+ // Comma-separated list of strings to pass through as YARN application tags appearing
+ // in YARN ApplicationReports, which can be used for filtering when querying YARN.
+ val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags"
+
// Staging directory is private! -> rwx--------
val STAGING_DIR_PERMISSION: FsPermission =
FsPermission.createImmutable(Integer.parseInt("700", 8).toShort)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 837f8d3fa5..0a5402c89e 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.MRJobConfig
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
+import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse
import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.YarnClientApplication
import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.Records
import org.mockito.Matchers._
import org.mockito.Mockito._
import org.scalatest.{BeforeAndAfterAll, Matchers}
@@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
cp should contain ("/remotePath/my1.jar")
}
+ test("configuration and args propagate through createApplicationSubmissionContext") {
+ val conf = new Configuration()
+ // When parsing tags, duplicates and leading/trailing whitespace should be removed.
+ // Spaces between non-comma strings should be preserved as single tags. Empty strings may or
+ // may not be removed depending on the version of Hadoop being used.
+ val sparkConf = new SparkConf()
+ .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup")
+ .set("spark.yarn.maxAppAttempts", "42")
+ val args = new ClientArguments(Array(
+ "--name", "foo-test-app",
+ "--queue", "staging-queue"), sparkConf)
+
+ val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+ val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
+ val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
+
+ val client = new Client(args, conf, sparkConf)
+ client.createApplicationSubmissionContext(
+ new YarnClientApplication(getNewApplicationResponse, appContext),
+ containerLaunchContext)
+
+ appContext.getApplicationName should be ("foo-test-app")
+ appContext.getQueue should be ("staging-queue")
+ appContext.getAMContainerSpec should be (containerLaunchContext)
+ appContext.getApplicationType should be ("SPARK")
+ appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method =>
+ val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]]
+ tags should contain allOf ("tag1", "dup", "tag2", "multi word")
+ tags.filter(!_.isEmpty).size should be (4)
+ }
+ appContext.getMaxAppAttempts should be (42)
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =