diff options
author    Dennis Huo <dhuo@google.com>     2015-08-18 14:34:20 -0700
committer Sandy Ryza <sandy@cloudera.com>  2015-08-18 14:34:20 -0700
commit    9b731fad2b43ca18f3c5274062d4c7bc2622ab72 (patch)
tree      3e524f86259164e4f5b3285aee764b9f25dcdd01 /yarn
parent    80cb25b228e821a80256546a2f03f73a45cf7645 (diff)
download  spark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.tar.gz
          spark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.tar.bz2
          spark-9b731fad2b43ca18f3c5274062d4c7bc2622ab72.zip
[SPARK-9782] [YARN] Support YARN application tags via SparkConf
Add a new test case in yarn/ClientSuite which checks how the various SparkConf
and ClientArguments propagate into the ApplicationSubmissionContext.
Author: Dennis Huo <dhuo@google.com>
Closes #8072 from dennishuo/dhuo-yarn-application-tags.
Diffstat (limited to 'yarn')
 yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala      | 21 +
 yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 36 +
 2 files changed, 57 insertions(+), 0 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6d63ddaf15..5c6a716863 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -163,6 +163,23 @@ private[spark] class Client( appContext.setQueue(args.amQueue) appContext.setAMContainerSpec(containerContext) appContext.setApplicationType("SPARK") + sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS) + .map(StringUtils.getTrimmedStringCollection(_)) + .filter(!_.isEmpty()) + .foreach { tagCollection => + try { + // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use + // reflection to set it, printing a warning if a tag was specified but the YARN version + // doesn't support it. + val method = appContext.getClass().getMethod( + "setApplicationTags", classOf[java.util.Set[String]]) + method.invoke(appContext, new java.util.HashSet[String](tagCollection)) + } catch { + case e: NoSuchMethodException => + logWarning(s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because this version of " + + "YARN does not support it") + } + } sparkConf.getOption("spark.yarn.maxAppAttempts").map(_.toInt) match { case Some(v) => appContext.setMaxAppAttempts(v) case None => logDebug("spark.yarn.maxAppAttempts is not set. " + @@ -987,6 +1004,10 @@ object Client extends Logging { // of the executors val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" + // Comma-separated list of strings to pass through as YARN application tags appearing + // in YARN ApplicationReports, which can be used for filtering when querying YARN. + val CONF_SPARK_YARN_APPLICATION_TAGS = "spark.yarn.tags" + // Staging directory is private! 
-> rwx-------- val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index 837f8d3fa5..0a5402c89e 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -29,8 +29,11 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.api.ApplicationConstants.Environment +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.YarnClientApplication import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.Records import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterAll, Matchers} @@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll { cp should contain ("/remotePath/my1.jar") } + test("configuration and args propagate through createApplicationSubmissionContext") { + val conf = new Configuration() + // When parsing tags, duplicates and leading/trailing whitespace should be removed. + // Spaces between non-comma strings should be preserved as single tags. Empty strings may or + // may not be removed depending on the version of Hadoop being used. 
+ val sparkConf = new SparkConf() + .set(Client.CONF_SPARK_YARN_APPLICATION_TAGS, ",tag1, dup,tag2 , ,multi word , dup") + .set("spark.yarn.maxAppAttempts", "42") + val args = new ClientArguments(Array( + "--name", "foo-test-app", + "--queue", "staging-queue"), sparkConf) + + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) + val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + + val client = new Client(args, conf, sparkConf) + client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) + + appContext.getApplicationName should be ("foo-test-app") + appContext.getQueue should be ("staging-queue") + appContext.getAMContainerSpec should be (containerLaunchContext) + appContext.getApplicationType should be ("SPARK") + appContext.getClass.getMethods.filter(_.getName.equals("getApplicationTags")).foreach{ method => + val tags = method.invoke(appContext).asInstanceOf[java.util.Set[String]] + tags should contain allOf ("tag1", "dup", "tag2", "multi word") + tags.filter(!_.isEmpty).size should be (4) + } + appContext.getMaxAppAttempts should be (42) + } + object Fixtures { val knownDefYarnAppCP: Seq[String] = |