aboutsummaryrefslogtreecommitdiff
path: root/yarn/stable
diff options
context:
space:
mode:
authorSandy Ryza <sandy@cloudera.com>2014-01-19 10:16:25 -0800
committerSandy Ryza <sandy@cloudera.com>2014-01-20 14:42:32 -0800
commit3e85b87d9033e6d9a2634f7598abc3acee77486f (patch)
tree0f25e57a3b9b70e9fa83c44920cf00dc3f820717 /yarn/stable
parent792d9084e2bc9f778a00a56fa7dcfe4084153aea (diff)
downloadspark-3e85b87d9033e6d9a2634f7598abc3acee77486f.tar.gz
spark-3e85b87d9033e6d9a2634f7598abc3acee77486f.tar.bz2
spark-3e85b87d9033e6d9a2634f7598abc3acee77486f.zip
SPARK-1033. Ask for cores in Yarn container requests
Diffstat (limited to 'yarn/stable')
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala9
1 files changed, 5 insertions, 4 deletions
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 738ff986d8..f53c13013f 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -102,7 +102,8 @@ private[yarn] class YarnAllocationHandler(
def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
- container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ (container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ && container.getResource.getVirtualCores >= workerCores)
}
def releaseContainer(container: Container) {
@@ -532,15 +533,15 @@ private[yarn] class YarnAllocationHandler(
priority: Int
): ArrayBuffer[ContainerRequest] = {
- val memoryResource = Records.newRecord(classOf[Resource])
- memoryResource.setMemory(workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ val memoryRequest = workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD
+ val resource = Resource.newInstance(memoryRequest, workerCores)
val prioritySetting = Records.newRecord(classOf[Priority])
prioritySetting.setPriority(priority)
val requests = new ArrayBuffer[ContainerRequest]()
for (i <- 0 until numWorkers) {
- requests += new ContainerRequest(memoryResource, hosts, racks, prioritySetting)
+ requests += new ContainerRequest(resource, hosts, racks, prioritySetting)
}
requests
}