diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-12-22 12:23:43 -0800 |
---|---|---|
committer | Andrew Or <andrew@databricks.com> | 2014-12-22 12:23:43 -0800 |
commit | d62da642ace17b37283eab64149545723c8474a7 (patch) | |
tree | 1cfac90a4350181f6dec25832d09e319e2eff476 /yarn/src | |
parent | fb8e85e80e50904e1e93daf30dcadef62c3b7ca1 (diff) | |
download | spark-d62da642ace17b37283eab64149545723c8474a7.tar.gz spark-d62da642ace17b37283eab64149545723c8474a7.tar.bz2 spark-d62da642ace17b37283eab64149545723c8474a7.zip |
SPARK-4447. Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
Author: Sandy Ryza <sandy@cloudera.com>
Closes #3652 from sryza/sandy-spark-4447 and squashes the following commits:
2791158 [Sandy Ryza] Review feedback
c23507b [Sandy Ryza] Strip margin from client arguments help string
18be7ba [Sandy Ryza] SPARK-4447
Diffstat (limited to 'yarn/src')
6 files changed, 234 insertions, 372 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index dc7a078446..b2e45435c4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -511,7 +511,7 @@ object ApplicationMaster extends Logging { SignalLogger.register(log) val amArgs = new ApplicationMasterArguments(args) SparkHadoopUtil.get.runAsSparkUser { () => - master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs)) + master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs)) System.exit(master.run()) } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index c439969510..7305249f80 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -197,6 +197,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) | to work with. | --files files Comma separated list of files to be distributed with the job. | --archives archives Comma separated list of archives to be distributed with the job. - """ + """.stripMargin } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala deleted file mode 100644 index 2bbf5d7db8..0000000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ /dev/null @@ -1,213 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} - -import org.apache.spark.{SecurityManager, SparkConf} -import org.apache.spark.scheduler.SplitInfo - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.util.Records - -/** - * Acquires resources for executors from a ResourceManager and launches executors in new containers. - */ -private[yarn] class YarnAllocationHandler( - conf: Configuration, - sparkConf: SparkConf, - amClient: AMRMClient[ContainerRequest], - appAttemptId: ApplicationAttemptId, - args: ApplicationMasterArguments, - preferredNodes: collection.Map[String, collection.Set[SplitInfo]], - securityMgr: SecurityManager) - extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) { - - override protected def releaseContainer(container: Container) = { - amClient.releaseAssignedContainer(container.getId()) - } - - // pending isn't used on stable as the AMRMClient handles incremental asks - override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = { - addResourceRequests(count) - - // We have already set the container request. Poll the ResourceManager for a response. - // This doubles as a heartbeat if there are no pending container requests. - val progressIndicator = 0.1f - new StableAllocateResponse(amClient.allocate(progressIndicator)) - } - - private def createRackResourceRequests( - hostContainers: ArrayBuffer[ContainerRequest] - ): ArrayBuffer[ContainerRequest] = { - // Generate modified racks and new set of hosts under it before issuing requests. - val rackToCounts = new HashMap[String, Int]() - - for (container <- hostContainers) { - val candidateHost = container.getNodes.last - assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost) - - val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) - if (rack != null) { - var count = rackToCounts.getOrElse(rack, 0) - count += 1 - rackToCounts.put(rack, count) - } - } - - val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size) - for ((rack, count) <- rackToCounts) { - requestedContainers ++= createResourceRequests( - AllocationType.RACK, - rack, - count, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - } - - requestedContainers - } - - private def addResourceRequests(numExecutors: Int) { - val containerRequests: List[ContainerRequest] = - if (numExecutors <= 0) { - logDebug("numExecutors: " + numExecutors) - List() - } else if (preferredHostToCount.isEmpty) { - logDebug("host preferences is empty") - createResourceRequests( - AllocationType.ANY, - resource = null, - numExecutors, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList - } else { - // Request for all hosts in preferred nodes and for numExecutors - - // candidates.size, request by default allocation policy. - val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) - for ((candidateHost, candidateCount) <- preferredHostToCount) { - val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost) - - if (requiredCount > 0) { - hostContainerRequests ++= createResourceRequests( - AllocationType.HOST, - candidateHost, - requiredCount, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - } - } - val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests( - hostContainerRequests).toList - - val anyContainerRequests = createResourceRequests( - AllocationType.ANY, - resource = null, - numExecutors, - YarnSparkHadoopUtil.RM_REQUEST_PRIORITY) - - val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( - hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size) - - containerRequestBuffer ++= hostContainerRequests - containerRequestBuffer ++= rackContainerRequests - containerRequestBuffer ++= anyContainerRequests - containerRequestBuffer.toList - } - - for (request <- containerRequests) { - amClient.addContainerRequest(request) - } - - for (request <- containerRequests) { - val nodes = request.getNodes - var hostStr = if (nodes == null || nodes.isEmpty) { - "Any" - } else { - nodes.last - } - logInfo("Container request (host: %s, priority: %s, capability: %s".format( - hostStr, - request.getPriority().getPriority, - request.getCapability)) - } - } - - private def createResourceRequests( - requestType: AllocationType.AllocationType, - resource: String, - numExecutors: Int, - priority: Int - ): ArrayBuffer[ContainerRequest] = { - - // If hostname is specified, then we need at least two requests - node local and rack local. - // There must be a third request, which is ANY. That will be specially handled. - requestType match { - case AllocationType.HOST => { - assert(YarnSparkHadoopUtil.ANY_HOST != resource) - val hostname = resource - val nodeLocal = constructContainerRequests( - Array(hostname), - racks = null, - numExecutors, - priority) - - // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. - YarnSparkHadoopUtil.populateRackInfo(conf, hostname) - nodeLocal - } - case AllocationType.RACK => { - val rack = resource - constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) - } - case AllocationType.ANY => constructContainerRequests( - hosts = null, racks = null, numExecutors, priority) - case _ => throw new IllegalArgumentException( - "Unexpected/unsupported request type: " + requestType) - } - } - - private def constructContainerRequests( - hosts: Array[String], - racks: Array[String], - numExecutors: Int, - priority: Int - ): ArrayBuffer[ContainerRequest] = { - - val memoryRequest = executorMemory + memoryOverhead - val resource = Resource.newInstance(memoryRequest, executorCores) - - val prioritySetting = Records.newRecord(classOf[Priority]) - prioritySetting.setPriority(priority) - - val requests = new ArrayBuffer[ContainerRequest]() - for (i <- 0 until numExecutors) { - requests += new ContainerRequest(resource, hosts, racks, prioritySetting) - } - requests - } - - private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse { - override def getAllocatedContainers() = response.getAllocatedContainers() - override def getAvailableResources() = response.getAvailableResources() - override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses() - } - -} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index b32e15738f..de65ef23ad 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.yarn -import java.util.{List => JList} import java.util.concurrent._ import java.util.concurrent.atomic.AtomicInteger import java.util.regex.Pattern @@ -25,17 +24,20 @@ import java.util.regex.Pattern import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} +import com.google.common.util.concurrent.ThreadFactoryBuilder + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv} +import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl} import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend -import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ - object AllocationType extends Enumeration { type AllocationType = Value val HOST, RACK, ANY = Value @@ -52,12 +54,12 @@ object AllocationType extends Enumeration { // more info on how we are requesting for containers. /** - * Common code for the Yarn container allocator. Contains all the version-agnostic code to - * manage container allocation for a running Spark application. + * Acquires resources for executors from a ResourceManager and launches executors in new containers. */ -private[yarn] abstract class YarnAllocator( +private[yarn] class YarnAllocator( conf: Configuration, sparkConf: SparkConf, + amClient: AMRMClient[ContainerRequest], appAttemptId: ApplicationAttemptId, args: ApplicationMasterArguments, preferredNodes: collection.Map[String, collection.Set[SplitInfo]], @@ -67,7 +69,7 @@ private[yarn] abstract class YarnAllocator( import YarnAllocator._ // These three are locked on allocatedHostToContainersMap. Complementary data structures - // allocatedHostToContainersMap : containers which are running : host, Set<containerid> + // allocatedHostToContainersMap : containers which are running : host, Set<ContainerId> // allocatedContainerToHostMap: container to host mapping. private val allocatedHostToContainersMap = new HashMap[String, collection.mutable.Set[ContainerId]]() @@ -161,8 +163,6 @@ private[yarn] abstract class YarnAllocator( def allocateResources(): Unit = synchronized { val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get() - // this is needed by alpha, do it here since we add numPending right after this - val executorsPending = numPendingAllocate.get() if (missing > 0) { val totalExecutorMemory = executorMemory + memoryOverhead numPendingAllocate.addAndGet(missing) @@ -172,7 +172,7 @@ private[yarn] abstract class YarnAllocator( logDebug("Empty allocation request ...") } - val allocateResponse = allocateContainers(missing, executorsPending) + val allocateResponse = allocateContainers(missing) val allocatedContainers = allocateResponse.getAllocatedContainers() if (allocatedContainers.size > 0) { @@ -368,7 +368,7 @@ private[yarn] abstract class YarnAllocator( val containerId = completedContainer.getContainerId if (releasedContainers.containsKey(containerId)) { - // YarnAllocationHandler already marked the container for release, so remove it from + // Already marked the container for release, so remove it from // `releasedContainers`. releasedContainers.remove(containerId) } else { @@ -441,20 +441,16 @@ private[yarn] abstract class YarnAllocator( } } - protected def allocatedContainersOnHost(host: String): Int = { - var retval = 0 + private def allocatedContainersOnHost(host: String): Int = { allocatedHostToContainersMap.synchronized { - retval = allocatedHostToContainersMap.getOrElse(host, Set()).size + allocatedHostToContainersMap.getOrElse(host, Set()).size } - retval } - protected def allocatedContainersOnRack(rack: String): Int = { - var retval = 0 + private def allocatedContainersOnRack(rack: String): Int = { allocatedHostToContainersMap.synchronized { - retval = allocatedRackCount.getOrElse(rack, 0) + allocatedRackCount.getOrElse(rack, 0) } - retval } private def isResourceConstraintSatisfied(container: Container): Boolean = { @@ -464,9 +460,8 @@ private[yarn] abstract class YarnAllocator( // A simple method to copy the split info map. private def generateNodeToWeight( conf: Configuration, - input: collection.Map[String, collection.Set[SplitInfo]] - ): (Map[String, Int], Map[String, Int]) = { - + input: collection.Map[String, collection.Set[SplitInfo]]) + : (Map[String, Int], Map[String, Int]) = { if (input == null) { return (Map[String, Int](), Map[String, Int]()) } @@ -488,9 +483,9 @@ private[yarn] abstract class YarnAllocator( (hostToCount.toMap, rackToCount.toMap) } - private def internalReleaseContainer(container: Container) = { + private def internalReleaseContainer(container: Container): Unit = { releasedContainers.put(container.getId(), true) - releaseContainer(container) + amClient.releaseAssignedContainer(container.getId()) } /** @@ -498,26 +493,158 @@ private[yarn] abstract class YarnAllocator( * * @param count Number of containers to allocate. * If zero, should still contact RM (as a heartbeat). - * @param pending Number of containers pending allocate. Only used on alpha. * @return Response to the allocation request. */ - protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse + private def allocateContainers(count: Int): AllocateResponse = { + addResourceRequests(count) + + // We have already set the container request. Poll the ResourceManager for a response. + // This doubles as a heartbeat if there are no pending container requests. + val progressIndicator = 0.1f + amClient.allocate(progressIndicator) + } - /** Called to release a previously allocated container. */ - protected def releaseContainer(container: Container): Unit + private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest]) + : ArrayBuffer[ContainerRequest] = { + // Generate modified racks and new set of hosts under it before issuing requests. + val rackToCounts = new HashMap[String, Int]() - /** - * Defines the interface for an allocate response from the RM. This is needed since the alpha - * and stable interfaces differ here in ways that cannot be fixed using other routes. - */ - protected trait YarnAllocateResponse { + for (container <- hostContainers) { + val candidateHost = container.getNodes.last + assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost) + + val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost) + if (rack != null) { + var count = rackToCounts.getOrElse(rack, 0) + count += 1 + rackToCounts.put(rack, count) + } + } + + val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size) + for ((rack, count) <- rackToCounts) { + requestedContainers ++= createResourceRequests( + AllocationType.RACK, + rack, + count, + RM_REQUEST_PRIORITY) + } + + requestedContainers + } + + private def addResourceRequests(numExecutors: Int): Unit = { + val containerRequests: List[ContainerRequest] = + if (numExecutors <= 0) { + logDebug("numExecutors: " + numExecutors) + List() + } else if (preferredHostToCount.isEmpty) { + logDebug("host preferences is empty") + createResourceRequests( + AllocationType.ANY, + resource = null, + numExecutors, + RM_REQUEST_PRIORITY).toList + } else { + // Request for all hosts in preferred nodes and for numExecutors - + // candidates.size, request by default allocation policy. + val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size) + for ((candidateHost, candidateCount) <- preferredHostToCount) { + val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost) + + if (requiredCount > 0) { + hostContainerRequests ++= createResourceRequests( + AllocationType.HOST, + candidateHost, + requiredCount, + RM_REQUEST_PRIORITY) + } + } + val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests( + hostContainerRequests).toList + + val anyContainerRequests = createResourceRequests( + AllocationType.ANY, + resource = null, + numExecutors, + RM_REQUEST_PRIORITY) + + val containerRequestBuffer = new ArrayBuffer[ContainerRequest]( + hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size) + + containerRequestBuffer ++= hostContainerRequests + containerRequestBuffer ++= rackContainerRequests + containerRequestBuffer ++= anyContainerRequests + containerRequestBuffer.toList + } - def getAllocatedContainers(): JList[Container] + for (request <- containerRequests) { + amClient.addContainerRequest(request) + } - def getAvailableResources(): Resource + for (request <- containerRequests) { + val nodes = request.getNodes + val hostStr = if (nodes == null || nodes.isEmpty) { + "Any" + } else { + nodes.last + } + logInfo("Container request (host: %s, priority: %s, capability: %s".format( + hostStr, + request.getPriority().getPriority, + request.getCapability)) + } + } - def getCompletedContainersStatuses(): JList[ContainerStatus] + private def createResourceRequests( + requestType: AllocationType.AllocationType, + resource: String, + numExecutors: Int, + priority: Int): ArrayBuffer[ContainerRequest] = { + // If hostname is specified, then we need at least two requests - node local and rack local. + // There must be a third request, which is ANY. That will be specially handled. + requestType match { + case AllocationType.HOST => { + assert(YarnSparkHadoopUtil.ANY_HOST != resource) + val hostname = resource + val nodeLocal = constructContainerRequests( + Array(hostname), + racks = null, + numExecutors, + priority) + + // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler. + YarnSparkHadoopUtil.populateRackInfo(conf, hostname) + nodeLocal + } + case AllocationType.RACK => { + val rack = resource + constructContainerRequests(hosts = null, Array(rack), numExecutors, priority) + } + case AllocationType.ANY => constructContainerRequests( + hosts = null, racks = null, numExecutors, priority) + case _ => throw new IllegalArgumentException( + "Unexpected/unsupported request type: " + requestType) + } + } + private def constructContainerRequests( + hosts: Array[String], + racks: Array[String], + numExecutors: Int, + priority: Int + ): ArrayBuffer[ContainerRequest] = { + val memoryRequest = executorMemory + memoryOverhead + val resource = Resource.newInstance(memoryRequest, executorCores) + + val prioritySetting = Records.newRecord(classOf[Priority]) + prioritySetting.setPriority(priority) + + val requests = new ArrayBuffer[ContainerRequest]() + for (i <- 0 until numExecutors) { + requests += new ContainerRequest(resource, hosts, racks, prioritySetting) + } + requests } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala index 2510b9c9ce..bf4e15908b 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala @@ -17,19 +17,33 @@ package org.apache.spark.deploy.yarn +import java.util.{List => JList} + +import scala.collection.JavaConversions._ import scala.collection.{Map, Set} +import scala.util.Try -import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records._ +import org.apache.hadoop.yarn.client.api.AMRMClient +import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.apache.hadoop.yarn.util.ConverterUtils +import org.apache.hadoop.yarn.webapp.util.WebAppUtils -import org.apache.spark.{SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{Logging, SecurityManager, SparkConf} import org.apache.spark.scheduler.SplitInfo +import org.apache.spark.util.Utils /** - * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that - * is used by Spark's AM. + * Handles registering and unregistering the application with the YARN ResourceManager. */ -trait YarnRMClient { +private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging { + + private var amClient: AMRMClient[ContainerRequest] = _ + private var uiHistoryAddress: String = _ + private var registered: Boolean = false /** * Registers the application master with the RM. @@ -46,7 +60,21 @@ trait YarnRMClient { preferredNodeLocations: Map[String, Set[SplitInfo]], uiAddress: String, uiHistoryAddress: String, - securityMgr: SecurityManager): YarnAllocator + securityMgr: SecurityManager + ): YarnAllocator = { + amClient = AMRMClient.createAMRMClient() + amClient.init(conf) + amClient.start() + this.uiHistoryAddress = uiHistoryAddress + + logInfo("Registering the ApplicationMaster") + synchronized { + amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) + registered = true + } + new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, + preferredNodeLocations, securityMgr) + } /** * Unregister the AM. Guaranteed to only be called once. @@ -54,15 +82,45 @@ trait YarnRMClient { * @param status The final status of the AM. * @param diagnostics Diagnostics message to include in the final status. */ - def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit + def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized { + if (registered) { + amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) + } + } /** Returns the attempt ID. */ - def getAttemptId(): ApplicationAttemptId + def getAttemptId(): ApplicationAttemptId = { + val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) + val containerId = ConverterUtils.toContainerId(containerIdString) + containerId.getApplicationAttemptId() + } /** Returns the configuration for the AmIpFilter to add to the Spark UI. */ - def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] + def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = { + // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, + // so not all stable releases have it. + val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) + .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") + + // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. + try { + val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", + classOf[Configuration]) + val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] + val hosts = proxies.map { proxy => proxy.split(":")(0) } + val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } + Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) + } catch { + case e: NoSuchMethodException => + val proxy = WebAppUtils.getProxyHostAndPort(conf) + val parts = proxy.split(":") + val uriBase = prefix + proxy + proxyBase + Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) + } + } /** Returns the maximum number of attempts to register the AM. */ - def getMaxRegAttempts(conf: YarnConfiguration): Int + def getMaxRegAttempts(conf: YarnConfiguration): Int = + conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala deleted file mode 100644 index 8d4b96ed79..0000000000 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.deploy.yarn - -import java.util.{List => JList} - -import scala.collection.{Map, Set} -import scala.collection.JavaConversions._ -import scala.util._ - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api._ -import org.apache.hadoop.yarn.api.protocolrecords._ -import org.apache.hadoop.yarn.api.records._ -import org.apache.hadoop.yarn.client.api.AMRMClient -import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest -import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.hadoop.yarn.webapp.util.WebAppUtils - -import org.apache.spark.{Logging, SecurityManager, SparkConf} -import org.apache.spark.scheduler.SplitInfo -import org.apache.spark.util.Utils - - -/** - * YarnRMClient implementation for the Yarn stable API. - */ -private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging { - - private var amClient: AMRMClient[ContainerRequest] = _ - private var uiHistoryAddress: String = _ - private var registered: Boolean = false - - override def register( - conf: YarnConfiguration, - sparkConf: SparkConf, - preferredNodeLocations: Map[String, Set[SplitInfo]], - uiAddress: String, - uiHistoryAddress: String, - securityMgr: SecurityManager) = { - amClient = AMRMClient.createAMRMClient() - amClient.init(conf) - amClient.start() - this.uiHistoryAddress = uiHistoryAddress - - logInfo("Registering the ApplicationMaster") - synchronized { - amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress) - registered = true - } - new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args, - preferredNodeLocations, securityMgr) - } - - override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized { - if (registered) { - amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress) - } - } - - override def getAttemptId() = { - val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) - val containerId = ConverterUtils.toContainerId(containerIdString) - val appAttemptId = containerId.getApplicationAttemptId() - appAttemptId - } - - override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = { - // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2, - // so not all stable releases have it. - val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration]) - .invoke(null, conf).asInstanceOf[String]).getOrElse("http://") - - // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses. - try { - val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter", - classOf[Configuration]) - val proxies = method.invoke(null, conf).asInstanceOf[JList[String]] - val hosts = proxies.map { proxy => proxy.split(":")(0) } - val uriBases = proxies.map { proxy => prefix + proxy + proxyBase } - Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(",")) - } catch { - case e: NoSuchMethodException => - val proxy = WebAppUtils.getProxyHostAndPort(conf) - val parts = proxy.split(":") - val uriBase = prefix + proxy + proxyBase - Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase) - } - } - - override def getMaxRegAttempts(conf: YarnConfiguration) = - conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS) - -} |