aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
Diffstat (limited to 'yarn')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala9
1 files changed, 5 insertions, 4 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 738ff986d8..f53c13013f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler(
def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ && container.getResource.getVirtualCores >= workerCores)
}
def releaseContainer(container: Container) {
@@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryResource = Records.newRecord(classOf[Resource])
- memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, workerCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
for (i <- 0 until numWorkers) {
- requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
+ requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
}