author    Sandy Ryza <sandy@cloudera.com>      2014-12-22 12:23:43 -0800
committer Andrew Or <andrew@databricks.com>    2014-12-22 12:23:43 -0800
commit    d62da642ace17b37283eab64149545723c8474a7 (patch)
tree      1cfac90a4350181f6dec25832d09e319e2eff476 /yarn/src
parent    fb8e85e80e50904e1e93daf30dcadef62c3b7ca1 (diff)
SPARK-4447. Remove layers of abstraction in YARN code no longer needed after dropping yarn-alpha
Author: Sandy Ryza <sandy@cloudera.com>

Closes #3652 from sryza/sandy-spark-4447 and squashes the following commits:

2791158 [Sandy Ryza] Review feedback
c23507b [Sandy Ryza] Strip margin from client arguments help string
18be7ba [Sandy Ryza] SPARK-4447
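With yarn-alpha gone there is only one YARN API left to target, so each abstract-base-plus-single-subclass pair collapses into one concrete class: YarnAllocationHandler folds into YarnAllocator, and YarnRMClientImpl folds into YarnRMClient. A hedged sketch of the shape change, with bodies elided (the Before/After names are illustrative; the real names appear in the diff below):

    // Before: a version-agnostic base and its lone stable-API subclass...
    abstract class AllocatorBefore {
      protected def allocateContainers(count: Int, pending: Int): AnyRef
    }
    class AllocationHandlerBefore extends AllocatorBefore {
      override protected def allocateContainers(count: Int, pending: Int): AnyRef = ???
    }

    // ...and a client trait with exactly one implementation.
    trait RMClientBefore { def getAttemptId(): AnyRef }
    class RMClientImplBefore extends RMClientBefore {
      override def getAttemptId(): AnyRef = ???
    }

    // After: one concrete class per concept, talking to AMRMClient directly;
    // the alpha-only `pending` argument also disappears.
    class AllocatorAfter { private def allocateContainers(count: Int): AnyRef = ??? }
    class RMClientAfter { def getAttemptId(): AnyRef = ??? }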
Diffstat (limited to 'yarn/src')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala     |   2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala       |   2
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 213
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala         | 201
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala          |  78
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala      | 110
6 files changed, 234 insertions(+), 372 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index dc7a078446..b2e45435c4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -511,7 +511,7 @@ object ApplicationMaster extends Logging {
SignalLogger.register(log)
val amArgs = new ApplicationMasterArguments(args)
SparkHadoopUtil.get.runAsSparkUser { () =>
- master = new ApplicationMaster(amArgs, new YarnRMClientImpl(amArgs))
+ master = new ApplicationMaster(amArgs, new YarnRMClient(amArgs))
System.exit(master.run())
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index c439969510..7305249f80 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -197,6 +197,6 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
| to work with.
| --files files Comma separated list of files to be distributed with the job.
| --archives archives Comma separated list of archives to be distributed with the job.
- """
+ """.stripMargin
}
}
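The .stripMargin fix matters because Scala triple-quoted strings keep their source indentation verbatim; stripMargin trims each line up to and including its leading '|'. A minimal sketch (illustrative usage text, not the actual help string):

    object StripMarginDemo extends App {
      val raw = """Usage: demo
                  |  --files files  Comma separated list of files.
                  |"""
      // Without stripMargin, the indentation and '|' characters print verbatim.
      println(raw)
      // With stripMargin, each line is trimmed through the leading '|'.
      println(raw.stripMargin)
    }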
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
deleted file mode 100644
index 2bbf5d7db8..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
-import org.apache.spark.{SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
-
-/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
- */
-private[yarn] class YarnAllocationHandler(
- conf: Configuration,
- sparkConf: SparkConf,
- amClient: AMRMClient[ContainerRequest],
- appAttemptId: ApplicationAttemptId,
- args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
- securityMgr: SecurityManager)
- extends YarnAllocator(conf, sparkConf, appAttemptId, args, preferredNodes, securityMgr) {
-
- override protected def releaseContainer(container: Container) = {
- amClient.releaseAssignedContainer(container.getId())
- }
-
- // pending isn't used on stable as the AMRMClient handles incremental asks
- override protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse = {
- addResourceRequests(count)
-
- // We have already set the container request. Poll the ResourceManager for a response.
- // This doubles as a heartbeat if there are no pending container requests.
- val progressIndicator = 0.1f
- new StableAllocateResponse(amClient.allocate(progressIndicator))
- }
-
- private def createRackResourceRequests(
- hostContainers: ArrayBuffer[ContainerRequest]
- ): ArrayBuffer[ContainerRequest] = {
- // Generate modified racks and new set of hosts under it before issuing requests.
- val rackToCounts = new HashMap[String, Int]()
-
- for (container <- hostContainers) {
- val candidateHost = container.getNodes.last
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += 1
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts) {
- requestedContainers ++= createResourceRequests(
- AllocationType.RACK,
- rack,
- count,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
-
- requestedContainers
- }
-
- private def addResourceRequests(numExecutors: Int) {
- val containerRequests: List[ContainerRequest] =
- if (numExecutors <= 0) {
- logDebug("numExecutors: " + numExecutors)
- List()
- } else if (preferredHostToCount.isEmpty) {
- logDebug("host preferences is empty")
- createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY).toList
- } else {
- // Request for all hosts in preferred nodes and for numExecutors -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests ++= createResourceRequests(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
- }
- val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
- hostContainerRequests).toList
-
- val anyContainerRequests = createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
-
- val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
- hostContainerRequests.size + rackContainerRequests.size() + anyContainerRequests.size)
-
- containerRequestBuffer ++= hostContainerRequests
- containerRequestBuffer ++= rackContainerRequests
- containerRequestBuffer ++= anyContainerRequests
- containerRequestBuffer.toList
- }
-
- for (request <- containerRequests) {
- amClient.addContainerRequest(request)
- }
-
- for (request <- containerRequests) {
- val nodes = request.getNodes
- var hostStr = if (nodes == null || nodes.isEmpty) {
- "Any"
- } else {
- nodes.last
- }
- logInfo("Container request (host: %s, priority: %s, capability: %s".format(
- hostStr,
- request.getPriority().getPriority,
- request.getCapability))
- }
- }
-
- private def createResourceRequests(
- requestType: AllocationType.AllocationType,
- resource: String,
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- // If hostname is specified, then we need at least two requests - node local and rack local.
- // There must be a third request, which is ANY. That will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnSparkHadoopUtil.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = constructContainerRequests(
- Array(hostname),
- racks = null,
- numExecutors,
- priority)
-
- // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
- }
- case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numExecutors, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
- }
- }
-
- private def constructContainerRequests(
- hosts: Array[String],
- racks: Array[String],
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
-
- val memoryRequest = executorMemory + memoryOverhead
- val resource = Resource.newInstance(memoryRequest, executorCores)
-
- val prioritySetting = Records.newRecord(classOf[Priority])
- prioritySetting.setPriority(priority)
-
- val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numExecutors) {
- requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
- }
- requests
- }
-
- private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
- override def getAllocatedContainers() = response.getAllocatedContainers()
- override def getAvailableResources() = response.getAvailableResources()
- override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
- }
-
-}
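The body of this deleted file reappears nearly verbatim in YarnAllocator below; only the YarnAllocateResponse wrapper and the alpha-only `pending` argument are dropped. Its rack bookkeeping in createRackResourceRequests reduces to a per-rack tally of preferred hosts, roughly as in this sketch (lookupRack here is a hypothetical stand-in for YarnSparkHadoopUtil.lookupRack):

    // Count host-level requests per rack; hosts with no known rack are skipped,
    // matching the `rack != null` check in the diff.
    def rackCounts(preferredHosts: Seq[String],
                   lookupRack: String => Option[String]): Map[String, Int] =
      preferredHosts.flatMap(lookupRack).groupBy(identity).mapValues(_.size).toMap

    // Example: three hosts spread over two racks.
    rackCounts(
      Seq("host1", "host2", "host3"),
      host => if (host == "host3") Some("/rack2") else Some("/rack1"))
    // => Map("/rack1" -> 2, "/rack2" -> 1)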
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b32e15738f..de65ef23ad 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,7 +17,6 @@
package org.apache.spark.deploy.yarn
-import java.util.{List => JList}
import java.util.concurrent._
import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
@@ -25,17 +24,20 @@ import java.util.regex.Pattern
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import com.google.common.util.concurrent.ThreadFactoryBuilder
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.util.Records
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-
object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
@@ -52,12 +54,12 @@ object AllocationType extends Enumeration {
// more info on how we are requesting for containers.
/**
- * Common code for the Yarn container allocator. Contains all the version-agnostic code to
- * manage container allocation for a running Spark application.
+ * Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
-private[yarn] abstract class YarnAllocator(
+private[yarn] class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
+ amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
@@ -67,7 +69,7 @@ private[yarn] abstract class YarnAllocator(
import YarnAllocator._
// These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+ // allocatedHostToContainersMap : containers which are running : host, Set<ContainerId>
// allocatedContainerToHostMap: container to host mapping.
private val allocatedHostToContainersMap =
new HashMap[String, collection.mutable.Set[ContainerId]]()
@@ -161,8 +163,6 @@ private[yarn] abstract class YarnAllocator(
def allocateResources(): Unit = synchronized {
val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
- // this is needed by alpha, do it here since we add numPending right after this
- val executorsPending = numPendingAllocate.get()
if (missing > 0) {
val totalExecutorMemory = executorMemory + memoryOverhead
numPendingAllocate.addAndGet(missing)
@@ -172,7 +172,7 @@ private[yarn] abstract class YarnAllocator(
logDebug("Empty allocation request ...")
}
- val allocateResponse = allocateContainers(missing, executorsPending)
+ val allocateResponse = allocateContainers(missing)
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
@@ -368,7 +368,7 @@ private[yarn] abstract class YarnAllocator(
val containerId = completedContainer.getContainerId
if (releasedContainers.containsKey(containerId)) {
- // YarnAllocationHandler already marked the container for release, so remove it from
+ // Already marked the container for release, so remove it from
// `releasedContainers`.
releasedContainers.remove(containerId)
} else {
@@ -441,20 +441,16 @@ private[yarn] abstract class YarnAllocator(
}
}
- protected def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
+ private def allocatedContainersOnHost(host: String): Int = {
allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+ allocatedHostToContainersMap.getOrElse(host, Set()).size
}
- retval
}
- protected def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
+ private def allocatedContainersOnRack(rack: String): Int = {
allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
+ allocatedRackCount.getOrElse(rack, 0)
}
- retval
}
private def isResourceConstraintSatisfied(container: Container): Boolean = {
@@ -464,9 +460,8 @@ private[yarn] abstract class YarnAllocator(
// A simple method to copy the split info map.
private def generateNodeToWeight(
conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]
- ): (Map[String, Int], Map[String, Int]) = {
-
+ input: collection.Map[String, collection.Set[SplitInfo]])
+ : (Map[String, Int], Map[String, Int]) = {
if (input == null) {
return (Map[String, Int](), Map[String, Int]())
}
@@ -488,9 +483,9 @@ private[yarn] abstract class YarnAllocator(
(hostToCount.toMap, rackToCount.toMap)
}
- private def internalReleaseContainer(container: Container) = {
+ private def internalReleaseContainer(container: Container): Unit = {
releasedContainers.put(container.getId(), true)
- releaseContainer(container)
+ amClient.releaseAssignedContainer(container.getId())
}
/**
@@ -498,26 +493,158 @@ private[yarn] abstract class YarnAllocator(
*
* @param count Number of containers to allocate.
* If zero, should still contact RM (as a heartbeat).
- * @param pending Number of containers pending allocate. Only used on alpha.
* @return Response to the allocation request.
*/
- protected def allocateContainers(count: Int, pending: Int): YarnAllocateResponse
+ private def allocateContainers(count: Int): AllocateResponse = {
+ addResourceRequests(count)
+
+ // We have already set the container request. Poll the ResourceManager for a response.
+ // This doubles as a heartbeat if there are no pending container requests.
+ val progressIndicator = 0.1f
+ amClient.allocate(progressIndicator)
+ }
- /** Called to release a previously allocated container. */
- protected def releaseContainer(container: Container): Unit
+ private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest])
+ : ArrayBuffer[ContainerRequest] = {
+ // Generate modified racks and new set of hosts under it before issuing requests.
+ val rackToCounts = new HashMap[String, Int]()
- /**
- * Defines the interface for an allocate response from the RM. This is needed since the alpha
- * and stable interfaces differ here in ways that cannot be fixed using other routes.
- */
- protected trait YarnAllocateResponse {
+ for (container <- hostContainers) {
+ val candidateHost = container.getNodes.last
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+ if (rack != null) {
+ var count = rackToCounts.getOrElse(rack, 0)
+ count += 1
+ rackToCounts.put(rack, count)
+ }
+ }
+
+ val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
+ for ((rack, count) <- rackToCounts) {
+ requestedContainers ++= createResourceRequests(
+ AllocationType.RACK,
+ rack,
+ count,
+ RM_REQUEST_PRIORITY)
+ }
+
+ requestedContainers
+ }
+
+ private def addResourceRequests(numExecutors: Int): Unit = {
+ val containerRequests: List[ContainerRequest] =
+ if (numExecutors <= 0) {
+ logDebug("numExecutors: " + numExecutors)
+ List()
+ } else if (preferredHostToCount.isEmpty) {
+ logDebug("host preferences is empty")
+ createResourceRequests(
+ AllocationType.ANY,
+ resource = null,
+ numExecutors,
+ RM_REQUEST_PRIORITY).toList
+ } else {
+ // Request for all hosts in preferred nodes and for numExecutors -
+ // candidates.size, request by default allocation policy.
+ val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
+ for ((candidateHost, candidateCount) <- preferredHostToCount) {
+ val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
+
+ if (requiredCount > 0) {
+ hostContainerRequests ++= createResourceRequests(
+ AllocationType.HOST,
+ candidateHost,
+ requiredCount,
+ RM_REQUEST_PRIORITY)
+ }
+ }
+ val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
+ hostContainerRequests).toList
+
+ val anyContainerRequests = createResourceRequests(
+ AllocationType.ANY,
+ resource = null,
+ numExecutors,
+ RM_REQUEST_PRIORITY)
+
+ val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
+ hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
+
+ containerRequestBuffer ++= hostContainerRequests
+ containerRequestBuffer ++= rackContainerRequests
+ containerRequestBuffer ++= anyContainerRequests
+ containerRequestBuffer.toList
+ }
- def getAllocatedContainers(): JList[Container]
+ for (request <- containerRequests) {
+ amClient.addContainerRequest(request)
+ }
- def getAvailableResources(): Resource
+ for (request <- containerRequests) {
+ val nodes = request.getNodes
+ val hostStr = if (nodes == null || nodes.isEmpty) {
+ "Any"
+ } else {
+ nodes.last
+ }
+ logInfo("Container request (host: %s, priority: %s, capability: %s".format(
+ hostStr,
+ request.getPriority().getPriority,
+ request.getCapability))
+ }
+ }
- def getCompletedContainersStatuses(): JList[ContainerStatus]
+ private def createResourceRequests(
+ requestType: AllocationType.AllocationType,
+ resource: String,
+ numExecutors: Int,
+ priority: Int): ArrayBuffer[ContainerRequest] = {
+ // If hostname is specified, then we need at least two requests - node local and rack local.
+ // There must be a third request, which is ANY. That will be specially handled.
+ requestType match {
+ case AllocationType.HOST => {
+ assert(YarnSparkHadoopUtil.ANY_HOST != resource)
+ val hostname = resource
+ val nodeLocal = constructContainerRequests(
+ Array(hostname),
+ racks = null,
+ numExecutors,
+ priority)
+
+ // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
+ YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
+ nodeLocal
+ }
+ case AllocationType.RACK => {
+ val rack = resource
+ constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
+ }
+ case AllocationType.ANY => constructContainerRequests(
+ hosts = null, racks = null, numExecutors, priority)
+ case _ => throw new IllegalArgumentException(
+ "Unexpected/unsupported request type: " + requestType)
+ }
+ }
+ private def constructContainerRequests(
+ hosts: Array[String],
+ racks: Array[String],
+ numExecutors: Int,
+ priority: Int
+ ): ArrayBuffer[ContainerRequest] = {
+ val memoryRequest = executorMemory + memoryOverhead
+ val resource = Resource.newInstance(memoryRequest, executorCores)
+
+ val prioritySetting = Records.newRecord(classOf[Priority])
+ prioritySetting.setPriority(priority)
+
+ val requests = new ArrayBuffer[ContainerRequest]()
+ for (i <- 0 until numExecutors) {
+ requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
+ }
+ requests
}
}
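The new allocateContainers path is the standard stable-API cycle: queue asks locally with addContainerRequest, then let allocate() ship them and double as the AM heartbeat. A minimal sketch against the same Hadoop client API (sizes and priority are illustrative; assumes an AMRMClient that is already initialized and registered):

    import scala.collection.JavaConversions._

    import org.apache.hadoop.yarn.api.records.{Priority, Resource}
    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
    import org.apache.hadoop.yarn.util.Records

    def requestAndPoll(amClient: AMRMClient[ContainerRequest], numExecutors: Int): Unit = {
      val capability = Resource.newInstance(1024 /* MB */, 1 /* vcores */)
      val priority = Records.newRecord(classOf[Priority])
      priority.setPriority(1)

      // Queue the asks; nothing is sent to the ResourceManager yet.
      for (_ <- 0 until numExecutors) {
        amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
      }

      // allocate() ships pending asks and heartbeats; with nothing pending it is
      // a pure heartbeat, exactly as the comment in the diff says.
      val response = amClient.allocate(0.1f)
      for (container <- response.getAllocatedContainers()) {
        println(s"Allocated ${container.getId} on ${container.getNodeId}")
      }
    }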
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index 2510b9c9ce..bf4e15908b 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -17,19 +17,33 @@
package org.apache.spark.deploy.yarn
+import java.util.{List => JList}
+
+import scala.collection.JavaConversions._
import scala.collection.{Map, Set}
+import scala.util.Try
-import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.ApplicationConstants
import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.apache.hadoop.yarn.conf.YarnConfiguration
+import org.apache.hadoop.yarn.util.ConverterUtils
+import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
+import org.apache.spark.util.Utils
/**
- * Interface that defines a Yarn RM client. Abstracts away Yarn version-specific functionality that
- * is used by Spark's AM.
+ * Handles registering and unregistering the application with the YARN ResourceManager.
*/
-trait YarnRMClient {
+private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logging {
+
+ private var amClient: AMRMClient[ContainerRequest] = _
+ private var uiHistoryAddress: String = _
+ private var registered: Boolean = false
/**
* Registers the application master with the RM.
@@ -46,7 +60,21 @@ trait YarnRMClient {
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
uiHistoryAddress: String,
- securityMgr: SecurityManager): YarnAllocator
+ securityMgr: SecurityManager
+ ): YarnAllocator = {
+ amClient = AMRMClient.createAMRMClient()
+ amClient.init(conf)
+ amClient.start()
+ this.uiHistoryAddress = uiHistoryAddress
+
+ logInfo("Registering the ApplicationMaster")
+ synchronized {
+ amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
+ registered = true
+ }
+ new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
+ preferredNodeLocations, securityMgr)
+ }
/**
* Unregister the AM. Guaranteed to only be called once.
@@ -54,15 +82,45 @@ trait YarnRMClient {
* @param status The final status of the AM.
* @param diagnostics Diagnostics message to include in the final status.
*/
- def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit
+ def unregister(status: FinalApplicationStatus, diagnostics: String = ""): Unit = synchronized {
+ if (registered) {
+ amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
+ }
+ }
/** Returns the attempt ID. */
- def getAttemptId(): ApplicationAttemptId
+ def getAttemptId(): ApplicationAttemptId = {
+ val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
+ val containerId = ConverterUtils.toContainerId(containerIdString)
+ containerId.getApplicationAttemptId()
+ }
/** Returns the configuration for the AmIpFilter to add to the Spark UI. */
- def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String]
+ def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String): Map[String, String] = {
+ // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
+ // so not all stable releases have it.
+ val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
+ .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
+
+ // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
+ try {
+ val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
+ classOf[Configuration])
+ val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
+ val hosts = proxies.map { proxy => proxy.split(":")(0) }
+ val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
+ Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
+ } catch {
+ case e: NoSuchMethodException =>
+ val proxy = WebAppUtils.getProxyHostAndPort(conf)
+ val parts = proxy.split(":")
+ val uriBase = prefix + proxy + proxyBase
+ Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
+ }
+ }
/** Returns the maximum number of attempts to register the AM. */
- def getMaxRegAttempts(conf: YarnConfiguration): Int
+ def getMaxRegAttempts(conf: YarnConfiguration): Int =
+ conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
}
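getAmIpFilterParams keeps the version-probing idiom from the old impl: reflectively look for a method that only newer YARN releases have and fall back when it is absent. A generalized sketch of that idiom (invokeIfPresent and the parseInt example are purely illustrative):

    import scala.util.Try

    // Invoke a static method by name if this version of the class has it, else
    // fall back -- the same Try(getMethod(...)).getOrElse shape used above for
    // WebAppUtils.getHttpSchemePrefix.
    def invokeIfPresent[T](clazz: Class[_], name: String, fallback: T)
                          (argTypes: Class[_]*)(args: AnyRef*): T =
      Try(clazz.getMethod(name, argTypes: _*).invoke(null, args: _*).asInstanceOf[T])
        .getOrElse(fallback)

    // Present on every JDK: Integer.parseInt(String).
    invokeIfPresent[Int](classOf[java.lang.Integer], "parseInt", -1)(classOf[String])("42") // 42
    // Missing method: getMethod throws, so Try yields the fallback.
    invokeIfPresent[Int](classOf[java.lang.Integer], "noSuchMethod", -1)()()                // -1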
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
deleted file mode 100644
index 8d4b96ed79..0000000000
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.deploy.yarn
-
-import java.util.{List => JList}
-
-import scala.collection.{Map, Set}
-import scala.collection.JavaConversions._
-import scala.util._
-
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api._
-import org.apache.hadoop.yarn.api.protocolrecords._
-import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.client.api.AMRMClient
-import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.hadoop.yarn.webapp.util.WebAppUtils
-
-import org.apache.spark.{Logging, SecurityManager, SparkConf}
-import org.apache.spark.scheduler.SplitInfo
-import org.apache.spark.util.Utils
-
-
-/**
- * YarnRMClient implementation for the Yarn stable API.
- */
-private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMClient with Logging {
-
- private var amClient: AMRMClient[ContainerRequest] = _
- private var uiHistoryAddress: String = _
- private var registered: Boolean = false
-
- override def register(
- conf: YarnConfiguration,
- sparkConf: SparkConf,
- preferredNodeLocations: Map[String, Set[SplitInfo]],
- uiAddress: String,
- uiHistoryAddress: String,
- securityMgr: SecurityManager) = {
- amClient = AMRMClient.createAMRMClient()
- amClient.init(conf)
- amClient.start()
- this.uiHistoryAddress = uiHistoryAddress
-
- logInfo("Registering the ApplicationMaster")
- synchronized {
- amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
- registered = true
- }
- new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
- preferredNodeLocations, securityMgr)
- }
-
- override def unregister(status: FinalApplicationStatus, diagnostics: String = "") = synchronized {
- if (registered) {
- amClient.unregisterApplicationMaster(status, diagnostics, uiHistoryAddress)
- }
- }
-
- override def getAttemptId() = {
- val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name())
- val containerId = ConverterUtils.toContainerId(containerIdString)
- val appAttemptId = containerId.getApplicationAttemptId()
- appAttemptId
- }
-
- override def getAmIpFilterParams(conf: YarnConfiguration, proxyBase: String) = {
- // Figure out which scheme Yarn is using. Note the method seems to have been added after 2.2,
- // so not all stable releases have it.
- val prefix = Try(classOf[WebAppUtils].getMethod("getHttpSchemePrefix", classOf[Configuration])
- .invoke(null, conf).asInstanceOf[String]).getOrElse("http://")
-
- // If running a new enough Yarn, use the HA-aware API for retrieving the RM addresses.
- try {
- val method = classOf[WebAppUtils].getMethod("getProxyHostsAndPortsForAmFilter",
- classOf[Configuration])
- val proxies = method.invoke(null, conf).asInstanceOf[JList[String]]
- val hosts = proxies.map { proxy => proxy.split(":")(0) }
- val uriBases = proxies.map { proxy => prefix + proxy + proxyBase }
- Map("PROXY_HOSTS" -> hosts.mkString(","), "PROXY_URI_BASES" -> uriBases.mkString(","))
- } catch {
- case e: NoSuchMethodException =>
- val proxy = WebAppUtils.getProxyHostAndPort(conf)
- val parts = proxy.split(":")
- val uriBase = prefix + proxy + proxyBase
- Map("PROXY_HOST" -> parts(0), "PROXY_URI_BASE" -> uriBase)
- }
- }
-
- override def getMaxRegAttempts(conf: YarnConfiguration) =
- conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)
-
-}