author     Marcelo Vanzin <vanzin@cloudera.com>    2014-09-03 08:22:50 -0500
committer  Thomas Graves <tgraves@apache.org>      2014-09-03 08:22:50 -0500
commit     6a72a36940311fcb3429bd34c8818bc7d513115c (patch)
tree       84531f12871aa96be5c6e779e38a4f0e488ad46c /yarn
parent     c64cc435e2a29c6f0ff66022fd4d5b4cb5011718 (diff)
[SPARK-3187] [yarn] Cleanup allocator code.
Move all shared logic to the base YarnAllocator class, and leave the
version-specific logic in the version-specific module.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #2169 from vanzin/SPARK-3187 and squashes the following commits:

46c2826 [Marcelo Vanzin] Hide the privates.
4dc9c83 [Marcelo Vanzin] Actually release containers.
8b1a077 [Marcelo Vanzin] Changes to the Yarn alpha allocator.
f3f5f1d [Marcelo Vanzin] [SPARK-3187] [yarn] Cleanup allocator code.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala  | 462
-rw-r--r--  yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala         | 425
-rw-r--r--  yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 402
3 files changed, 495 insertions(+), 794 deletions(-)
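The shape of the refactoring is easier to see outside the hunks: the version-agnostic YarnAllocator base class now owns all the bookkeeping and the allocation cycle, while each YARN flavor (alpha, stable) only implements two protected hooks plus a small adapter over its native allocate response. The sketch below is illustrative only, not the Spark code itself: the name YarnAllocatorSketch and the stand-in Container, ContainerStatus, and Resource case classes are invented here so the example compiles on its own.

```scala
// Stand-ins for the Hadoop record types used by the real allocator.
case class Container(id: String, host: String, memory: Int)
case class ContainerStatus(id: String, exitStatus: Int)
case class Resource(memory: Int)

// Version-agnostic base class: shared bookkeeping plus the allocation cycle,
// mirroring the role of the new YarnAllocator in yarn/common.
abstract class YarnAllocatorSketch(maxExecutors: Int, executorMemory: Int) {
  private var running = 0

  // Common view of an allocate response; the alpha and stable modules each
  // adapt their own response type (AMResponse vs. AllocateResponse) to this.
  protected trait YarnAllocateResponse {
    def getAllocatedContainers(): Seq[Container]
    def getAvailableResources(): Resource
    def getCompletedContainersStatuses(): Seq[ContainerStatus]
  }

  // Version-specific hook: ask the RM for `count` containers (0 is a heartbeat).
  protected def allocateContainers(count: Int): YarnAllocateResponse

  // Version-specific hook: give a container back to the RM.
  protected def releaseContainer(container: Container): Unit

  // Shared allocation cycle, intended to be called periodically.
  def allocateResources(): Unit = {
    val missing = math.max(maxExecutors - running, 0)
    val response = allocateContainers(missing)
    for (c <- response.getAllocatedContainers()) {
      if (c.memory >= executorMemory && running < maxExecutors) {
        running += 1          // the real code launches an ExecutorRunnable thread here
      } else {
        releaseContainer(c)   // too small, or more than we asked for
      }
    }
    // The real code also walks getCompletedContainersStatuses() to track
    // finished and failed executors; omitted here for brevity.
  }
}
```

This is a textbook template-method split: the locality-aware container grouping and executor accounting live once in the base class, while the two modules shrink to the protocol-specific calls shown in the hunks below.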
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 629cd13f67..9f9e16c064 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,35 +17,21 @@
package org.apache.spark.deploy.yarn
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
+import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest
import org.apache.hadoop.yarn.util.Records
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
/**
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
@@ -56,357 +42,20 @@ private[yarn] class YarnAllocationHandler(
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
- extends YarnAllocator with Logging {
-
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
-
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
-
- // Containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // Containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- // Additional memory overhead - in mb.
- private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
- private val numExecutorsRunning = new AtomicInteger()
- // Used to generate a unique id per executor
- private val executorIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
- private val numExecutorsFailed = new AtomicInteger()
-
- private val maxExecutors = args.numExecutors
- private val executorMemory = args.executorMemory
- private val executorCores = args.executorCores
- private val (preferredHostToCount, preferredRackToCount) =
- generateNodeToWeight(conf, preferredNodes)
-
- def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
-
- def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + memoryOverhead)
- }
-
- override def allocateResources() = {
- // We need to send the request only once from what I understand ... but for now, not modifying
- // this much.
- val executorsToRequest = Math.max(maxExecutors - numExecutorsRunning.get(), 0)
-
- // Keep polling the Resource Manager for containers
- val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
-
- val _allocatedContainers = amResp.getAllocatedContainers()
-
- if (_allocatedContainers.size > 0) {
- logDebug("""
- Allocated containers: %d
- Current executor count: %d
- Containers released: %s
- Containers to be released: %s
- Cluster resources: %s
- """.format(
- _allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers,
- amResp.getAvailableResources))
-
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- // Ignore if not satisfying constraints {
- for (container <- _allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // allocatedContainers += container
-
- val host = container.getNodeId.getHost
- val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
-
- containers += container
- } else {
- // Add all ignored containers to released list
- releasedContainerList.add(container.getId())
- }
- }
-
- // Find the appropriate containers to use. Slightly non trivial groupBy ...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet)
- {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- var remainingContainers = hostToContainers.get(candidateHost).getOrElse(null)
- assert(remainingContainers != null)
-
- if (requiredHostCount >= remainingContainers.size){
- // Since we got <= required containers, add all to dataLocalContainers
- dataLocalContainers.put(candidateHost, remainingContainers)
- // all consumed
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size -
- // requiredHostCount) and rest as remainingContainer
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
- // remainingContainers = remaining
-
- // yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
- // add remaining to release list. If we have insufficient containers, next allocation
- // cycle will reallocate (but wont treat it as data local)
- for (container <- remaining) releasedContainerList.add(container.getId())
- remainingContainers = null
- }
-
- // Now rack local
- if (remainingContainers != null){
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
-
- if (rack != null){
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.get(rack).getOrElse(List()).size
-
-
- if (requiredRackCount >= remainingContainers.size){
- // Add all to dataLocalContainers
- dataLocalContainers.put(rack, remainingContainers)
- // All consumed
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size -
- // requiredRackCount) and rest as remainingContainer
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
- remainingContainers = remaining
- }
- }
- }
-
- // If still not consumed, then it is off rack host - add to that list.
- if (remainingContainers != null){
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order :
- // first host local, then rack local and then off rack (everything else).
- // Note that the list we create below tries to ensure that not all containers end up within a
- // host if there are sufficiently large number of hosts/containers.
-
- val allocatedContainers = new ArrayBuffer[Container](_allocatedContainers.size)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainers ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers
- for (container <- allocatedContainers) {
- val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
- val executorHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- assert( container.getResource.getMemory >=
- (executorMemory + memoryOverhead))
-
- if (numExecutorsRunningNow > maxExecutors) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, executorHostname))
- releasedContainerList.add(containerId)
- // reset counter back to old value.
- numExecutorsRunning.decrementAndGet()
- } else {
- // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
- // (executorIdCounter)
- val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("launching container on " + containerId + " host " + executorHostname)
- // Just to be safe, simply remove it from pendingReleaseContainers.
- // Should not be there, but ..
- pendingReleaseContainers.remove(containerId)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
-
- new Thread(
- new ExecutorRunnable(container, conf, sparkConf, driverUrl, executorId,
- executorHostname, executorMemory, executorCores)
- ).start()
- }
- }
- logDebug("""
- Finished processing %d containers.
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
-
-
- val completedContainers = amResp.getCompletedContainersStatuses()
- if (completedContainers.size > 0){
- logDebug("Completed %d containers, to-be-released: %s".format(
- completedContainers.size, releasedContainerList))
- for (completedContainer <- completedContainers){
- val containerId = completedContainer.getContainerId
-
- // Was this released by us ? If yes, then simply remove from containerSet and move on.
- if (pendingReleaseContainers.containsKey(containerId)) {
- pendingReleaseContainers.remove(containerId)
- } else {
- // Simply decrement count - next iteration of ReporterThread will take care of allocating.
- numExecutorsRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus()))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
- numExecutorsFailed.incrementAndGet()
- }
- }
-
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val host = allocatedContainerToHostMap.get(containerId).getOrElse(null)
- assert (host != null)
-
- val containerSet = allocatedHostToContainersMap.get(host).getOrElse(null)
- assert (containerSet != null)
-
- containerSet -= containerId
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
-
- allocatedContainerToHostMap -= containerId
-
- // Doing this within locked context, sigh ... move to outside ?
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- completedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
- }
-
- def createRackResourceRequests(hostContainers: List[ResourceRequest]): List[ResourceRequest] = {
- // First generate modified racks and new set of hosts under it : then issue requests
- val rackToCounts = new HashMap[String, Int]()
-
- // Within this lock - used to read/write to the rack related maps too.
- for (container <- hostContainers) {
- val candidateHost = container.getHostName
- val candidateNumContainers = container.getNumContainers
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += candidateNumContainers
- rackToCounts.put(rack, count)
- }
- }
-
- val requestedContainers: ArrayBuffer[ResourceRequest] =
- new ArrayBuffer[ResourceRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts){
- requestedContainers +=
- createResourceRequest(AllocationType.RACK, rack, count,
- YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
- }
-
- requestedContainers.toList
- }
-
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
+ extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
-
- private def allocateExecutorResources(numExecutors: Int): AllocateResponse = {
+ private val lastResponseId = new AtomicInteger()
+ private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
+ override protected def allocateContainers(count: Int): YarnAllocateResponse = {
var resourceRequests: List[ResourceRequest] = null
- // default.
- if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+ // default.
+ if (count <= 0 || preferredHostToCount.isEmpty) {
+ logDebug("numExecutors: " + count + ", host preferences: " +
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
- AllocationType.ANY, null, numExecutors, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
+ AllocationType.ANY, null, count, YarnSparkHadoopUtil.RM_REQUEST_PRIORITY))
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
@@ -429,7 +78,7 @@ private[yarn] class YarnAllocationHandler(
val anyContainerRequests: ResourceRequest = createResourceRequest(
AllocationType.ANY,
resource = null,
- numExecutors,
+ count,
YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
@@ -451,8 +100,8 @@ private[yarn] class YarnAllocationHandler(
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
- if (numExecutors > 0) {
- logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
+ if (count > 0) {
+ logInfo("Allocating %d executor containers with %d of memory each.".format(count,
executorMemory + memoryOverhead))
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
@@ -466,9 +115,42 @@ private[yarn] class YarnAllocationHandler(
request.getPriority,
request.getCapability))
}
- resourceManager.allocate(req)
+ new AlphaAllocateResponse(resourceManager.allocate(req).getAMResponse())
}
+ override protected def releaseContainer(container: Container) = {
+ releaseList.add(container.getId())
+ }
+
+ private def createRackResourceRequests(hostContainers: List[ResourceRequest]):
+ List[ResourceRequest] = {
+ // First generate modified racks and new set of hosts under it : then issue requests
+ val rackToCounts = new HashMap[String, Int]()
+
+ // Within this lock - used to read/write to the rack related maps too.
+ for (container <- hostContainers) {
+ val candidateHost = container.getHostName
+ val candidateNumContainers = container.getNumContainers
+ assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+ if (rack != null) {
+ var count = rackToCounts.getOrElse(rack, 0)
+ count += candidateNumContainers
+ rackToCounts.put(rack, count)
+ }
+ }
+
+ val requestedContainers: ArrayBuffer[ResourceRequest] =
+ new ArrayBuffer[ResourceRequest](rackToCounts.size)
+ for ((rack, count) <- rackToCounts){
+ requestedContainers +=
+ createResourceRequest(AllocationType.RACK, rack, count,
+ YarnSparkHadoopUtil.RM_REQUEST_PRIORITY)
+ }
+
+ requestedContainers.toList
+ }
private def createResourceRequest(
requestType: AllocationType.AllocationType,
@@ -521,48 +203,24 @@ private[yarn] class YarnAllocationHandler(
rsrcRequest
}
- def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
-
+ private def createReleasedContainerList(): ArrayBuffer[ContainerId] = {
val retval = new ArrayBuffer[ContainerId](1)
// Iterator on COW list ...
- for (container <- releasedContainerList.iterator()){
+ for (container <- releaseList.iterator()){
retval += container
}
// Remove from the original list.
- if (! retval.isEmpty) {
- releasedContainerList.removeAll(retval)
- for (v <- retval) pendingReleaseContainers.put(v, true)
- logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
- pendingReleaseContainers)
+ if (!retval.isEmpty) {
+ releaseList.removeAll(retval)
+ logInfo("Releasing " + retval.size + " containers.")
}
-
retval
}
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]) :
- // host to count, rack to count
- (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) return (Map[String, Int](), Map[String, Int]())
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
+ private class AlphaAllocateResponse(response: AMResponse) extends YarnAllocateResponse {
+ override def getAllocatedContainers() = response.getAllocatedContainers()
+ override def getAvailableResources() = response.getAvailableResources()
+ override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
}
}
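The alpha handler above batches container releases in a local CopyOnWriteArrayList, ships them with the next allocate request, and wraps the protocol's AMResponse in AlphaAllocateResponse. Below is a sketch of that plug-in pattern, continuing the stand-in types from the sketch after the diffstat; FakeAmRmProtocol and AlphaHandlerSketch are invented names, not Hadoop or Spark APIs.

```scala
import scala.collection.JavaConverters._

// Stand-in for the alpha AMRMProtocol: one call carries both the ask and the
// batched releases, and returns (allocated, available resources, completed).
trait FakeAmRmProtocol {
  def allocate(ask: Int, release: List[String]): (Seq[Container], Resource, Seq[ContainerStatus])
}

class AlphaHandlerSketch(rm: FakeAmRmProtocol, maxExecutors: Int, executorMemory: Int)
    extends YarnAllocatorSketch(maxExecutors, executorMemory) {

  private val releaseList = new java.util.concurrent.CopyOnWriteArrayList[String]()

  // Releases are only recorded here; they go out with the next allocate call.
  override protected def releaseContainer(container: Container): Unit = {
    releaseList.add(container.id)
  }

  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
    val toRelease = releaseList.asScala.toList
    releaseList.removeAll(toRelease.asJava)
    val (allocated, available, completed) = rm.allocate(count, toRelease)
    // Plays the role of AlphaAllocateResponse: adapt the raw response to the
    // common interface the base class understands.
    new YarnAllocateResponse {
      override def getAllocatedContainers() = allocated
      override def getAvailableResources() = available
      override def getCompletedContainersStatuses() = completed
    }
  }
}
```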
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index cad94e5e19..c74dd1c2b2 100644
--- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,18 +17,431 @@
package org.apache.spark.deploy.yarn
+import java.util.{List => JList}
+import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.atomic.AtomicInteger
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
+
+import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
+import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
+
object AllocationType extends Enumeration {
type AllocationType = Value
val HOST, RACK, ANY = Value
}
+// TODO:
+// Too many params.
+// Needs to be mt-safe
+// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
+// make it more proactive and decoupled.
+
+// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting for containers.
+
/**
- * Interface that defines a Yarn allocator.
+ * Common code for the Yarn container allocator. Contains all the version-agnostic code to
+ * manage container allocation for a running Spark application.
*/
-trait YarnAllocator {
+private[yarn] abstract class YarnAllocator(
+ conf: Configuration,
+ sparkConf: SparkConf,
+ args: ApplicationMasterArguments,
+ preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+ extends Logging {
- def allocateResources(): Unit
- def getNumExecutorsFailed: Int
- def getNumExecutorsRunning: Int
+ // These three are locked on allocatedHostToContainersMap. Complementary data structures
+ // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
+ // allocatedContainerToHostMap: container to host mapping.
+ private val allocatedHostToContainersMap =
+ new HashMap[String, collection.mutable.Set[ContainerId]]()
-}
+ private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+
+ // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
+ // allocated node)
+ // As with the two data structures above, tightly coupled with them, and to be locked on
+ // allocatedHostToContainersMap
+ private val allocatedRackCount = new HashMap[String, Int]()
+
+ // Containers to be released in next request to RM
+ private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
+
+ // Additional memory overhead - in mb.
+ protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
+ YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
+
+ // Number of container requests that have been sent to, but not yet allocated by the
+ // ApplicationMaster.
+ private val numPendingAllocate = new AtomicInteger()
+ private val numExecutorsRunning = new AtomicInteger()
+ // Used to generate a unique id per executor
+ private val executorIdCounter = new AtomicInteger()
+ private val numExecutorsFailed = new AtomicInteger()
+
+ private val maxExecutors = args.numExecutors
+
+ protected val executorMemory = args.executorMemory
+ protected val executorCores = args.executorCores
+ protected val (preferredHostToCount, preferredRackToCount) =
+ generateNodeToWeight(conf, preferredNodes)
+
+ def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+
+ def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+
+ def allocateResources() = {
+ val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+
+ if (missing > 0) {
+ numPendingAllocate.addAndGet(missing)
+ logInfo("Will Allocate %d executor containers, each with %d memory".format(
+ missing,
+ (executorMemory + memoryOverhead)))
+ } else {
+ logDebug("Empty allocation request ...")
+ }
+
+ val allocateResponse = allocateContainers(missing)
+ val allocatedContainers = allocateResponse.getAllocatedContainers()
+
+ if (allocatedContainers.size > 0) {
+ var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
+
+ if (numPendingAllocateNow < 0) {
+ numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
+ }
+
+ logDebug("""
+ Allocated containers: %d
+ Current executor count: %d
+ Containers released: %s
+ Cluster resources: %s
+ """.format(
+ allocatedContainers.size,
+ numExecutorsRunning.get(),
+ releasedContainers,
+ allocateResponse.getAvailableResources))
+
+ val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+ for (container <- allocatedContainers) {
+ if (isResourceConstraintSatisfied(container)) {
+ // Add the accepted `container` to the host's list of already accepted,
+ // allocated containers
+ val host = container.getNodeId.getHost
+ val containersForHost = hostToContainers.getOrElseUpdate(host,
+ new ArrayBuffer[Container]())
+ containersForHost += container
+ } else {
+ // Release container, since it doesn't satisfy resource constraints.
+ internalReleaseContainer(container)
+ }
+ }
+
+ // Find the appropriate containers to use.
+ // TODO: Cleanup this group-by...
+ val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
+ val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
+
+ for (candidateHost <- hostToContainers.keySet) {
+ val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
+ val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
+
+ val remainingContainersOpt = hostToContainers.get(candidateHost)
+ assert(remainingContainersOpt.isDefined)
+ var remainingContainers = remainingContainersOpt.get
+
+ if (requiredHostCount >= remainingContainers.size) {
+ // Since we have <= required containers, add all remaining containers to
+ // `dataLocalContainers`.
+ dataLocalContainers.put(candidateHost, remainingContainers)
+ // There are no more free containers remaining.
+ remainingContainers = null
+ } else if (requiredHostCount > 0) {
+ // Container list has more containers than we need for data locality.
+ // Split the list into two: one based on the data local container count,
+ // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+ // containers.
+ val (dataLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredHostCount)
+ dataLocalContainers.put(candidateHost, dataLocal)
+
+ // Invariant: remainingContainers == remaining
+
+ // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
+ // Add each container in `remaining` to list of containers to release. If we have an
+ // insufficient number of containers, then the next allocation cycle will reallocate
+ // (but won't treat it as data local).
+ // TODO(harvey): Rephrase this comment some more.
+ for (container <- remaining) internalReleaseContainer(container)
+ remainingContainers = null
+ }
+
+ // For rack local containers
+ if (remainingContainers != null) {
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
+ if (rack != null) {
+ val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
+ val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+ rackLocalContainers.getOrElse(rack, List()).size
+
+ if (requiredRackCount >= remainingContainers.size) {
+ // Add all remaining containers to to `dataLocalContainers`.
+ dataLocalContainers.put(rack, remainingContainers)
+ remainingContainers = null
+ } else if (requiredRackCount > 0) {
+ // Container list has more containers that we need for data locality.
+ // Split the list into two: one based on the data local container count,
+ // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
+ // containers.
+ val (rackLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredRackCount)
+ val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
+ new ArrayBuffer[Container]())
+
+ existingRackLocal ++= rackLocal
+
+ remainingContainers = remaining
+ }
+ }
+ }
+
+ if (remainingContainers != null) {
+ // Not all containers have been consumed - add them to the list of off-rack containers.
+ offRackContainers.put(candidateHost, remainingContainers)
+ }
+ }
+
+ // Now that we have split the containers into various groups, go through them in order:
+ // first host-local, then rack-local, and finally off-rack.
+ // Note that the list we create below tries to ensure that not all containers end up within
+ // a host if there is a sufficiently large number of hosts/containers.
+ val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
+ allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
+
+ // Run each of the allocated containers.
+ for (container <- allocatedContainersToProcess) {
+ val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
+ val executorHostname = container.getNodeId.getHost
+ val containerId = container.getId
+
+ val executorMemoryOverhead = (executorMemory + memoryOverhead)
+ assert(container.getResource.getMemory >= executorMemoryOverhead)
+
+ if (numExecutorsRunningNow > maxExecutors) {
+ logInfo("""Ignoring container %s at host %s, since we already have the required number of
+ containers for it.""".format(containerId, executorHostname))
+ internalReleaseContainer(container)
+ numExecutorsRunning.decrementAndGet()
+ } else {
+ val executorId = executorIdCounter.incrementAndGet().toString
+ val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
+ SparkEnv.driverActorSystemName,
+ sparkConf.get("spark.driver.host"),
+ sparkConf.get("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+
+ // To be safe, remove the container from `releasedContainers`.
+ releasedContainers.remove(containerId)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
+ allocatedHostToContainersMap.synchronized {
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ new HashSet[ContainerId]())
+
+ containerSet += containerId
+ allocatedContainerToHostMap.put(containerId, executorHostname)
+
+ if (rack != null) {
+ allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ }
+ }
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+ driverUrl, executorHostname))
+ val executorRunnable = new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores)
+ new Thread(executorRunnable).start()
+ }
+ }
+ logDebug("""
+ Finished allocating %s containers (from %s originally).
+ Current number of executors running: %d,
+ Released containers: %s
+ """.format(
+ allocatedContainersToProcess,
+ allocatedContainers,
+ numExecutorsRunning.get(),
+ releasedContainers))
+ }
+
+ val completedContainers = allocateResponse.getCompletedContainersStatuses()
+ if (completedContainers.size > 0) {
+ logDebug("Completed %d containers".format(completedContainers.size))
+
+ for (completedContainer <- completedContainers) {
+ val containerId = completedContainer.getContainerId
+
+ if (releasedContainers.containsKey(containerId)) {
+ // YarnAllocationHandler already marked the container for release, so remove it from
+ // `releasedContainers`.
+ releasedContainers.remove(containerId)
+ } else {
+ // Decrement the number of executors running. The next iteration of
+ // the ApplicationMaster's reporting thread will take care of allocating.
+ numExecutorsRunning.decrementAndGet()
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus()))
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numExecutorsFailed.incrementAndGet()
+ }
+ }
+
+ allocatedHostToContainersMap.synchronized {
+ if (allocatedContainerToHostMap.containsKey(containerId)) {
+ val hostOpt = allocatedContainerToHostMap.get(containerId)
+ assert(hostOpt.isDefined)
+ val host = hostOpt.get
+
+ val containerSetOpt = allocatedHostToContainersMap.get(host)
+ assert(containerSetOpt.isDefined)
+ val containerSet = containerSetOpt.get
+
+ containerSet.remove(containerId)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
+
+ allocatedContainerToHostMap.remove(containerId)
+
+ // TODO: Move this part outside the synchronized block?
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+ if (rack != null) {
+ val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
+ if (rackCount > 0) {
+ allocatedRackCount.put(rack, rackCount)
+ } else {
+ allocatedRackCount.remove(rack)
+ }
+ }
+ }
+ }
+ }
+ logDebug("""
+ Finished processing %d completed containers.
+ Current number of executors running: %d,
+ Released containers: %s
+ """.format(
+ completedContainers.size,
+ numExecutorsRunning.get(),
+ releasedContainers))
+ }
+ }
+
+ protected def allocatedContainersOnHost(host: String): Int = {
+ var retval = 0
+ allocatedHostToContainersMap.synchronized {
+ retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
+ }
+ retval
+ }
+
+ protected def allocatedContainersOnRack(rack: String): Int = {
+ var retval = 0
+ allocatedHostToContainersMap.synchronized {
+ retval = allocatedRackCount.getOrElse(rack, 0)
+ }
+ retval
+ }
+
+ private def isResourceConstraintSatisfied(container: Container): Boolean = {
+ container.getResource.getMemory >= (executorMemory + memoryOverhead)
+ }
+
+ // A simple method to copy the split info map.
+ private def generateNodeToWeight(
+ conf: Configuration,
+ input: collection.Map[String, collection.Set[SplitInfo]]
+ ): (Map[String, Int], Map[String, Int]) = {
+
+ if (input == null) {
+ return (Map[String, Int](), Map[String, Int]())
+ }
+
+ val hostToCount = new HashMap[String, Int]
+ val rackToCount = new HashMap[String, Int]
+
+ for ((host, splits) <- input) {
+ val hostCount = hostToCount.getOrElse(host, 0)
+ hostToCount.put(host, hostCount + splits.size)
+
+ val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
+ if (rack != null) {
+ val rackCount = rackToCount.getOrElse(host, 0)
+ rackToCount.put(host, rackCount + splits.size)
+ }
+ }
+
+ (hostToCount.toMap, rackToCount.toMap)
+ }
+
+ private def internalReleaseContainer(container: Container) = {
+ releasedContainers.put(container.getId(), true)
+ releaseContainer(container)
+ }
+
+ /**
+ * Called to allocate containers in the cluster.
+ *
+ * @param count Number of containers to allocate.
+ * If zero, should still contact RM (as a heartbeat).
+ * @return Response to the allocation request.
+ */
+ protected def allocateContainers(count: Int): YarnAllocateResponse
+
+ /** Called to release a previously allocated container. */
+ protected def releaseContainer(container: Container): Unit
+
+ /**
+ * Defines the interface for an allocate response from the RM. This is needed since the alpha
+ * and stable interfaces differ here in ways that cannot be fixed using other routes.
+ */
+ protected trait YarnAllocateResponse {
+
+ def getAllocatedContainers(): JList[Container]
+
+ def getAvailableResources(): Resource
+
+ def getCompletedContainersStatuses(): JList[ContainerStatus]
+
+ }
+
+} \ No newline at end of file
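The shared allocateResources() above is written to be driven on a timer; the diff's own comments refer to "the ApplicationMaster's reporting thread" retrying allocation on its next iteration. Purely to illustrate that calling pattern (ReporterLoopSketch is an invented name, and the real reporter thread does considerably more, including failure handling):

```scala
// Drives the allocator periodically; each call also doubles as an AM -> RM
// heartbeat even when no new containers are needed.
object ReporterLoopSketch {
  def start(allocator: YarnAllocatorSketch, intervalMs: Long, isFinished: () => Boolean): Thread = {
    val t = new Thread(new Runnable {
      override def run(): Unit = {
        while (!isFinished()) {
          allocator.allocateResources()
          Thread.sleep(intervalMs)
        }
      }
    })
    t.setDaemon(true)
    t.start()
    t
  }
}
```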
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4d51449899..ed31457b61 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -17,36 +17,19 @@
package org.apache.spark.deploy.yarn
-import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap}
-import java.util.concurrent.atomic.AtomicInteger
-
import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
+import scala.collection.mutable.{ArrayBuffer, HashMap}
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
-import org.apache.spark.scheduler.{SplitInfo,TaskSchedulerImpl}
-import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-import org.apache.spark.util.Utils
+import org.apache.spark.SparkConf
+import org.apache.spark.scheduler.SplitInfo
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
-import org.apache.hadoop.yarn.api.records.{Container, ContainerId}
-import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
-import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.util.Records
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
/**
* Acquires resources for executors from a ResourceManager and launches executors in new containers.
*/
@@ -57,329 +40,22 @@ private[yarn] class YarnAllocationHandler(
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
- extends YarnAllocator with Logging {
-
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<containerid>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
-
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
-
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
-
- // Containers which have been released.
- private val releasedContainerList = new CopyOnWriteArrayList[ContainerId]()
- // Containers to be released in next request to RM
- private val pendingReleaseContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- // Additional memory overhead - in mb.
- private def memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
- YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD)
-
- // Number of container requests that have been sent to, but not yet allocated by the
- // ApplicationMaster.
- private val numPendingAllocate = new AtomicInteger()
- private val numExecutorsRunning = new AtomicInteger()
- // Used to generate a unique id per executor
- private val executorIdCounter = new AtomicInteger()
- private val lastResponseId = new AtomicInteger()
- private val numExecutorsFailed = new AtomicInteger()
-
- private val maxExecutors = args.numExecutors
- private val executorMemory = args.executorMemory
- private val executorCores = args.executorCores
- private val (preferredHostToCount, preferredRackToCount) =
- generateNodeToWeight(conf, preferredNodes)
-
- override def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
- override def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
-
- def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + memoryOverhead)
- }
-
- def releaseContainer(container: Container) {
- val containerId = container.getId
- pendingReleaseContainers.put(containerId, true)
- amClient.releaseAssignedContainer(containerId)
+ override protected def releaseContainer(container: Container) = {
+ amClient.releaseAssignedContainer(container.getId())
}
- override def allocateResources() = {
- addResourceRequests(maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get())
+ override protected def allocateContainers(count: Int): YarnAllocateResponse = {
+ addResourceRequests(count)
// We have already set the container request. Poll the ResourceManager for a response.
// This doubles as a heartbeat if there are no pending container requests.
val progressIndicator = 0.1f
- val allocateResponse = amClient.allocate(progressIndicator)
-
- val allocatedContainers = allocateResponse.getAllocatedContainers()
- if (allocatedContainers.size > 0) {
- var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
- if (numPendingAllocateNow < 0) {
- numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
- }
-
- logDebug("""
- Allocated containers: %d
- Current executor count: %d
- Containers released: %s
- Containers to-be-released: %s
- Cluster resources: %s
- """.format(
- allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers,
- allocateResponse.getAvailableResources))
-
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (container <- allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // Add the accepted `container` to the host's list of already accepted,
- // allocated containers
- val host = container.getNodeId.getHost
- val containersForHost = hostToContainers.getOrElseUpdate(host,
- new ArrayBuffer[Container]())
- containersForHost += container
- } else {
- // Release container, since it doesn't satisfy resource constraints.
- releaseContainer(container)
- }
- }
-
- // Find the appropriate containers to use.
- // TODO: Cleanup this group-by...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet) {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- val remainingContainersOpt = hostToContainers.get(candidateHost)
- assert(remainingContainersOpt.isDefined)
- var remainingContainers = remainingContainersOpt.get
-
- if (requiredHostCount >= remainingContainers.size) {
- // Since we have <= required containers, add all remaining containers to
- // `dataLocalContainers`.
- dataLocalContainers.put(candidateHost, remainingContainers)
- // There are no more free containers remaining.
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
-
- // Invariant: remainingContainers == remaining
-
- // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
- // Add each container in `remaining` to list of containers to release. If we have an
- // insufficient number of containers, then the next allocation cycle will reallocate
- // (but won't treat it as data local).
- // TODO(harvey): Rephrase this comment some more.
- for (container <- remaining) releaseContainer(container)
- remainingContainers = null
- }
-
- // For rack local containers
- if (remainingContainers != null) {
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.getOrElse(rack, List()).size
-
- if (requiredRackCount >= remainingContainers.size) {
- // Add all remaining containers to to `dataLocalContainers`.
- dataLocalContainers.put(rack, remainingContainers)
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // Container list has more containers that we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
-
- remainingContainers = remaining
- }
- }
- }
-
- if (remainingContainers != null) {
- // Not all containers have been consumed - add them to the list of off-rack containers.
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order:
- // first host-local, then rack-local, and finally off-rack.
- // Note that the list we create below tries to ensure that not all containers end up within
- // a host if there is a sufficiently large number of hosts/containers.
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers.
- for (container <- allocatedContainersToProcess) {
- val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
- val executorHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- val executorMemoryOverhead = (executorMemory + memoryOverhead)
- assert(container.getResource.getMemory >= executorMemoryOverhead)
-
- if (numExecutorsRunningNow > maxExecutors) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, executorHostname))
- releaseContainer(container)
- numExecutorsRunning.decrementAndGet()
- } else {
- val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
-
- // To be safe, remove the container from `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
-
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
- val executorRunnable = new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores)
- new Thread(executorRunnable).start()
- }
- }
- logDebug("""
- Finished allocating %s containers (from %s originally).
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- allocatedContainersToProcess,
- allocatedContainers,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
-
- val completedContainers = allocateResponse.getCompletedContainersStatuses()
- if (completedContainers.size > 0) {
- logDebug("Completed %d containers".format(completedContainers.size))
-
- for (completedContainer <- completedContainers) {
- val containerId = completedContainer.getContainerId
-
- if (pendingReleaseContainers.containsKey(containerId)) {
- // YarnAllocationHandler already marked the container for release, so remove it from
- // `pendingReleaseContainers`.
- pendingReleaseContainers.remove(containerId)
- } else {
- // Decrement the number of executors running. The next iteration of
- // the ApplicationMaster's reporting thread will take care of allocating.
- numExecutorsRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus()))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus() != 0) {
- logInfo("Container marked as failed: " + containerId)
- numExecutorsFailed.incrementAndGet()
- }
- }
-
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val hostOpt = allocatedContainerToHostMap.get(containerId)
- assert(hostOpt.isDefined)
- val host = hostOpt.get
-
- val containerSetOpt = allocatedHostToContainersMap.get(host)
- assert(containerSetOpt.isDefined)
- val containerSet = containerSetOpt.get
-
- containerSet.remove(containerId)
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
-
- allocatedContainerToHostMap.remove(containerId)
-
- // TODO: Move this part outside the synchronized block?
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of executors running: %d,
- releasedContainerList: %s,
- pendingReleaseContainers: %s
- """.format(
- completedContainers.size,
- numExecutorsRunning.get(),
- releasedContainerList,
- pendingReleaseContainers))
- }
+ new StableAllocateResponse(amClient.allocate(progressIndicator))
}
- def createRackResourceRequests(
+ private def createRackResourceRequests(
hostContainers: ArrayBuffer[ContainerRequest]
): ArrayBuffer[ContainerRequest] = {
// Generate modified racks and new set of hosts under it before issuing requests.
@@ -409,22 +85,6 @@ private[yarn] class YarnAllocationHandler(
requestedContainers
}
- def allocatedContainersOnHost(host: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedHostToContainersMap.getOrElse(host, Set()).size
- }
- retval
- }
-
- def allocatedContainersOnRack(rack: String): Int = {
- var retval = 0
- allocatedHostToContainersMap.synchronized {
- retval = allocatedRackCount.getOrElse(rack, 0)
- }
- retval
- }
-
private def addResourceRequests(numExecutors: Int) {
val containerRequests: List[ContainerRequest] =
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
@@ -472,15 +132,6 @@ private[yarn] class YarnAllocationHandler(
amClient.addContainerRequest(request)
}
- if (numExecutors > 0) {
- numPendingAllocate.addAndGet(numExecutors)
- logInfo("Will Allocate %d executor containers, each with %d memory".format(
- numExecutors,
- (executorMemory + memoryOverhead)))
- } else {
- logDebug("Empty allocation request ...")
- }
-
for (request <- containerRequests) {
val nodes = request.getNodes
var hostStr = if (nodes == null || nodes.isEmpty) {
@@ -549,31 +200,10 @@ private[yarn] class YarnAllocationHandler(
requests
}
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]]
- ): (Map[String, Int], Map[String, Int]) = {
-
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
- }
-
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null){
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
- }
- }
-
- (hostToCount.toMap, rackToCount.toMap)
+ private class StableAllocateResponse(response: AllocateResponse) extends YarnAllocateResponse {
+ override def getAllocatedContainers() = response.getAllocatedContainers()
+ override def getAvailableResources() = response.getAvailableResources()
+ override def getCompletedContainersStatuses() = response.getCompletedContainersStatuses()
}
}
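For symmetry with the alpha sketch, the stable-style plug-in looks like this: releases are forwarded immediately to an AMRMClient-like object, and allocateContainers registers the new container requests before polling the RM (the 0.1f progress value mirrors the constant used in the hunk above). FakeAmRmClient and StableHandlerSketch are invented stand-ins, not the real AMRMClient API.

```scala
// Stand-in for the stable AMRMClient: requests and releases are registered
// separately from the allocate (heartbeat) call.
trait FakeAmRmClient {
  def addContainerRequests(count: Int): Unit
  def releaseAssignedContainer(id: String): Unit
  def allocate(progress: Float): (Seq[Container], Resource, Seq[ContainerStatus])
}

class StableHandlerSketch(client: FakeAmRmClient, maxExecutors: Int, executorMemory: Int)
    extends YarnAllocatorSketch(maxExecutors, executorMemory) {

  // The stable API releases a container right away instead of batching it.
  override protected def releaseContainer(container: Container): Unit =
    client.releaseAssignedContainer(container.id)

  override protected def allocateContainers(count: Int): YarnAllocateResponse = {
    client.addContainerRequests(count)
    val (allocated, available, completed) = client.allocate(0.1f)
    // Plays the role of StableAllocateResponse from the hunk above.
    new YarnAllocateResponse {
      override def getAllocatedContainers() = allocated
      override def getAvailableResources() = available
      override def getCompletedContainersStatuses() = completed
    }
  }
}
```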