Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala                      733
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala                         3
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                  41
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala      5
-rw-r--r--  yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala            6
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala                  150
6 files changed, 388 insertions, 550 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index de65ef23ad..4c35b60c57 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -17,8 +17,8 @@
package org.apache.spark.deploy.yarn
+import java.util.Collections
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicInteger
import java.util.regex.Pattern
import scala.collection.JavaConversions._
@@ -28,33 +28,26 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
-import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
-import org.apache.hadoop.yarn.util.Records
+import org.apache.hadoop.yarn.util.RackResolver
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
-import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
-object AllocationType extends Enumeration {
- type AllocationType = Value
- val HOST, RACK, ANY = Value
-}
-
-// TODO:
-// Too many params.
-// Needs to be mt-safe
-// Need to refactor this to make it 'cleaner' ... right now, all computation is reactive - should
-// make it more proactive and decoupled.
-
-// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
-// more info on how we are requesting for containers.
-
/**
- * Acquires resources for executors from a ResourceManager and launches executors in new containers.
+ * YarnAllocator is charged with requesting containers from the YARN ResourceManager and deciding
+ * what to do with containers when YARN fulfills these requests.
+ *
+ * This class makes use of YARN's AMRMClient APIs. We interact with the AMRMClient in three ways:
+ * * Making our resource needs known, which updates local bookkeeping about containers requested.
+ * * Calling "allocate", which syncs our local container requests with the RM, and returns any
+ * containers that YARN has granted to us. This also functions as a heartbeat.
+ * * Processing the containers granted to us to possibly launch executors inside of them.
+ *
+ * The public methods of this class are thread-safe. All methods that mutate state are
+ * synchronized.
*/
private[yarn] class YarnAllocator(
conf: Configuration,
@@ -62,50 +55,42 @@ private[yarn] class YarnAllocator(
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
- preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
securityMgr: SecurityManager)
extends Logging {
import YarnAllocator._
- // These three are locked on allocatedHostToContainersMap. Complementary data structures
- // allocatedHostToContainersMap : containers which are running : host, Set<ContainerId>
- // allocatedContainerToHostMap: container to host mapping.
- private val allocatedHostToContainersMap =
- new HashMap[String, collection.mutable.Set[ContainerId]]()
+ // These two complementary data structures are locked on allocatedHostToContainersMap.
+ // Visible for testing.
+ val allocatedHostToContainersMap =
+ new HashMap[String, collection.mutable.Set[ContainerId]]
+ val allocatedContainerToHostMap = new HashMap[ContainerId, String]
- private val allocatedContainerToHostMap = new HashMap[ContainerId, String]()
+ // Containers that we no longer care about. We've either already told the RM to release them or
+ // will on the next heartbeat. Containers get removed from this set after the RM tells us they've
+ // completed.
+ private val releasedContainers = Collections.newSetFromMap[ContainerId](
+ new ConcurrentHashMap[ContainerId, java.lang.Boolean])
- // allocatedRackCount is populated ONLY if allocation happens (or decremented if this is an
- // allocated node)
- // As with the two data structures above, tightly coupled with them, and to be locked on
- // allocatedHostToContainersMap
- private val allocatedRackCount = new HashMap[String, Int]()
+ @volatile private var numExecutorsRunning = 0
+ // Used to generate a unique ID per executor
+ private var executorIdCounter = 0
+ @volatile private var numExecutorsFailed = 0
- // Containers to be released in next request to RM
- private val releasedContainers = new ConcurrentHashMap[ContainerId, Boolean]
-
- // Number of container requests that have been sent to, but not yet allocated by the
- // ApplicationMaster.
- private val numPendingAllocate = new AtomicInteger()
- private val numExecutorsRunning = new AtomicInteger()
- // Used to generate a unique id per executor
- private val executorIdCounter = new AtomicInteger()
- private val numExecutorsFailed = new AtomicInteger()
-
- private var maxExecutors = args.numExecutors
+ @volatile private var maxExecutors = args.numExecutors
// Keep track of which container is running which executor to remove the executors later
private val executorIdToContainer = new HashMap[String, Container]
+ // Executor memory in MB.
protected val executorMemory = args.executorMemory
- protected val executorCores = args.executorCores
- protected val (preferredHostToCount, preferredRackToCount) =
- generateNodeToWeight(conf, preferredNodes)
-
- // Additional memory overhead - in mb.
+ // Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
+ // Number of cores per executor.
+ protected val executorCores = args.executorCores
+ // Resource capability requested for each executor
+ private val resource = Resource.newInstance(executorMemory + memoryOverhead, executorCores)
private val launcherPool = new ThreadPoolExecutor(
// max pool size of Integer.MAX_VALUE is ignored because we use an unbounded queue
@@ -115,26 +100,34 @@ private[yarn] class YarnAllocator(
new ThreadFactoryBuilder().setNameFormat("ContainerLauncher #%d").setDaemon(true).build())
launcherPool.allowCoreThreadTimeOut(true)
- def getNumExecutorsRunning: Int = numExecutorsRunning.intValue
+ private val driverUrl = "akka.tcp://sparkDriver@%s:%s/user/%s".format(
+ sparkConf.get("spark.driver.host"),
+ sparkConf.get("spark.driver.port"),
+ CoarseGrainedSchedulerBackend.ACTOR_NAME)
+
+ // For testing
+ private val launchContainers = sparkConf.getBoolean("spark.yarn.launchContainers", true)
- def getNumExecutorsFailed: Int = numExecutorsFailed.intValue
+ def getNumExecutorsRunning: Int = numExecutorsRunning
+
+ def getNumExecutorsFailed: Int = numExecutorsFailed
+
+ /**
+ * Number of container requests that have not yet been fulfilled.
+ */
+ def getNumPendingAllocate: Int = getNumPendingAtLocation(ANY_HOST)
+
+ /**
+ * Number of container requests at the given location that have not yet been fulfilled.
+ */
+ private def getNumPendingAtLocation(location: String): Int =
+ amClient.getMatchingRequests(RM_REQUEST_PRIORITY, location, resource).map(_.size).sum
/**
* Request as many executors from the ResourceManager as needed to reach the desired total.
- * This takes into account executors already running or pending.
*/
def requestTotalExecutors(requestedTotal: Int): Unit = synchronized {
- val currentTotal = numPendingAllocate.get + numExecutorsRunning.get
- if (requestedTotal > currentTotal) {
- maxExecutors += (requestedTotal - currentTotal)
- // We need to call `allocateResources` here to avoid the following race condition:
- // If we request executors twice before `allocateResources` is called, then we will end up
- // double counting the number requested because `numPendingAllocate` is not updated yet.
- allocateResources()
- } else {
- logInfo(s"Not allocating more executors because there are already $currentTotal " +
- s"(application requested $requestedTotal total)")
- }
+ maxExecutors = requestedTotal
}
/**
@@ -144,7 +137,7 @@ private[yarn] class YarnAllocator(
if (executorIdToContainer.contains(executorId)) {
val container = executorIdToContainer.remove(executorId).get
internalReleaseContainer(container)
- numExecutorsRunning.decrementAndGet()
+ numExecutorsRunning -= 1
maxExecutors -= 1
assert(maxExecutors >= 0, "Allocator killed more executors than are allocated!")
} else {
@@ -153,498 +146,236 @@ private[yarn] class YarnAllocator(
}
/**
- * Allocate missing containers based on the number of executors currently pending and running.
+ * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
+ * equal to maxExecutors.
*
- * This method prioritizes the allocated container responses from the RM based on node and
- * rack locality. Additionally, it releases any extra containers allocated for this application
- * but are not needed. This must be synchronized because variables read in this block are
- * mutated by other methods.
+ * Deal with any containers YARN has granted to us by possibly launching executors in them.
+ *
+ * This must be synchronized because variables read in this method are mutated by other methods.
*/
def allocateResources(): Unit = synchronized {
- val missing = maxExecutors - numPendingAllocate.get() - numExecutorsRunning.get()
+ val numPendingAllocate = getNumPendingAllocate
+ val missing = maxExecutors - numPendingAllocate - numExecutorsRunning
if (missing > 0) {
- val totalExecutorMemory = executorMemory + memoryOverhead
- numPendingAllocate.addAndGet(missing)
- logInfo(s"Will allocate $missing executor containers, each with $totalExecutorMemory MB " +
- s"memory including $memoryOverhead MB overhead")
- } else {
- logDebug("Empty allocation request ...")
+ logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
+ s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
}
- val allocateResponse = allocateContainers(missing)
+ addResourceRequests(missing)
+ val progressIndicator = 0.1f
+ // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
+ // requests.
+ val allocateResponse = amClient.allocate(progressIndicator)
+
val allocatedContainers = allocateResponse.getAllocatedContainers()
if (allocatedContainers.size > 0) {
- var numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * allocatedContainers.size)
-
- if (numPendingAllocateNow < 0) {
- numPendingAllocateNow = numPendingAllocate.addAndGet(-1 * numPendingAllocateNow)
- }
-
- logDebug("""
- Allocated containers: %d
- Current executor count: %d
- Containers released: %s
- Cluster resources: %s
- """.format(
+ logDebug("Allocated containers: %d. Current executor count: %d. Cluster resources: %s."
+ .format(
allocatedContainers.size,
- numExecutorsRunning.get(),
- releasedContainers,
+ numExecutorsRunning,
allocateResponse.getAvailableResources))
- val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (container <- allocatedContainers) {
- if (isResourceConstraintSatisfied(container)) {
- // Add the accepted `container` to the host's list of already accepted,
- // allocated containers
- val host = container.getNodeId.getHost
- val containersForHost = hostToContainers.getOrElseUpdate(host,
- new ArrayBuffer[Container]())
- containersForHost += container
- } else {
- // Release container, since it doesn't satisfy resource constraints.
- internalReleaseContainer(container)
- }
- }
-
- // Find the appropriate containers to use.
- // TODO: Cleanup this group-by...
- val dataLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val rackLocalContainers = new HashMap[String, ArrayBuffer[Container]]()
- val offRackContainers = new HashMap[String, ArrayBuffer[Container]]()
-
- for (candidateHost <- hostToContainers.keySet) {
- val maxExpectedHostCount = preferredHostToCount.getOrElse(candidateHost, 0)
- val requiredHostCount = maxExpectedHostCount - allocatedContainersOnHost(candidateHost)
-
- val remainingContainersOpt = hostToContainers.get(candidateHost)
- assert(remainingContainersOpt.isDefined)
- var remainingContainers = remainingContainersOpt.get
-
- if (requiredHostCount >= remainingContainers.size) {
- // Since we have <= required containers, add all remaining containers to
- // `dataLocalContainers`.
- dataLocalContainers.put(candidateHost, remainingContainers)
- // There are no more free containers remaining.
- remainingContainers = null
- } else if (requiredHostCount > 0) {
- // Container list has more containers than we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (dataLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredHostCount)
- dataLocalContainers.put(candidateHost, dataLocal)
-
- // Invariant: remainingContainers == remaining
-
- // YARN has a nasty habit of allocating a ton of containers on a host - discourage this.
- // Add each container in `remaining` to list of containers to release. If we have an
- // insufficient number of containers, then the next allocation cycle will reallocate
- // (but won't treat it as data local).
- // TODO(harvey): Rephrase this comment some more.
- for (container <- remaining) internalReleaseContainer(container)
- remainingContainers = null
- }
-
- // For rack local containers
- if (remainingContainers != null) {
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
- rackLocalContainers.getOrElse(rack, List()).size
-
- if (requiredRackCount >= remainingContainers.size) {
- // Add all remaining containers to to `dataLocalContainers`.
- dataLocalContainers.put(rack, remainingContainers)
- remainingContainers = null
- } else if (requiredRackCount > 0) {
- // Container list has more containers that we need for data locality.
- // Split the list into two: one based on the data local container count,
- // (`remainingContainers.size` - `requiredHostCount`), and the other to hold remaining
- // containers.
- val (rackLocal, remaining) = remainingContainers.splitAt(
- remainingContainers.size - requiredRackCount)
- val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
- new ArrayBuffer[Container]())
-
- existingRackLocal ++= rackLocal
-
- remainingContainers = remaining
- }
- }
- }
-
- if (remainingContainers != null) {
- // Not all containers have been consumed - add them to the list of off-rack containers.
- offRackContainers.put(candidateHost, remainingContainers)
- }
- }
-
- // Now that we have split the containers into various groups, go through them in order:
- // first host-local, then rack-local, and finally off-rack.
- // Note that the list we create below tries to ensure that not all containers end up within
- // a host if there is a sufficiently large number of hosts/containers.
- val allocatedContainersToProcess = new ArrayBuffer[Container](allocatedContainers.size)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(dataLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(rackLocalContainers)
- allocatedContainersToProcess ++= TaskSchedulerImpl.prioritizeContainers(offRackContainers)
-
- // Run each of the allocated containers.
- for (container <- allocatedContainersToProcess) {
- val numExecutorsRunningNow = numExecutorsRunning.incrementAndGet()
- val executorHostname = container.getNodeId.getHost
- val containerId = container.getId
-
- val executorMemoryOverhead = (executorMemory + memoryOverhead)
- assert(container.getResource.getMemory >= executorMemoryOverhead)
-
- if (numExecutorsRunningNow > maxExecutors) {
- logInfo("""Ignoring container %s at host %s, since we already have the required number of
- containers for it.""".format(containerId, executorHostname))
- internalReleaseContainer(container)
- numExecutorsRunning.decrementAndGet()
- } else {
- val executorId = executorIdCounter.incrementAndGet().toString
- val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format(
- SparkEnv.driverActorSystemName,
- sparkConf.get("spark.driver.host"),
- sparkConf.get("spark.driver.port"),
- CoarseGrainedSchedulerBackend.ACTOR_NAME)
-
- logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- executorIdToContainer(executorId) = container
-
- // To be safe, remove the container from `releasedContainers`.
- releasedContainers.remove(containerId)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, executorHostname)
- allocatedHostToContainersMap.synchronized {
- val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
- new HashSet[ContainerId]())
-
- containerSet += containerId
- allocatedContainerToHostMap.put(containerId, executorHostname)
-
- if (rack != null) {
- allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
- }
- }
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
- driverUrl, executorHostname))
- val executorRunnable = new ExecutorRunnable(
- container,
- conf,
- sparkConf,
- driverUrl,
- executorId,
- executorHostname,
- executorMemory,
- executorCores,
- appAttemptId.getApplicationId.toString,
- securityMgr)
- launcherPool.execute(executorRunnable)
- }
- }
- logDebug("""
- Finished allocating %s containers (from %s originally).
- Current number of executors running: %d,
- Released containers: %s
- """.format(
- allocatedContainersToProcess,
- allocatedContainers,
- numExecutorsRunning.get(),
- releasedContainers))
+ handleAllocatedContainers(allocatedContainers)
}
val completedContainers = allocateResponse.getCompletedContainersStatuses()
if (completedContainers.size > 0) {
logDebug("Completed %d containers".format(completedContainers.size))
- for (completedContainer <- completedContainers) {
- val containerId = completedContainer.getContainerId
-
- if (releasedContainers.containsKey(containerId)) {
- // Already marked the container for release, so remove it from
- // `releasedContainers`.
- releasedContainers.remove(containerId)
- } else {
- // Decrement the number of executors running. The next iteration of
- // the ApplicationMaster's reporting thread will take care of allocating.
- numExecutorsRunning.decrementAndGet()
- logInfo("Completed container %s (state: %s, exit status: %s)".format(
- containerId,
- completedContainer.getState,
- completedContainer.getExitStatus))
- // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
- // there are some exit status' we shouldn't necessarily count against us, but for
- // now I think its ok as none of the containers are expected to exit
- if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- VMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
- logWarning(memLimitExceededLogMessage(
- completedContainer.getDiagnostics,
- PMEM_EXCEEDED_PATTERN))
- } else if (completedContainer.getExitStatus != 0) {
- logInfo("Container marked as failed: " + containerId +
- ". Exit status: " + completedContainer.getExitStatus +
- ". Diagnostics: " + completedContainer.getDiagnostics)
- numExecutorsFailed.incrementAndGet()
- }
- }
+ processCompletedContainers(completedContainers)
- allocatedHostToContainersMap.synchronized {
- if (allocatedContainerToHostMap.containsKey(containerId)) {
- val hostOpt = allocatedContainerToHostMap.get(containerId)
- assert(hostOpt.isDefined)
- val host = hostOpt.get
-
- val containerSetOpt = allocatedHostToContainersMap.get(host)
- assert(containerSetOpt.isDefined)
- val containerSet = containerSetOpt.get
-
- containerSet.remove(containerId)
- if (containerSet.isEmpty) {
- allocatedHostToContainersMap.remove(host)
- } else {
- allocatedHostToContainersMap.update(host, containerSet)
- }
-
- allocatedContainerToHostMap.remove(containerId)
-
- // TODO: Move this part outside the synchronized block?
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) {
- allocatedRackCount.put(rack, rackCount)
- } else {
- allocatedRackCount.remove(rack)
- }
- }
- }
- }
- }
- logDebug("""
- Finished processing %d completed containers.
- Current number of executors running: %d,
- Released containers: %s
- """.format(
- completedContainers.size,
- numExecutorsRunning.get(),
- releasedContainers))
+ logDebug("Finished processing %d completed containers. Current running executor count: %d."
+ .format(completedContainers.size, numExecutorsRunning))
}
}
- private def allocatedContainersOnHost(host: String): Int = {
- allocatedHostToContainersMap.synchronized {
- allocatedHostToContainersMap.getOrElse(host, Set()).size
+ /**
+ * Request numExecutors additional containers from YARN. Visible for testing.
+ */
+ def addResourceRequests(numExecutors: Int): Unit = {
+ for (i <- 0 until numExecutors) {
+ val request = new ContainerRequest(resource, null, null, RM_REQUEST_PRIORITY)
+ amClient.addContainerRequest(request)
+ val nodes = request.getNodes
+ val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
+ logInfo("Container request (host: %s, capability: %s".format(hostStr, resource))
}
}
- private def allocatedContainersOnRack(rack: String): Int = {
- allocatedHostToContainersMap.synchronized {
- allocatedRackCount.getOrElse(rack, 0)
+ /**
+ * Handle containers granted by the RM by launching executors on them.
+ *
+ * Due to the way the YARN allocation protocol works, certain healthy race conditions can result
+ * in YARN granting containers that we no longer need. In this case, we release them.
+ *
+ * Visible for testing.
+ */
+ def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
+ val containersToUse = new ArrayBuffer[Container](allocatedContainers.size)
+
+ // Match allocated containers to outstanding requests on the same host
+ val remainingAfterHostMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- allocatedContainers) {
+ matchContainerToRequest(allocatedContainer, allocatedContainer.getNodeId.getHost,
+ containersToUse, remainingAfterHostMatches)
}
- }
-
- private def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (executorMemory + memoryOverhead)
- }
- // A simple method to copy the split info map.
- private def generateNodeToWeight(
- conf: Configuration,
- input: collection.Map[String, collection.Set[SplitInfo]])
- : (Map[String, Int], Map[String, Int]) = {
- if (input == null) {
- return (Map[String, Int](), Map[String, Int]())
+ // Match remaining by rack
+ val remainingAfterRackMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- remainingAfterHostMatches) {
+ val rack = RackResolver.resolve(conf, allocatedContainer.getNodeId.getHost).getNetworkLocation
+ matchContainerToRequest(allocatedContainer, rack, containersToUse,
+ remainingAfterRackMatches)
}
- val hostToCount = new HashMap[String, Int]
- val rackToCount = new HashMap[String, Int]
-
- for ((host, splits) <- input) {
- val hostCount = hostToCount.getOrElse(host, 0)
- hostToCount.put(host, hostCount + splits.size)
+ // Assign remaining that are neither node-local nor rack-local
+ val remainingAfterOffRackMatches = new ArrayBuffer[Container]
+ for (allocatedContainer <- remainingAfterRackMatches) {
+ matchContainerToRequest(allocatedContainer, ANY_HOST, containersToUse,
+ remainingAfterOffRackMatches)
+ }
- val rack = YarnSparkHadoopUtil.lookupRack(conf, host)
- if (rack != null) {
- val rackCount = rackToCount.getOrElse(host, 0)
- rackToCount.put(host, rackCount + splits.size)
+ if (!remainingAfterOffRackMatches.isEmpty) {
+ logDebug(s"Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
+ s"allocated to us")
+ for (container <- remainingAfterOffRackMatches) {
+ internalReleaseContainer(container)
}
}
- (hostToCount.toMap, rackToCount.toMap)
- }
+ runAllocatedContainers(containersToUse)
- private def internalReleaseContainer(container: Container): Unit = {
- releasedContainers.put(container.getId(), true)
- amClient.releaseAssignedContainer(container.getId())
+ logInfo("Received %d containers from YARN, launching executors on %d of them."
+ .format(allocatedContainers.size, containersToUse.size))
}
/**
- * Called to allocate containers in the cluster.
+ * Looks for requests for the given location that match the given container allocation. If it
+ * finds one, removes the request so that it won't be submitted again. Places the container into
+ * containersToUse or remaining.
*
- * @param count Number of containers to allocate.
- * If zero, should still contact RM (as a heartbeat).
- * @return Response to the allocation request.
+ * @param allocatedContainer container that was given to us by YARN
+ * @param location resource name, either a node, rack, or *
+ * @param containersToUse list of containers that will be used
+ * @param remaining list of containers that will not be used
*/
- private def allocateContainers(count: Int): AllocateResponse = {
- addResourceRequests(count)
-
- // We have already set the container request. Poll the ResourceManager for a response.
- // This doubles as a heartbeat if there are no pending container requests.
- val progressIndicator = 0.1f
- amClient.allocate(progressIndicator)
+ private def matchContainerToRequest(
+ allocatedContainer: Container,
+ location: String,
+ containersToUse: ArrayBuffer[Container],
+ remaining: ArrayBuffer[Container]): Unit = {
+ val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
+ allocatedContainer.getResource)
+
+ // Match the allocation to a request
+ if (!matchingRequests.isEmpty) {
+ val containerRequest = matchingRequests.get(0).iterator.next
+ amClient.removeContainerRequest(containerRequest)
+ containersToUse += allocatedContainer
+ } else {
+ remaining += allocatedContainer
+ }
}
- private def createRackResourceRequests(hostContainers: ArrayBuffer[ContainerRequest])
- : ArrayBuffer[ContainerRequest] = {
- // Generate modified racks and new set of hosts under it before issuing requests.
- val rackToCounts = new HashMap[String, Int]()
-
- for (container <- hostContainers) {
- val candidateHost = container.getNodes.last
- assert(YarnSparkHadoopUtil.ANY_HOST != candidateHost)
-
- val rack = YarnSparkHadoopUtil.lookupRack(conf, candidateHost)
- if (rack != null) {
- var count = rackToCounts.getOrElse(rack, 0)
- count += 1
- rackToCounts.put(rack, count)
+ /**
+ * Launches executors in the allocated containers.
+ */
+ private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
+ for (container <- containersToUse) {
+ numExecutorsRunning += 1
+ assert(numExecutorsRunning <= maxExecutors)
+ val executorHostname = container.getNodeId.getHost
+ val containerId = container.getId
+ executorIdCounter += 1
+ val executorId = executorIdCounter.toString
+
+ assert(container.getResource.getMemory >= resource.getMemory)
+
+ logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
+
+ val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+ new HashSet[ContainerId])
+
+ containerSet += containerId
+ allocatedContainerToHostMap.put(containerId, executorHostname)
+
+ val executorRunnable = new ExecutorRunnable(
+ container,
+ conf,
+ sparkConf,
+ driverUrl,
+ executorId,
+ executorHostname,
+ executorMemory,
+ executorCores,
+ appAttemptId.getApplicationId.toString,
+ securityMgr)
+ if (launchContainers) {
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+ driverUrl, executorHostname))
+ launcherPool.execute(executorRunnable)
}
}
-
- val requestedContainers = new ArrayBuffer[ContainerRequest](rackToCounts.size)
- for ((rack, count) <- rackToCounts) {
- requestedContainers ++= createResourceRequests(
- AllocationType.RACK,
- rack,
- count,
- RM_REQUEST_PRIORITY)
- }
-
- requestedContainers
}
- private def addResourceRequests(numExecutors: Int): Unit = {
- val containerRequests: List[ContainerRequest] =
- if (numExecutors <= 0) {
- logDebug("numExecutors: " + numExecutors)
- List()
- } else if (preferredHostToCount.isEmpty) {
- logDebug("host preferences is empty")
- createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- RM_REQUEST_PRIORITY).toList
+ private def processCompletedContainers(completedContainers: Seq[ContainerStatus]): Unit = {
+ for (completedContainer <- completedContainers) {
+ val containerId = completedContainer.getContainerId
+
+ if (releasedContainers.contains(containerId)) {
+ // Already marked the container for release, so remove it from
+ // `releasedContainers`.
+ releasedContainers.remove(containerId)
} else {
- // Request for all hosts in preferred nodes and for numExecutors -
- // candidates.size, request by default allocation policy.
- val hostContainerRequests = new ArrayBuffer[ContainerRequest](preferredHostToCount.size)
- for ((candidateHost, candidateCount) <- preferredHostToCount) {
- val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
-
- if (requiredCount > 0) {
- hostContainerRequests ++= createResourceRequests(
- AllocationType.HOST,
- candidateHost,
- requiredCount,
- RM_REQUEST_PRIORITY)
- }
+ // Decrement the number of executors running. The next iteration of
+ // the ApplicationMaster's reporting thread will take care of allocating.
+ numExecutorsRunning -= 1
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus))
+ // Hadoop 2.2.x added ContainerExitStatus, which we should switch to using. Some exit
+ // statuses shouldn't necessarily be counted against us, but for now that's fine since
+ // none of the containers are expected to exit.
+ if (completedContainer.getExitStatus == -103) { // vmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ VMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus == -104) { // pmem limit exceeded
+ logWarning(memLimitExceededLogMessage(
+ completedContainer.getDiagnostics,
+ PMEM_EXCEEDED_PATTERN))
+ } else if (completedContainer.getExitStatus != 0) {
+ logInfo("Container marked as failed: " + containerId +
+ ". Exit status: " + completedContainer.getExitStatus +
+ ". Diagnostics: " + completedContainer.getDiagnostics)
+ numExecutorsFailed += 1
}
- val rackContainerRequests: List[ContainerRequest] = createRackResourceRequests(
- hostContainerRequests).toList
-
- val anyContainerRequests = createResourceRequests(
- AllocationType.ANY,
- resource = null,
- numExecutors,
- RM_REQUEST_PRIORITY)
-
- val containerRequestBuffer = new ArrayBuffer[ContainerRequest](
- hostContainerRequests.size + rackContainerRequests.size + anyContainerRequests.size)
-
- containerRequestBuffer ++= hostContainerRequests
- containerRequestBuffer ++= rackContainerRequests
- containerRequestBuffer ++= anyContainerRequests
- containerRequestBuffer.toList
}
- for (request <- containerRequests) {
- amClient.addContainerRequest(request)
- }
+ allocatedHostToContainersMap.synchronized {
+ if (allocatedContainerToHostMap.containsKey(containerId)) {
+ val host = allocatedContainerToHostMap.get(containerId).get
+ val containerSet = allocatedHostToContainersMap.get(host).get
- for (request <- containerRequests) {
- val nodes = request.getNodes
- val hostStr = if (nodes == null || nodes.isEmpty) {
- "Any"
- } else {
- nodes.last
- }
- logInfo("Container request (host: %s, priority: %s, capability: %s".format(
- hostStr,
- request.getPriority().getPriority,
- request.getCapability))
- }
- }
+ containerSet.remove(containerId)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
- private def createResourceRequests(
- requestType: AllocationType.AllocationType,
- resource: String,
- numExecutors: Int,
- priority: Int): ArrayBuffer[ContainerRequest] = {
- // If hostname is specified, then we need at least two requests - node local and rack local.
- // There must be a third request, which is ANY. That will be specially handled.
- requestType match {
- case AllocationType.HOST => {
- assert(YarnSparkHadoopUtil.ANY_HOST != resource)
- val hostname = resource
- val nodeLocal = constructContainerRequests(
- Array(hostname),
- racks = null,
- numExecutors,
- priority)
-
- // Add `hostname` to the global (singleton) host->rack mapping in YarnAllocationHandler.
- YarnSparkHadoopUtil.populateRackInfo(conf, hostname)
- nodeLocal
- }
- case AllocationType.RACK => {
- val rack = resource
- constructContainerRequests(hosts = null, Array(rack), numExecutors, priority)
+ allocatedContainerToHostMap.remove(containerId)
+ }
}
- case AllocationType.ANY => constructContainerRequests(
- hosts = null, racks = null, numExecutors, priority)
- case _ => throw new IllegalArgumentException(
- "Unexpected/unsupported request type: " + requestType)
}
}
- private def constructContainerRequests(
- hosts: Array[String],
- racks: Array[String],
- numExecutors: Int,
- priority: Int
- ): ArrayBuffer[ContainerRequest] = {
- val memoryRequest = executorMemory + memoryOverhead
- val resource = Resource.newInstance(memoryRequest, executorCores)
-
- val prioritySetting = Records.newRecord(classOf[Priority])
- prioritySetting.setPriority(priority)
-
- val requests = new ArrayBuffer[ContainerRequest]()
- for (i <- 0 until numExecutors) {
- requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
- }
- requests
+ private def internalReleaseContainer(container: Container): Unit = {
+ releasedContainers.add(container.getId())
+ amClient.releaseAssignedContainer(container.getId())
}
}
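Reviewer note: the sketch below illustrates how a caller such as the ApplicationMaster's reporter thread might drive the reworked YarnAllocator API described in the class comment above. It is illustrative only, not code from this patch: the object name, loop shape, and heartbeatIntervalMs parameter are assumptions, and only requestTotalExecutors, allocateResources, and the getNum* accessors are defined by YarnAllocator in this change.

// Illustrative sketch, not part of this patch. Assumed to live in the
// org.apache.spark.deploy.yarn package, since YarnAllocator is private[yarn].
object AllocatorDriverSketch {
  def run(allocator: YarnAllocator, targetExecutors: Int, heartbeatIntervalMs: Long): Unit = {
    // Record the desired total; allocateResources() later turns the gap
    // (maxExecutors - running - pending) into container requests.
    allocator.requestTotalExecutors(targetExecutors)
    while (allocator.getNumExecutorsRunning < targetExecutors) {
      // Syncs outstanding requests with the RM (doubling as a heartbeat) and
      // launches executors in any containers the RM has granted.
      allocator.allocateResources()
      Thread.sleep(heartbeatIntervalMs)
    }
  }
}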
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
index b45e599588..b134751366 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala
@@ -72,8 +72,7 @@ private[spark] class YarnRMClient(args: ApplicationMasterArguments) extends Logg
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
registered = true
}
- new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args,
- preferredNodeLocations, securityMgr)
+ new YarnAllocator(conf, sparkConf, amClient, getAttemptId(), args, securityMgr)
}
/**
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index d7cf904db1..4bff846123 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+import org.apache.hadoop.yarn.api.records.{Priority, ApplicationAccessType}
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration
@@ -99,13 +99,7 @@ object YarnSparkHadoopUtil {
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
- val RM_REQUEST_PRIORITY = 1
-
- // Host to rack map - saved from allocation requests. We are expecting this not to change.
- // Note that it is possible for this to change : and ResourceManager will indicate that to us via
- // update response to allocate. But we are punting on handling that for now.
- private val hostToRack = new ConcurrentHashMap[String, String]()
- private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
+ val RM_REQUEST_PRIORITY = Priority.newInstance(1)
/**
* Add a path variable to the given environment map.
@@ -184,37 +178,6 @@ object YarnSparkHadoopUtil {
}
}
- def lookupRack(conf: Configuration, host: String): String = {
- if (!hostToRack.contains(host)) {
- populateRackInfo(conf, host)
- }
- hostToRack.get(host)
- }
-
- def populateRackInfo(conf: Configuration, hostname: String) {
- Utils.checkHost(hostname)
-
- if (!hostToRack.containsKey(hostname)) {
- // If there are repeated failures to resolve, all to an ignore list.
- val rackInfo = RackResolver.resolve(conf, hostname)
- if (rackInfo != null && rackInfo.getNetworkLocation != null) {
- val rack = rackInfo.getNetworkLocation
- hostToRack.put(hostname, rack)
- if (! rackToHostSet.containsKey(rack)) {
- rackToHostSet.putIfAbsent(rack,
- Collections.newSetFromMap(new ConcurrentHashMap[String, JBoolean]()))
- }
- rackToHostSet.get(rack).add(hostname)
-
- // TODO(harvey): Figure out what this comment means...
- // Since RackResolver caches, we are disabling this for now ...
- } /* else {
- // right ? Else we will keep calling rack resolver in case we cant resolve rack info ...
- hostToRack.put(hostname, null)
- } */
- }
- }
-
def getApplicationAclsForYarn(securityMgr: SecurityManager)
: Map[ApplicationAccessType, String] = {
Map[ApplicationAccessType, String] (
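Reviewer note: one consequence of turning RM_REQUEST_PRIORITY into a Priority record is that it can be handed straight to the AMRMClient request and matching APIs with no Records.newRecord wrapping. A minimal sketch follows; the object name and capability values are illustrative assumptions, while the calls themselves mirror what the allocator and the new test suite do.

// Sketch for illustration only; values are assumptions, not from this patch.
import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

object PrioritySketch {
  def main(args: Array[String]): Unit = {
    val priority = Priority.newInstance(1)          // what RM_REQUEST_PRIORITY now holds
    val capability = Resource.newInstance(2048, 1)  // 2048 MB, 1 vcore -- example values
    val amClient: AMRMClient[ContainerRequest] = AMRMClient.createAMRMClient()
    amClient.init(new Configuration())
    amClient.start()
    // Requests are keyed by (priority, resource name, capability); the allocator's new
    // getNumPendingAtLocation does the same lookup against "*" (any host).
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
    val pending = amClient.getMatchingRequests(priority, "*", capability).map(_.size).sum
    println(s"Pending requests at any host: $pending")
    amClient.stop()
  }
}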
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
index 254774a6b8..2fa24cc433 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala
@@ -17,8 +17,9 @@
package org.apache.spark.scheduler.cluster
+import org.apache.hadoop.yarn.util.RackResolver
+
import org.apache.spark._
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -30,6 +31,6 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext) extends TaskSc
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
index 4157ff95c2..be55d26f1c 100644
--- a/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
+++ b/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala
@@ -17,8 +17,10 @@
package org.apache.spark.scheduler.cluster
+import org.apache.hadoop.yarn.util.RackResolver
+
import org.apache.spark._
-import org.apache.spark.deploy.yarn.{ApplicationMaster, YarnSparkHadoopUtil}
+import org.apache.spark.deploy.yarn.ApplicationMaster
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.util.Utils
@@ -39,7 +41,7 @@ private[spark] class YarnClusterScheduler(sc: SparkContext) extends TaskSchedule
// By default, rack is unknown
override def getRackForHost(hostPort: String): Option[String] = {
val host = Utils.parseHostPort(hostPort)._1
- Option(YarnSparkHadoopUtil.lookupRack(sc.hadoopConfiguration, host))
+ Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
}
override def postStartHook() {
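Reviewer note: both schedulers now delegate rack lookups to Hadoop's RackResolver instead of the hand-rolled hostToRack cache removed from YarnSparkHadoopUtil. A small sketch of that lookup is below; the StaticResolver class and host name are illustrative assumptions, wired in the same way the new YarnAllocatorSuite wires its MockResolver.

import java.util.{Arrays, List => JList}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.net.DNSToSwitchMapping
import org.apache.hadoop.yarn.util.RackResolver

// Hypothetical topology mapping for illustration only.
class StaticResolver extends DNSToSwitchMapping {
  override def resolve(names: JList[String]): JList[String] = Arrays.asList("/rack1")
  override def reloadCachedMappings(): Unit = {}
  def reloadCachedMappings(names: JList[String]): Unit = {}
}

object RackLookupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    conf.setClass(CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
      classOf[StaticResolver], classOf[DNSToSwitchMapping])
    // Per the comment removed from YarnSparkHadoopUtil, RackResolver caches its results,
    // which is why the old hostToRack map is no longer needed.
    val rack = RackResolver.resolve(conf, "host1").getNetworkLocation  // "/rack1"
    println(rack)
  }
}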
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 8d184a09d6..024b25f9d3 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,18 +17,160 @@
package org.apache.spark.deploy.yarn
+import java.util.{Arrays, List => JList}
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic
+import org.apache.hadoop.net.DNSToSwitchMapping
+import org.apache.hadoop.yarn.api.records._
+import org.apache.hadoop.yarn.client.api.AMRMClient
+import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+
+import org.apache.spark.SecurityManager
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.YarnAllocator._
-import org.scalatest.FunSuite
+import org.apache.spark.scheduler.SplitInfo
+
+import org.scalatest.{BeforeAndAfterEach, FunSuite, Matchers}
+
+class MockResolver extends DNSToSwitchMapping {
+
+ override def resolve(names: JList[String]): JList[String] = {
+ if (names.size > 0 && names.get(0) == "host3") Arrays.asList("/rack2")
+ else Arrays.asList("/rack1")
+ }
+
+ override def reloadCachedMappings() {}
+
+ def reloadCachedMappings(names: JList[String]) {}
+}
+
+class YarnAllocatorSuite extends FunSuite with Matchers with BeforeAndAfterEach {
+ val conf = new Configuration()
+ conf.setClass(
+ CommonConfigurationKeysPublic.NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
+ classOf[MockResolver], classOf[DNSToSwitchMapping])
+
+ val sparkConf = new SparkConf()
+ sparkConf.set("spark.driver.host", "localhost")
+ sparkConf.set("spark.driver.port", "4040")
+ sparkConf.set("spark.yarn.jar", "notarealjar.jar")
+ sparkConf.set("spark.yarn.launchContainers", "false")
+
+ val appAttemptId = ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 0), 0)
+
+ // Resource returned by YARN. YARN can give larger containers than requested, so give 6 cores
+ // instead of the 5 requested and 3 GB instead of the 2 requested.
+ val containerResource = Resource.newInstance(3072, 6)
+
+ var rmClient: AMRMClient[ContainerRequest] = _
+
+ var containerNum = 0
+
+ override def beforeEach() {
+ rmClient = AMRMClient.createAMRMClient()
+ rmClient.init(conf)
+ rmClient.start()
+ }
+
+ override def afterEach() {
+ rmClient.stop()
+ }
+
+ class MockSplitInfo(host: String) extends SplitInfo(null, host, null, 1, null) {
+ override def equals(other: Any) = false
+ }
+
+ def createAllocator(maxExecutors: Int = 5): YarnAllocator = {
+ val args = Array(
+ "--num-executors", s"$maxExecutors",
+ "--executor-cores", "5",
+ "--executor-memory", "2048",
+ "--jar", "somejar.jar",
+ "--class", "SomeClass")
+ new YarnAllocator(
+ conf,
+ sparkConf,
+ rmClient,
+ appAttemptId,
+ new ApplicationMasterArguments(args),
+ new SecurityManager(sparkConf))
+ }
+
+ def createContainer(host: String): Container = {
+ val containerId = ContainerId.newInstance(appAttemptId, containerNum)
+ containerNum += 1
+ val nodeId = NodeId.newInstance(host, 1000)
+ Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null)
+ }
+
+ test("single container allocated") {
+ // request a single container and receive it
+ val handler = createAllocator()
+ handler.addResourceRequests(1)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (1)
+
+ val container = createContainer("host1")
+ handler.handleAllocatedContainers(Array(container))
+
+ handler.getNumExecutorsRunning should be (1)
+ handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container.getId)
+ rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size should be (0)
+ }
+
+ test("some containers allocated") {
+ // request a few containers and receive some of them
+ val handler = createAllocator()
+ handler.addResourceRequests(4)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (4)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host1")
+ val container3 = createContainer("host2")
+ handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+ handler.getNumExecutorsRunning should be (3)
+ handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host2")
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container2.getId)
+ handler.allocatedHostToContainersMap.get("host2").get should contain (container3.getId)
+ }
+
+ test("receive more containers than requested") {
+ val handler = createAllocator(2)
+ handler.addResourceRequests(2)
+ handler.getNumExecutorsRunning should be (0)
+ handler.getNumPendingAllocate should be (2)
+
+ val container1 = createContainer("host1")
+ val container2 = createContainer("host2")
+ val container3 = createContainer("host4")
+ handler.handleAllocatedContainers(Array(container1, container2, container3))
+
+ handler.getNumExecutorsRunning should be (2)
+ handler.allocatedContainerToHostMap.get(container1.getId).get should be ("host1")
+ handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2")
+ handler.allocatedContainerToHostMap.contains(container3.getId) should be (false)
+ handler.allocatedHostToContainersMap.get("host1").get should contain (container1.getId)
+ handler.allocatedHostToContainersMap.get("host2").get should contain (container2.getId)
+ handler.allocatedHostToContainersMap.contains("host4") should be (false)
+ }
-class YarnAllocatorSuite extends FunSuite {
test("memory exceeded diagnostic regexes") {
val diagnostics =
"Container [pid=12465,containerID=container_1412887393566_0003_01_000002] is running " +
- "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
- "5.8 GB of 4.2 GB virtual memory used. Killing container."
+ "beyond physical memory limits. Current usage: 2.1 MB of 2 GB physical memory used; " +
+ "5.8 GB of 4.2 GB virtual memory used. Killing container."
val vmemMsg = memLimitExceededLogMessage(diagnostics, VMEM_EXCEEDED_PATTERN)
val pmemMsg = memLimitExceededLogMessage(diagnostics, PMEM_EXCEEDED_PATTERN)
assert(vmemMsg.contains("5.8 GB of 4.2 GB virtual memory used."))
assert(pmemMsg.contains("2.1 MB of 2 GB physical memory used."))
}
+
}