aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala10
1 files changed, 8 insertions, 2 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 12c62a659d..55bfbcd9cb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -290,8 +290,14 @@ private[yarn] class YarnAllocator(
location: String,
containersToUse: ArrayBuffer[Container],
remaining: ArrayBuffer[Container]): Unit = {
+ // SPARK-6050: certain Yarn configurations return a virtual core count that doesn't match the
+ // request; for example, capacity scheduler + DefaultResourceCalculator. So match on requested
+ // memory, but use the asked vcore count for matching, effectively disabling matching on vcore
+ // count.
+ val matchingResource = Resource.newInstance(allocatedContainer.getResource.getMemory,
+ resource.getVirtualCores)
val matchingRequests = amClient.getMatchingRequests(allocatedContainer.getPriority, location,
- allocatedContainer.getResource)
+ matchingResource)
// Match the allocation to a request
if (!matchingRequests.isEmpty) {
@@ -318,7 +324,7 @@ private[yarn] class YarnAllocator(
assert(container.getResource.getMemory >= resource.getMemory)
logInfo("Launching container %s for on host %s".format(containerId, executorHostname))
- executorIdToContainer(executorId) = container
+ executorIdToContainer(executorId) = container
val containerSet = allocatedHostToContainersMap.getOrElseUpdate(executorHostname,
new HashSet[ContainerId])