diff options
Diffstat (limited to 'yarn/stable')
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 738ff986d8..f53c13013f 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler( def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + && container.getResource.getVirtualCores >= workerCores) } def releaseContainer(container: Container) { @@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler( priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryResource = Records.newRecord(classOf[Resource]) - memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val resource = Resource.newInstance(memoryRequest, workerCores) val prioritySetting = Records.newRecord(classOf[Priority]) prioritySetting.setPriority(priority) val requests = new ArrayBuffer[ContainerRequest]() for (i <- 0 until numWorkers) { - requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting) + requests += new ContainerRequest(resource, hosts, racks, prioritySetting) } requests } |