diff options
author | Sandy Ryza <sandy@cloudera.com> | 2014-01-19 10:16:25 -0800 |
---|---|---|
committer | Sandy Ryza <sandy@cloudera.com> | 2014-01-20 14:42:32 -0800 |
commit | 3e85b87d9033e6d9a2634f7598abc3acee77486f (patch) | |
tree | 0f25e57a3b9b70e9fa83c44920cf00dc3f820717 /yarn/stable/src | |
parent | 792d9084e2bc9f778a00a56fa7dcfe4084153aea (diff) | |
download | spark-3e85b87d9033e6d9a2634f7598abc3acee77486f.tar.gz spark-3e85b87d9033e6d9a2634f7598abc3acee77486f.tar.bz2 spark-3e85b87d9033e6d9a2634f7598abc3acee77486f.zip |
SPARK-1033. Ask for cores in Yarn container requests
Diffstat (limited to 'yarn/stable/src')
-rw-r--r-- | yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 9 |
1 files changed, 5 insertions, 4 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 738ff986d8..f53c13013f 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler( def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { - container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + && container.getResource.getVirtualCores >= workerCores) } def releaseContainer(container: Container) { @@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler( priority: Int ): ArrayBuffer[ContainerRequest] = { - val memoryResource = Records.newRecord(classOf[Resource]) - memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD + val resource = Resource.newInstance(memoryRequest, workerCores) val prioritySetting = Records.newRecord(classOf[Priority]) prioritySetting.setPriority(priority) val requests = new ArrayBuffer[ContainerRequest]() for (i <- 0 until numWorkers) { - requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting) + requests += new ContainerRequest(resource, hosts, racks, prioritySetting) } requests } |