author     Sandy Ryza <sandy@cloudera.com>       2015-01-21 10:31:54 -0600
committer  Thomas Graves <tgraves@apache.org>    2015-01-21 10:31:54 -0600
commit     2eeada373e59d63b774ba92eb5d75fcd3a1cf8f4 (patch)
tree       a2fe6ff0c066512ae7d90447a4ce5e3e5dde5386 /yarn
parent     8c06a5faacfc71050461273133b9cf9a9dd8986f (diff)
SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator

The goal of this PR is to simplify YarnAllocator as much as possible and get it up to the level of code quality we see in the rest of Spark. In service of this, it does a few things:

* Uses AMRMClient APIs for matching containers to requests.
* Adds calls to AMRMClient.removeContainerRequest so that, when we use a container, we don't end up requesting it again.
* Removes YarnAllocator's host->rack cache. YARN's RackResolver already does this caching, so it is redundant.
* Adds tests for basic YarnAllocator functionality.
* Breaks up the allocateResources method, which was previously nearly 300 lines.
* A little bit of stylistic cleanup.
* Fixes a bug that caused three times the requests to be filed when preferred host locations were given.

The patch is lossy. In particular, it loses the logic for trying to avoid containers bunching up on nodes. As I understand it, the logic that's gone is:

* If, in a single response from the RM, we receive a set of containers on a node, and prefer some number of containers on that node greater than 0 but less than the number we received, give back the delta between what we preferred and what we received.

This seems like a weird way to avoid bunching, e.g. it does nothing to avoid bunching when we don't request containers on particular nodes.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #3765 from sryza/sandy-spark-1714 and squashes the following commits:

32a5942 [Sandy Ryza] Muffle RackResolver logs
74f56dd [Sandy Ryza] Fix a couple comments and simplify requestTotalExecutors
60ea4bd [Sandy Ryza] Fix scalastyle
ca35b53 [Sandy Ryza] Simplify further
e9cf8a6 [Sandy Ryza] Fix YarnClusterSuite
257acf3 [Sandy Ryza] Remove locality stuff and more cleanup
59a3c5e [Sandy Ryza] Take out rack stuff
5f72fd5 [Sandy Ryza] Further documentation and cleanup
89edd68 [Sandy Ryza] SPARK-1714. Take advantage of AMRMClient APIs to simplify logic in YarnAllocator
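
For readers skimming the diff below, here is a minimal, self-contained sketch of the AMRMClient pattern the patch moves to: file container requests locally, let allocate() sync them with the ResourceManager (doubling as the heartbeat), then match each granted container back to an outstanding request and remove that request so it is not filed again. The host, port, memory, and core values are placeholders, not taken from the patch.

import scala.collection.JavaConversions._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object AmRmClientSketch {
  def main(args: Array[String]): Unit = {
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(new Configuration())
    amClient.start()
    amClient.registerApplicationMaster("localhost", 0, "")

    val priority = Priority.newInstance(1)
    val capability = Resource.newInstance(2048, 2)  // 2 GB, 2 cores per container (illustrative)

    // Make our needs known locally; nothing reaches the RM until allocate() is called.
    (1 to 3).foreach { _ =>
      amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
    }

    // Sync pending requests with the RM and collect any grants. Also serves as a heartbeat.
    val response = amClient.allocate(0.1f)

    for (container <- response.getAllocatedContainers) {
      // Match the grant to an outstanding request ("*" = any location) and remove that request
      // so it is not re-filed on the next heartbeat -- the core change in this PR.
      val matching = amClient.getMatchingRequests(priority, "*", container.getResource)
      if (!matching.isEmpty) {
        amClient.removeContainerRequest(matching.get(0).iterator.next)
        // ... launch an executor in `container` ...
      } else {
        amClient.releaseAssignedContainer(container.getId)
      }
    }
  }
}
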
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                     | 733
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                      |   3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala               |  41
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala  |   5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala        |   6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                | 150
6 files changed, 388 insertions, 550 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index de65ef23ad..4c35b60c57 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,8 +17,8 @@
package org.apache.spark.deploy.yarn
+import java.util.Collections
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.collection.JavaConversions._
@@ -28,33 +28,26 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
+ * what to do with containers when YARN fulfills these requests.
+ *
+ * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
+ * * Making our resource needs known, which updates local bookkeeping about containers requested.
+ * * Calling "allocate", which syncs our local container requests with the RM, and returns any
+ * containers that YARN has granted to us. This also functions as a heartbeat.
+ * * Processing the containers granted to us to possibly launch executors inside of them.
+ *
+ * The public methods of this class are thread-safe. All methods that mutate state are
+ * synchronized.
*/
private[yarn] class YarnAllocator(
conf: Configuration,
@@ -62,50 +55,42 @@ private[yarn] class YarnAllocator(
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
extends Logging {
import YarnAllocator._
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<ContainerId>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
+ // These two complementary data structures are locked on allocatedHostToContainersMap.
+ // Visible for testing.
+ val allocatedHostToContainersMap =
+ new HashMap[String, collection.mutable.Set[ContainerId]]
+ val allocatedContainerToHostMap = new HashMap[ContainerId, String]
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+ // Containers that we no longer care about. We've either already told the RM to release them or
+ // will on the next heartbeat. Containers get removed from this map after the RM tells us they've
+ // completed.
+ private val releasedContainers = Collections.newSetFromMap[ContainerId](
+ new ConcurrentHashMap[ContainerId, java.lang.Boolean])
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
+ @volatile private var numExecutorsRunning = 0
+ // Used to generate a unique ID per executor
+ private var executorIdCounter = 0
+ @volatile private var numExecutorsFailed = 0
- // Containers to be released in next request to RM
- private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- // Number of container requests that have been sent to, but not yet allocated by the
- // ApplicationMaster.
- private val numPendingAllocate = new AtomicInteger()
- private val numExecutorsRunning = new AtomicInteger()
- // Used to generate a unique id per executor
- private val executorIdCounter = new AtomicInteger()
- private val numExecutorsFailed = new AtomicInteger()
-
- private var maxExecutors = args.numExecutors
+ @volatile private var maxExecutors = args.numExecutors
// Keep track of which container is running which executor to remove the executors later
private val executorIdToContainer = new HashMap[String, Container]
+ // Executor memory in MB.
protected val executorMemory = args.executorMemory
- protected val executorCores = args.executorCores
- protected val (preferredHostToCount, preferredRackToCount) =
- generateNodeToWeight(conf, preferredNodes)
-
- // Additional memory overhead - in mb.
+ // Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ // Number of cores per executor.
+ protected val executorCores = args.executorCores
+ // Resource capability requested for each executor
+ private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -115,26 +100,34 @@ private[yarn] class YarnAllocator(
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)
- def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+ sparkConf.get("spark.driver.host"),
+ sparkConf.get("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ // For testing
+ private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
- def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning
+
+ def getNumExecutorsFailed: Int = numExecutorsFailed
+
+ /**
+ * Number of container requests that have not yet been fulfilled.
+ */
+ def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+
+ /**
+ * Number of container requests at the given location that have not yet been fulfilled.
+ */
+ private def getNumPendingAtLocation(location: String): Int =
+ amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
/**
* Request as many executors from the ResourceManager as needed to reach the desired total.
- * This takes into account executors already running or pending.
*/
def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
- val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
- if (requestedTotal > currentTotal) {
- maxExecutors += (requestedTotal - currentTotal)
- // We need to call `allocateResources` here to avoid the following race condition:
- // If we request executors twice before `allocateResources` is called, then we will end up
- // double counting the number requested because `numPendingAllocate` is not updated yet.
- allocateResources()
- } else {
- logInfo(s"Not allocating more executors because there are already $currentTotal " +
- s"(application requested $requestedTotal total)")
- }
+ maxExecutors = requestedTotal
}
/**
@@ -144,7 +137,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
internalReleaseContainer(container)
- numExecutorsRunning.decrementAndGet()
+ numExecutorsRunning -= 1
maxExecutors -= 1
assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
} else {
@@ -153,498 +146,236 @@ private[yarn] class YarnAllocator(
}
/**
- * Allocate missing containers based on the number of executors currently pending and running.
+ * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
+ * equal to maxExecutors.
*
- * This method prioritizes the allocated container responses from the RM based on node and
- * rack locality. Additionally, it releases any extra containers allocated for this application
- * but are not needed. This must be synchronized because variables read in this block are
- * mutated by other methods.
+ * Deal with any containers YARN has granted to us by possibly launching executors in them.
+ *
+ * This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
- val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+ val numPendingAllocate = getNumPendingAllocate
+ val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
- val totalExecutorMemory = executorMemory + memoryOverhead
- numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
- s"memory including $memoryOverhead MB overhead")
- } else {
- logDebug("Empty allocation request ...")
+ logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
+ s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
}
- val allocateResponse = allocateContainers(missing)
+ addResourceRequests(missing)
+ val progressIndicator = 0.1f
+ // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
+ // requests.
+ val allocateResponse = amClient.allocate(progressIndicator)
+
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
- var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
- if (numPendingAllocateNow < 0) {
- numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
- }
-
- logDebug("""
- Allocated containers: %d
- Current executor count: %d
- Containers released: %s
- Cluster resources: %s
- """.format(
+ logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+ .format(
allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainers,
+ numExecutorsRunning,
allocateResponse.getAvailableResources))
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (container <- allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // Add the accepted `container` to the host's list of already accepted,
- // allocated containers
- val host = container.getNodeId.getHost
- val containersForHost = hostToContainers.getOrElseUpdate(host,
- new ArrayBuffer[Container]())
- containersForHost += container
- } else {
- // Release container, since it doesn't satisfy resource constraints.
- internalReleaseContainer(container)
- }
- }
-
- // Find the appropriate containers to use.
- // TODO: Cleanup this group-by...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet) {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- val remainingContainersOpt = hostToContainers.get(candidateHost)
- assert(remainingContainersOpt.isDefined)
- var remainingContainers = remainingContainersOpt.get
-
- if (requiredHostCount >= remainingContainers.size) {
- // Since we have <= required containers, add all remaining containers to
- // `dataLocalContainers`.
- dataLocalContainers.put(candidateHost, remainingContainers)
- // There are no more free containers remaining.
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
-
- // Invariant: remainingContainers == remaining
-
- // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
- // Add each container in `remaining` to list of containers to release. If we have an
- // insufficient number of containers, then the next allocation cycle will reallocate
- // (but won't treat it as data local).
- // TODO(harvey): Rephrase this comment some more.
- for (container <- remaining) internalReleaseContainer(container)
- remainingContainers = null
- }
-
- // For rack local containers
- if (remainingContainers != null) {
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.getOrElse(rack, List()).size
-
- if (requiredRackCount >= remainingContainers.size) {
- // Add all remaining containers to to `dataLocalContainers`.
- dataLocalContainers.put(rack, remainingContainers)
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // Container list has more containers that we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
-
- remainingContainers = remaining
- }
- }
- }
-
- if (remainingContainers != null) {
- // Not all containers have been consumed - add them to the list of off-rack containers.
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order:
- // first host-local, then rack-local, and finally off-rack.
- // Note that the list we create below tries to ensure that not all containers end up within
- // a host if there is a sufficiently large number of hosts/containers.
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers.
- for (container <- allocatedContainersToProcess) {
- val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
- val executorHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- val executorMemoryOverhead = (executorMemory + memoryOverhead)
- assert(container.getResource.getMemory >= executorMemoryOverhead)
-
- if (numExecutorsRunningNow > maxExecutors) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, executorHostname))
- internalReleaseContainer(container)
- numExecutorsRunning.decrementAndGet()
- } else {
- val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- executorIdToContainer(executorId) = container
-
- // To be safe, remove the container from `releasedContainers`.
- releasedContainers.remove(containerId)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
-
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
- val executorRunnable = new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores,
- appAttemptId.getApplicationId.toString,
- securityMgr)
- launcherPool.execute(executorRunnable)
- }
- }
- logDebug("""
- Finished allocating %s containers (from %s originally).
- Current number of executors running: %d,
- Released containers: %s
- """.format(
- allocatedContainersToProcess,
- allocatedContainers,
- numExecutorsRunning.get(),
- releasedContainers))
+ handleAllocatedContainers(allocatedContainers)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
- for (completedContainer <- completedContainers) {
- val containerId = completedContainer.getContainerId
-
- if (releasedContainers.containsKey(containerId)) {
- // Already marked the container for release, so remove it from
- // `releasedContainers`.
- releasedContainers.remove(containerId)
- } else {
- // Decrement the number of executors running. The next iteration of
- // the ApplicationMaster's reporting thread will take care of allocating.
- numExecutorsRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- VMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- PMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus != 0) {
- logInfo("Container marked as failed: " + containerId +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
- numExecutorsFailed.incrementAndGet()
- }
- }
+ processCompletedContainers(completedContainers)
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val hostOpt = allocatedContainerToHostMap.get(containerId)
- assert(hostOpt.isDefined)
- val host = hostOpt.get
-
- val containerSetOpt = allocatedHostToContainersMap.get(host)
- assert(containerSetOpt.isDefined)
- val containerSet = containerSetOpt.get
-
- containerSet.remove(containerId)
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
-
- allocatedContainerToHostMap.remove(containerId)
-
- // TODO: Move this part outside the synchronized block?
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of executors running: %d,
- Released containers: %s
- """.format(
- completedContainers.size,
- numExecutorsRunning.get(),
- releasedContainers))
+ logDebug("Finished processing %d completed containers. Current running executor count: %d."
+ .format(completedContainers.size, numExecutorsRunning))
}
}
- private def allocatedContainersOnHost(host: String): Int = {
- allocatedHostToContainersMap.synchronized {
- allocatedHostToContainersMap.getOrElse(host, Set()).size
+ /**
+ * Request numExecutors additional containers from YARN. Visible for testing.
+ */
+ def addResourceRequests(numExecutors: Int): Unit = {
+ for (i <- 0 until numExecutors) {
+ val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+ amClient.addContainerRequest(request)
+ val nodes = request.getNodes
+ val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+ logInfo("Container request (host: %s, capability: %s)".format(hostStr, resource))
}
}
- private def allocatedContainersOnRack(rack: String): Int = {
- allocatedHostToContainersMap.synchronized {
- allocatedRackCount.getOrElse(rack, 0)
+ /**
+ * Handle containers granted by the RM by launching executors on them.
+ *
+ * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
+ * in YARN granting containers that we no longer need. In this case, we release them.
+ *
+ * Visible for testing.
+ */
+ def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+ val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
+
+ // Match incoming requests by host
+ val remainingAfterHostMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- allocatedContainers) {
+ matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
+ containersToUse, remainingAfterHostMatches)
}
- }
-
- private def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + memoryOverhead)
- }
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]])
- : (Map[String, Int], Map[String, Int]) = {
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
+ // Match remaining by rack
+ val remainingAfterRackMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- remainingAfterHostMatches) {
+ val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
+ matchContainerToRequest(allocatedContainer, rack, containersToUse,
+ remainingAfterRackMatches)
}
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
+ // Assign remaining that are neither node-local nor rack-local
+ val remainingAfterOffRackMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- remainingAfterRackMatches) {
+ matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
+ remainingAfterOffRackMatches)
+ }
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
+ if (!remainingAfterOffRackMatches.isEmpty) {
+ logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
+ s"allocated to us")
+ for (container <- remainingAfterOffRackMatches) {
+ internalReleaseContainer(container)
}
}
- (hostToCount.toMap, rackToCount.toMap)
- }
+ runAllocatedContainers(containersToUse)
- private def internalReleaseContainer(container: Container): Unit = {
- releasedContainers.put(container.getId(), true)
- amClient.releaseAssignedContainer(container.getId())
+ logInfo("Received %d containers from YARN, launching executors on %d of them."
+ .format(allocatedContainers.size, containersToUse.size))
}
/**
- * Called to allocate containers in the cluster.
+ * Looks for requests for the given location that match the given container allocation. If it
+ * finds one, removes the request so that it won't be submitted again. Places the container into
+ * containersToUse or remaining.
*
- * @param count Number of containers to allocate.
- * If zero, should still contact RM (as a heartbeat).
- * @return Response to the allocation request.
+ * @param allocatedContainer container that was given to us by YARN
+ * @param location resource name, either a node, rack, or *
+ * @param containersToUse list of containers that will be used
+ * @param remaining list of containers that will not be used
*/
- private def allocateContainers(count: Int): AllocateResponse = {
- addResourceRequests(count)
-
- // We have already set the container request. Poll the ResourceManager for a response.
- // This doubles as a heartbeat if there are no pending container requests.
- val progressIndicator = 0.1f
- amClient.allocate(progressIndicator)
+ private def matchContainerToRequest(
+ allocatedContainer: Container,
+ location: String,
+ containersToUse: ArrayBuffer[Container],
+ remaining: ArrayBuffer[Container]): Unit = {
+ val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
+ allocatedContainer.getResource)
+
+ // Match the allocation to a request
+ if (!matchingRequests.isEmpty) {
+ val containerRequest = matchingRequests.get(0).iterator.next
+ amClient.removeContainerRequest(containerRequest)
+ containersToUse += allocatedContainer
+ } else {
+ remaining += allocatedContainer
+ }
}
- private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest])
- : ArrayBuffer[ContainerRequest] = {
- // Generate modified racks and new set of hosts under it before issuing requests.
- val rackToCounts = new HashMap[String, Int]()
-
- for (container <- hostContainers) {
- val candidateHost = container.getNodes.last
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += 1
- rackToCounts.put(rack, count)
+ /**
+ * Launches executors in the allocated containers.
+ */
+ private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+ for (container <- containersToUse) {
+ numExecutorsRunning += 1
+ assert(numExecutorsRunning <= maxExecutors)
+ val executorHostname = container.getNodeId.getHost
+ val containerId = container.getId
+ executorIdCounter += 1
+ val executorId = executorIdCounter.toString
+
+ assert(container.getResource.getMemory >= resource.getMemory)
+
+ logInfo("Launching container %s on host %s".format(containerId, executorHostname))
+
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ new HashSet[ContainerId])
+
+ containerSet += containerId
+ allocatedContainerToHostMap.put(containerId, executorHostname)
+
+ val executorRunnable = new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr)
+ if (launchContainers) {
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+ driverUrl, executorHostname))
+ launcherPool.execute(executorRunnable)
}
}
-
- val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts) {
- requestedContainers ++= createResourceRequests(
- AllocationType.RACK,
- rack,
- count,
- RM_REQUEST_PRIORITY)
- }
-
- requestedContainers
}
- private def addResourceRequests(numExecutors: Int): Unit = {
- val containerRequests: List[ContainerRequest] =
- if (numExecutors <= 0) {
- logDebug("numExecutors: " + numExecutors)
- List()
- } else if (preferredHostToCount.isEmpty) {
- logDebug("host preferences is empty")
- createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- RM_REQUEST_PRIORITY).toList
+ private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+ for (completedContainer <- completedContainers) {
+ val containerId = completedContainer.getContainerId
+
+ if (releasedContainers.contains(containerId)) {
+ // Already marked the container for release, so remove it from
+ // `releasedContainers`.
+ releasedContainers.remove(containerId)
} else {
- // Request for all hosts in preferred nodes and for numExecutors -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests ++= createResourceRequests(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- RM_REQUEST_PRIORITY)
- }
+ // Decrement the number of executors running. The next iteration of
+ // the ApplicationMaster's reporting thread will take care of allocating.
+ numExecutorsRunning -= 1
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus))
+ // Hadoop 2.2.X added a ContainerExitStatus that we should switch to using. There are
+ // some exit statuses we shouldn't necessarily count against us, but for now that's OK
+ // since none of the containers are expected to exit.
+ if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus != 0) {
+ logInfo("Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+ numExecutorsFailed += 1
}
- val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
- hostContainerRequests).toList
-
- val anyContainerRequests = createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- RM_REQUEST_PRIORITY)
-
- val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
- hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
-
- containerRequestBuffer ++= hostContainerRequests
- containerRequestBuffer ++= rackContainerRequests
- containerRequestBuffer ++= anyContainerRequests
- containerRequestBuffer.toList
}
- for (request <- containerRequests) {
- amClient.addContainerRequest(request)
- }
+ allocatedHostToContainersMap.synchronized {
+ if (allocatedContainerToHostMap.containsKey(containerId)) {
+ val host = allocatedContainerToHostMap.get(containerId).get
+ val containerSet = allocatedHostToContainersMap.get(host).get
- for (request <- containerRequests) {
- val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) {
- "Any"
- } else {
- nodes.last
- }
- logInfo("Container request (host: %s, priority: %s, capability: %s".format(
- hostStr,
- request.getPriority().getPriority,
- request.getCapability))
- }
- }
+ containerSet.remove(containerId)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
- private def createResourceRequests(
- requestType: AllocationType.AllocationType,
- resource: String,
- numExecutors: Int,
- priority: Int): ArrayBuffer[ContainerRequest] = {
- // If hostname is specified, then we need at least two requests - node local and rack local.
- // There must be a third request, which is ANY. That will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnSparkHadoopUtil.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = constructContainerRequests(
- Array(hostname),
- racks = null,
- numExecutors,
- priority)
-
- // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
+ allocatedContainerToHostMap.remove(containerId)
+ }
}
- case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numExecutors, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
}
}
- private def constructContainerRequests(
- hosts: Array[String],
- racks: Array[String],
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = executorMemory + memoryOverhead
- val resource = Resource.newInstance(memoryRequest, executorCores)
-
- val prioritySetting = Records.newRecord(classOf[Priority])
- prioritySetting.setPriority(priority)
-
- val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numExecutors) {
- requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
- }
- requests
+ private def internalReleaseContainer(container: Container): Unit = {
+ releasedContainers.add(container.getId())
+ amClient.releaseAssignedContainer(container.getId())
}
}
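
For context on how the refactored allocator is driven: allocateResources() is now the single entry point the ApplicationMaster's reporter thread needs to call on an interval -- it files any missing requests, heartbeats the RM, launches executors in granted containers, and processes completions. A rough sketch of such a driver loop follows; the thread name, interval, and stop condition are illustrative, and the real loop lives in ApplicationMaster, outside this diff.

// Illustrative driver only -- not part of this patch. YarnAllocator is private[yarn],
// so a real caller (the ApplicationMaster) lives in this package.
package org.apache.spark.deploy.yarn

object ReporterThreadSketch {
  def launchReporterThread(allocator: YarnAllocator, intervalMs: Long): Thread = {
    val t = new Thread {
      override def run(): Unit = {
        while (!Thread.currentThread().isInterrupted) {
          // Requests (maxExecutors - running - pending) containers, heartbeats the RM,
          // and handles whatever containers and completions came back.
          allocator.allocateResources()
          Thread.sleep(intervalMs)
        }
      }
    }
    t.setName("Reporter")
    t.setDaemon(true)
    t.start()
    t
  }
}
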
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b45e599588..b134751366 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
- preferredNodeLocations, securityMgr)
+ new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d7cf904db1..4bff846123 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
- val RM_REQUEST_PRIORITY = 1
-
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change : and ResourceManager will indicate that to us via
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+ val RM_REQUEST_PRIORITY = Priority.newInstance(1)
/**
* Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
}
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) {
- populateRackInfo(conf, host)
- }
- hostToRack.get(host)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list.
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out what this comment means...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
-
def getApplicationAclsForYarn(securityMgr: SecurityManager)
: Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 254774a6b8..2fa24cc433 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -17,8 +17,9 @@
package org.apache.spark.scheduler.cluster
+import org.apache.hadoop.yarn.util.RackResolver
+
import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 4157ff95c2..be55d26f1c 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,8 +17,10 @@
package org.apache.spark.scheduler.cluster
+import org.apache.hadoop.yarn.util.RackResolver
+
import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
override def postStartHook() {
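
Both schedulers now ask YARN's RackResolver directly instead of going through the removed hostToRack map; RackResolver keeps its own internal cache, so repeated lookups stay cheap. A minimal sketch of the lookup, where the Configuration argument stands in for sc.hadoopConfiguration:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.util.RackResolver

object RackLookupSketch {
  // Returns e.g. Some("/rack1"). The Option wrapper is defensive; RackResolver itself
  // caches resolutions, which is why the PR drops the hand-rolled host -> rack cache.
  def rackForHost(conf: Configuration, host: String): Option[String] = {
    Option(RackResolver.resolve(conf, host)).map(_.getNetworkLocation)
  }
}
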
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 8d184a09d6..024b25f9d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,18 +17,160 @@
package org.apache.spark.deploy.yarn
+import java.util.{Arrays, List => JList}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
+import org.apache.spark.scheduler.SplitInfo
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+class MockResolver extends DNSToSwitchMapping {
+
+ override def resolve(names: JList[String]): JList[String] = {
+ if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
+ else Arrays.asList("/rack1")
+ }
+
+ override def reloadCachedMappings() {}
+
+ def reloadCachedMappings(names: JList[String]) {}
+}
+
+class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
+ val conf = new Configuration()
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.driver.host", "localhost")
+ sparkConf.set("spark.driver.port", "4040")
+ sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+ sparkConf.set("spark.yarn.launchContainers", "false")
+
+ val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
+
+ // Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores
+ // instead of the 5 requested and 3 GB instead of the 2 requested.
+ val containerResource = Resource.newInstance(3072, 6)
+
+ var rmClient: AMRMClient[ContainerRequest] = _
+
+ var containerNum = 0
+
+ override def beforeEach() {
+ rmClient = AMRMClient.createAMRMClient()
+ rmClient.init(conf)
+ rmClient.start()
+ }
+
+ override def afterEach() {
+ rmClient.stop()
+ }
+
+ class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
+ override def equals(other: Any) = false
+ }
+
+ def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+ val args = Array(
+ "--num-executors", s"$maxExecutors",
+ "--executor-cores", "5",
+ "--executor-memory", "2048",
+ "--jar", "somejar.jar",
+ "--class", "SomeClass")
+ new YarnAllocator(
+ conf,
+ sparkConf,
+ rmClient,
+ appAttemptId,
+ new ApplicationMasterArguments(args),
+ new SecurityManager(sparkConf))
+ }
+
+ def createContainer(host: String): Container = {
+ val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+ containerNum += 1
+ val nodeId = NodeId.newInstance(host, 1000)
+ Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
+ }
+
+ test("single container allocated") {
+ // request a single container and receive it
+ val handler = createAllocator()
+ handler.addResourceRequests(1)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (1)
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container))
+
+ handler.getNumExecutorsRunning should be (1)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+ rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
+ }
+
+ test("some containers allocated") {
+ // request a few containers and receive some of them
+ val handler = createAllocator()
+ handler.addResourceRequests(4)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (4)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host1")
+ val container3 = createContainer("host2")
+ handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+ handler.getNumExecutorsRunning should be (3)
+ handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
+ handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+ }
+
+ test("receive more containers than requested") {
+ val handler = createAllocator(2)
+ handler.addResourceRequests(2)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (2)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host2")
+ val container3 = createContainer("host4")
+ handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+ handler.getNumExecutorsRunning should be (2)
+ handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+ handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+ handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
+ handler.allocatedHostToContainersMap.contains("host4") should be (false)
+ }
-class YarnAllocatorSuite extends FunSuite {
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
- "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
- "5.8 GB of 4.2 GB virtual memory used. Killing container."
+ "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+ "5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
+
}
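
The regex constants exercised by the last test are defined in YarnAllocator's companion object and are not shown in this diff. A hypothetical reconstruction, sufficient to satisfy the assertions above (the exact pattern strings and warning text are assumptions, not taken from the patch):

import java.util.regex.Pattern

object MemLimitSketch {
  private val MEM_REGEX = "[0-9.]+ [KMG]B"
  val PMEM_EXCEEDED_PATTERN = Pattern.compile(s"$MEM_REGEX of $MEM_REGEX physical memory used")
  val VMEM_EXCEEDED_PATTERN = Pattern.compile(s"$MEM_REGEX of $MEM_REGEX virtual memory used")

  // Extracts the "X of Y ... memory used" fragment from YARN's diagnostics so it can be
  // surfaced in a warning when a container is killed for exceeding its memory limit.
  def memLimitExceededLogMessage(diagnostics: String, pattern: Pattern): String = {
    val matcher = pattern.matcher(diagnostics)
    val matched = if (matcher.find()) matcher.group() + "." else ""
    s"Container killed by YARN for exceeding memory limits. $matched " +
      "Consider boosting spark.yarn.executor.memoryOverhead."
  }
}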