diff options
author | Sandy Ryza <sandy@cloudera.com> | 2015-05-11 12:09:39 -0700 |
---|---|---|
committer | Sandy Ryza <sandy@cloudera.com> | 2015-05-11 12:09:39 -0700 |
commit | 82fee9d9aad2c9ba2fb4bd658579fe99218cafac (patch) | |
tree | ff383391eba6cc7a9038885fade81fe14b0c3182 /yarn/src | |
parent | 0a4844f90a712e796c9404b422cea76d21a5d2e3 (diff) | |
download | spark-82fee9d9aad2c9ba2fb4bd658579fe99218cafac.tar.gz spark-82fee9d9aad2c9ba2fb4bd658579fe99218cafac.tar.bz2 spark-82fee9d9aad2c9ba2fb4bd658579fe99218cafac.zip |
[SPARK-6470] [YARN] Add support for YARN node labels.
This is difficult to write a test for because it relies on the latest version of YARN, but I verified manually that the patch does pass along the label expression on this version and containers are successfully launched.
Author: Sandy Ryza <sandy@cloudera.com>
Closes #5242 from sryza/sandy-spark-6470 and squashes the following commits:
6af87b9 [Sandy Ryza] Change info to warning
6e22d99 [Sandy Ryza] [YARN] SPARK-6470. Add support for YARN node labels.
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 88d68d5556..8a08f561a2 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -117,6 +117,24 @@ private[yarn] class YarnAllocator( // For testing private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true) + private val labelExpression = sparkConf.getOption("spark.yarn.executor.nodeLabelExpression") + + // ContainerRequest constructor that can take a node label expression. We grab it through + // reflection because it's only available in later versions of YARN. + private val nodeLabelConstructor = labelExpression.flatMap { expr => + try { + Some(classOf[ContainerRequest].getConstructor(classOf[Resource], + classOf[Array[String]], classOf[Array[String]], classOf[Priority], classOf[Boolean], + classOf[String])) + } catch { + case e: NoSuchMethodException => { + logWarning(s"Node label expression $expr will be ignored because YARN version on" + + " classpath does not support it.") + None + } + } + } + def getNumExecutorsRunning: Int = numExecutorsRunning def getNumExecutorsFailed: Int = numExecutorsFailed @@ -211,7 +229,7 @@ private[yarn] class YarnAllocator( s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") for (i <- 0 until missing) { - val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY) + val request = createContainerRequest(resource) amClient.addContainerRequest(request) val nodes = request.getNodes val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last @@ -231,6 +249,17 @@ private[yarn] class YarnAllocator( } /** + * Creates a container request, handling the reflection required to use YARN features that were + * added in recent versions. + */ + private def createContainerRequest(resource: Resource): ContainerRequest = { + nodeLabelConstructor.map { constructor => + constructor.newInstance(resource, null, null, RM_REQUEST_PRIORITY, true: java.lang.Boolean, + labelExpression.orNull) + }.getOrElse(new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)) + } + + /** * Handle containers granted by the RM by launching executors on them. * * Due to the way the YARN allocation protocol works, certain healthy race conditions can result |