path: root/yarn/stable
diff options
authorMarcelo Vanzin <vanzin@cloudera.com>2014-09-03 08:22:50 -0500
committerThomas Graves <tgraves@apache.org>2014-09-03 08:22:50 -0500
commit6a72a36940311fcb3429bd34c8818bc7d513115c (patch)
tree84531f12871aa96be5c6e779e38a4f0e488ad46c /yarn/stable
parentc64cc435e2a29c6f0ff66022fd4d5b4cb5011718 (diff)
[SPARK-3187] [yarn] Cleanup allocator code.
Move all shared logic to the base YarnAllocator class, and leave the version-specific logic in the version-specific module. Author: Marcelo Vanzin <vanzin@cloudera.com> Closes #2169 from vanzin/SPARK-3187 and squashes the following commits: 46c2826 [Marcelo Vanzin] Hide the privates. 4dc9c83 [Marcelo Vanzin] Actually release containers. 8b1a077 [Marcelo Vanzin] Changes to the Yarn alpha allocator. f3f5f1d [Marcelo Vanzin] [SPARK-3187] [yarn] Cleanup allocator code.
Diffstat (limited to 'yarn/stable')
1 files changed, 16 insertions, 386 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4d51449899..ed31457b61 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,36 +17,19 @@
package org.apache.spark.deploy.yarn
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
@@ -57,329 +40,22 @@ private[yarn] class YarnAllocationHandler(
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
- extends YarnAllocator with Logging {
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
- // Containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // Containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
- // Additional memory overhead - in mb.
- private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- // Number of container requests that have been sent to, but not yet allocated by the
- // ApplicationMaster.
- private val numPendingAllocate = new AtomicInteger()
- private val numExecutorsRunning = new AtomicInteger()
- // Used to generate a unique id per executor
- private val executorIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
- private val numExecutorsFailed = new AtomicInteger()
- private val maxExecutors = args.numExecutors
- private val executorMemory = args.executorMemory
- private val executorCores = args.executorCores
- private val (preferredHostToCount, preferredRackToCount) =
- generateNodeToWeight(conf, preferredNodes)
- override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
- override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + memoryOverhead)
- }
- def releaseContainer(container: Container) {
- val containerId = container.getId
- pendingReleaseContainers.put(containerId, true)
- amClient.releaseAssignedContainer(containerId)
+ override protected def releaseContainer(container: Container) = {
+ amClient.releaseAssignedContainer(container.getId())
- override def allocateResources() = {
- addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+ override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+ addResourceRequests(count)
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
val progressIndicator = 0.1f
- val allocateResponse = amClient.allocate(progressIndicator)
- val allocatedContainers = allocateResponse.getAllocatedContainers()
- if (allocatedContainers.size > 0) {
- var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
- if (numPendingAllocateNow < 0) {
- numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
- }
- logDebug("""
- Allocated containers: %d
- Current executor count: %d
- Containers released: %s
- Containers to-be-released: %s
- Cluster resources: %s
- """.format(
- allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers,
- allocateResponse.getAvailableResources))
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
- for (container <- allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // Add the accepted `container` to the host's list of already accepted,
- // allocated containers
- val host = container.getNodeId.getHost
- val containersForHost = hostToContainers.getOrElseUpdate(host,
- new ArrayBuffer[Container]())
- containersForHost += container
- } else {
- // Release container, since it doesn't satisfy resource constraints.
- releaseContainer(container)
- }
- }
- // Find the appropriate containers to use.
- // TODO: Cleanup this group-by...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
- for (candidateHost <- hostToContainers.keySet) {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
- val remainingContainersOpt = hostToContainers.get(candidateHost)
- assert(remainingContainersOpt.isDefined)
- var remainingContainers = remainingContainersOpt.get
- if (requiredHostCount >= remainingContainers.size) {
- // Since we have <= required containers, add all remaining containers to
- // `dataLocalContainers`.
- dataLocalContainers.put(candidateHost, remainingContainers)
- // There are no more free containers remaining.
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
- // Invariant: remainingContainers == remaining
- // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
- // Add each container in `remaining` to list of containers to release. If we have an
- // insufficient number of containers, then the next allocation cycle will reallocate
- // (but won't treat it as data local).
- // TODO(harvey): Rephrase this comment some more.
- for (container <- remaining) releaseContainer(container)
- remainingContainers = null
- }
- // For rack local containers
- if (remainingContainers != null) {
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.getOrElse(rack, List()).size
- if (requiredRackCount >= remainingContainers.size) {
- // Add all remaining containers to to `dataLocalContainers`.
- dataLocalContainers.put(rack, remainingContainers)
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // Container list has more containers that we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
- existingRackLocal ++= rackLocal
- remainingContainers = remaining
- }
- }
- }
- if (remainingContainers != null) {
- // Not all containers have been consumed - add them to the list of off-rack containers.
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
- // Now that we have split the containers into various groups, go through them in order:
- // first host-local, then rack-local, and finally off-rack.
- // Note that the list we create below tries to ensure that not all containers end up within
- // a host if there is a sufficiently large number of hosts/containers.
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
- // Run each of the allocated containers.
- for (container <- allocatedContainersToProcess) {
- val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
- val executorHostname = container.getNodeId.getHost
- val containerId = container.getId
- val executorMemoryOverhead = (executorMemory + memoryOverhead)
- assert(container.getResource.getMemory >= executorMemoryOverhead)
- if (numExecutorsRunningNow > maxExecutors) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, executorHostname))
- releaseContainer(container)
- numExecutorsRunning.decrementAndGet()
- } else {
- val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- // To be safe, remove the container from `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
- val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId]())
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
- val executorRunnable = new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores)
- new Thread(executorRunnable).start()
- }
- }
- logDebug("""
- Finished allocating %s containers (from %s originally).
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- allocatedContainersToProcess,
- allocatedContainers,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
- val completedContainers = allocateResponse.getCompletedContainersStatuses()
- if (completedContainers.size > 0) {
- logDebug("Completed %d containers".format(completedContainers.size))
- for (completedContainer <- completedContainers) {
- val containerId = completedContainer.getContainerId
- if (pendingReleaseContainers.containsKey(containerId)) {
- // YarnAllocationHandler already marked the container for release, so remove it from
- // `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
- } else {
- // Decrement the number of executors running. The next iteration of
- // the ApplicationMaster's reporting thread will take care of allocating.
- numExecutorsRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus()))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
- numExecutorsFailed.incrementAndGet()
- }
- }
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val hostOpt = allocatedContainerToHostMap.get(containerId)
- assert(hostOpt.isDefined)
- val host = hostOpt.get
- val containerSetOpt = allocatedHostToContainersMap.get(host)
- assert(containerSetOpt.isDefined)
- val containerSet = containerSetOpt.get
- containerSet.remove(containerId)
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
- allocatedContainerToHostMap.remove(containerId)
- // TODO: Move this part outside the synchronized block?
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- completedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
+ new StableAllocateResponse(amClient.allocate(progressIndicator))
- def createRackResourceRequests(
+ private def createRackResourceRequests(
hostContainers: ArrayBuffer[ContainerRequest]
): ArrayBuffer[ContainerRequest] = {
// Generate modified racks and new set of hosts under it before issuing requests.
@@ -409,22 +85,6 @@ private[yarn] class YarnAllocationHandler(
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
private def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
@@ -472,15 +132,6 @@ private[yarn] class YarnAllocationHandler(
- if (numExecutors > 0) {
- numPendingAllocate.addAndGet(numExecutors)
- logInfo("Will Allocate %d executor containers, each with %d memory".format(
- numExecutors,
- (executorMemory + memoryOverhead)))
- } else {
- logDebug("Empty allocation request ...")
- }
for (request <- containerRequests) {
val nodes = request.getNodes
var hostStr = if (nodes == null || nodes.isEmpty) {
@@ -549,31 +200,10 @@ private[yarn] class YarnAllocationHandler(
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]
- ): (Map[String, Int], Map[String, Int]) = {
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
- }
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
- (hostToCount.toMap, rackToCount.toMap)
+ private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
+ override def getAllocatedContainers() = response.getAllocatedContainers()
+ override def getAvailableResources() = response.getAvailableResources()
+ override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()