aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/running-on-yarn.md9
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala31
2 files changed, 39 insertions, 1 deletions
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 4fb4a90307..51c1339165 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -220,6 +220,15 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Otherwise, the client process will exit after submission.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.executor.nodeLabelExpression</code></td>
+ <td>(none)</td>
+ <td>
+ A YARN node label expression that restricts the set of nodes executors will be scheduled on.
+ Only versions of YARN greater than or equal to 2.6 support node label expressions, so when
+ running against earlier versions, this property will be ignored.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 88d68d5556..8a08f561a2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -117,6 +117,24 @@ private[yarn] class YarnAllocator(
// For testing
private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
+ private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression")
+
+ // ContainerRequest constructor that can take a node label expression. We grab it through
+ // reflection because it's only available in later versions of YARN.
+ private val nodeLabelConstructor = labelExpression.flatMap { expr =>
+ try {
+ Some(classOf[ContainerRequest].getConstructor(classOf[Resource],
+ classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean],
+ classOf[String]))
+ } catch {
+ case e: NoSuchMethodException => {
+ logWarning(s"Node label expression $expr will be ignored because YARN version on" +
+ " classpath does not support it.")
+ None
+ }
+ }
+ }
+
def getNumExecutorsRunning: Int = numExecutorsRunning
def getNumExecutorsFailed: Int = numExecutorsFailed
@@ -211,7 +229,7 @@ private[yarn] class YarnAllocator(
s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
for (i <- 0 until missing) {
- val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+ val request = createContainerRequest(resource)
amClient.addContainerRequest(request)
val nodes = request.getNodes
val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
@@ -231,6 +249,17 @@ private[yarn] class YarnAllocator(
}
/**
+ * Creates a container request, handling the reflection required to use YARN features that were
+ * added in recent versions.
+ */
+ private def createContainerRequest(resource: Resource): ContainerRequest = {
+ nodeLabelConstructor.map { constructor =>
+ constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean,
+ labelExpression.orNull)
+ }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY))
+ }
+
+ /**
* Handle containers granted by the RM by launching executors on them.
*
* Due to the way the YARN allocation protocol works, certain healthy race conditions can result