aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorjerryshao <sshao@hortonworks.com>2015-11-23 10:41:17 -0800
committerMarcelo Vanzin <vanzin@cloudera.com>2015-11-23 10:41:17 -0800
commit5fd86e4fc2e06d2403ca538ae417580c93b69e06 (patch)
treeb6daea5f6d8da1ccc293e3cafff00ea6efd5d8e3 /yarn
parent946b406519af58c79041217e6f93854b6cf80acd (diff)
downloadspark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.tar.gz
spark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.tar.bz2
spark-5fd86e4fc2e06d2403ca538ae417580c93b69e06.zip
[SPARK-7173][YARN] Add label expression support for application master
Add label expression support for AM to restrict it runs on the specific set of nodes. I tested it locally and works fine. sryza and vanzin please help to review, thanks a lot. Author: jerryshao <sshao@hortonworks.com> Closes #9800 from jerryshao/SPARK-7173.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala26
1 files changed, 25 insertions, 1 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index ba799884f5..a77a3e2420 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -225,7 +225,31 @@ private[spark] class Client(
val capability = Records.newRecord(classOf[Resource])
capability.setMemory(args.amMemory + amMemoryOverhead)
capability.setVirtualCores(args.amCores)
- appContext.setResource(capability)
+
+ if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
+ try {
+ val amRequest = Records.newRecord(classOf[ResourceRequest])
+ amRequest.setResourceName(ResourceRequest.ANY)
+ amRequest.setPriority(Priority.newInstance(0))
+ amRequest.setCapability(capability)
+ amRequest.setNumContainers(1)
+ val amLabelExpression = sparkConf.get("spark.yarn.am.nodeLabelExpression")
+ val method = amRequest.getClass.getMethod("setNodeLabelExpression", classOf[String])
+ method.invoke(amRequest, amLabelExpression)
+
+ val setResourceRequestMethod =
+ appContext.getClass.getMethod("setAMContainerResourceRequest", classOf[ResourceRequest])
+ setResourceRequestMethod.invoke(appContext, amRequest)
+ } catch {
+ case e: NoSuchMethodException =>
+ logWarning("Ignoring spark.yarn.am.nodeLabelExpression because the version " +
+ "of YARN does not support it")
+ appContext.setResource(capability)
+ }
+ } else {
+ appContext.setResource(capability)
+ }
+
appContext
}