diff options
author | jerryshao <sshao@hortonworks.com> | 2015-11-23 10:41:17 -0800 |
---|---|---|
committer | Marcelo Vanzin <vanzin@cloudera.com> | 2015-11-23 10:41:17 -0800 |
commit | 5fd86e4fc2e06d2403ca538ae417580c93b69e06 (patch) | |
tree | b6daea5f6d8da1ccc293e3cafff00ea6efd5d8e3 | |
parent | 946b406519af58c79041217e6f93854b6cf80acd (diff) | |
download | spark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.tar.gz spark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.tar.bz2 spark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.zip |
[SPARK-7173][YARN] Add label expression support for application master
Add label expression support for AM to restrict it runs on the specific set of nodes. I tested it locally and works fine.
sryza and vanzin please help to review, thanks a lot.
Author: jerryshao <sshao@hortonworks.com>
Closes #9800 from jerryshao/SPARK-7173.
-rw-r--r-- | docs/running-on-yarn.md | 9 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 26 |
2 files changed, 34 insertions, 1 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index db6bfa69ee..925a1e0ba6 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -327,6 +327,15 @@ If you need a reference to the proper location to put log files in the YARN so t </td> </tr> <tr> + <td><code>spark.yarn.am.nodeLabelExpression</code></td> + <td>(none)</td> + <td> + A YARN node label expression that restricts the set of nodes AM will be scheduled on. + Only versions of YARN greater than or equal to 2.6 support node label expressions, so when + running against earlier versions, this property will be ignored. + </td> +</tr> +<tr> <td><code>spark.yarn.executor.nodeLabelExpression</code></td> <td>(none)</td> <td> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index ba799884f5..a77a3e2420 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -225,7 +225,31 @@ private[spark] class Client( val capability = Records.newRecord(classOf[Resource]) capability.setMemory(args.amMemory + amMemoryOverhead) capability.setVirtualCores(args.amCores) - appContext.setResource(capability) + + if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) { + try { + val amRequest = Records.newRecord(classOf[ResourceRequest]) + amRequest.setResourceName(ResourceRequest.ANY) + amRequest.setPriority(Priority.newInstance(0)) + amRequest.setCapability(capability) + amRequest.setNumContainers(1) + val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression") + val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String]) + method.invoke(amRequest, amLabelExpression) + + val setResourceRequestMethod = + appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest]) + setResourceRequestMethod.invoke(appContext, amRequest) + } catch { + case e: NoSuchMethodException => + logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " + + "of YARN does not support it") + appContext.setResource(capability) + } + } else { + appContext.setResource(capability) + } + appContext } |