author     Sandeep <sandeep@techaddict.me>    2014-04-24 15:07:23 -0700
committer  Reynold Xin <rxin@apache.org>      2014-04-24 15:07:23 -0700
commit     a03ac222d84025a1036750e1179136a13f75dea7 (patch)
tree       cc7f041b35b7804b7d62520f279cc6e53e40d73c /yarn/alpha
parent     c5c1916dd1b77e22759d58b5b361c56672983e3e (diff)
Fix Scala Style
Any comments are welcome

Author: Sandeep <sandeep@techaddict.me>

Closes #531 from techaddict/stylefix-1 and squashes the following commits:

7492730 [Sandeep] Pass 4
98b2428 [Sandeep] fix rxin suggestions
b5e2e6f [Sandeep] Pass 3
05932d7 [Sandeep] fix if else styling 2
08690e5 [Sandeep] fix if else styling
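The patch below strips trailing whitespace and converts dangling else branches into braced blocks, per Spark's Scala style conventions. A minimal, self-contained sketch of that brace convention follows; the names in it are illustrative and do not appear in the patch.

// Illustrative sketch only: object and method names below are hypothetical.
object IfElseStyleExample {
  def allocate(n: Int): Unit = println(s"allocating $n containers")
  def sendProgress(): Unit = println("sending progress heartbeat")

  def reportStep(missingExecutorCount: Int): Unit = {
    // Style the patch removes: an unbraced else branch dangling on its own line.
    //   if (missingExecutorCount > 0) allocate(missingExecutorCount)
    //   else sendProgress()
    //
    // Style the patch introduces: braces on both branches, with "} else {" on one line.
    if (missingExecutorCount > 0) {
      allocate(missingExecutorCount)
    } else {
      sendProgress()
    }
  }

  def main(args: Array[String]): Unit = reportStep(2)
}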
Diffstat (limited to 'yarn/alpha')
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala     |  9
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala      |  3
-rw-r--r--  yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 67
3 files changed, 41 insertions, 38 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f74965900..fc13dbecb4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))
-
+
localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
- }
+ }
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
@@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.allocateContainers(missingExecutorCount)
+ } else {
+ sendProgress()
}
- else sendProgress()
Thread.sleep(sleepTime)
}
}
@@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
-
+
logInfo("finishApplicationMaster with " + status)
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ea356f33eb..65b7215afb 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
logInfo("Allocating " + missingExecutorCount +
" containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
+ } else {
+ sendProgress()
}
- else sendProgress()
Thread.sleep(sleepTime)
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 95f0f9d0ff..856391e52b 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,12 +60,12 @@ object AllocationType extends Enumeration {
*/
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
- val resourceManager: AMRMProtocol,
+ val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
val maxExecutors: Int,
val executorMemory: Int,
val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
+ val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
extends Logging {
@@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler(
val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
containers += container
+ } else {
+ // Add all ignored containers to released list
+ releasedContainerList.add(container.getId())
}
- // Add all ignored containers to released list
- else releasedContainerList.add(container.getId())
}
// Find the appropriate containers to use. Slightly non trivial groupBy ...
@@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(candidateHost, remainingContainers)
// all consumed
remainingContainers = null
- }
- else if (requiredHostCount > 0) {
+ } else if (requiredHostCount > 0) {
// Container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredHostCount) and rest as remainingContainer
@@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler(
// remainingContainers = remaining
// yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
- // add remaining to release list. If we have insufficient containers, next allocation
+ // add remaining to release list. If we have insufficient containers, next allocation
// cycle will reallocate (but wont treat it as data local)
for (container <- remaining) releasedContainerList.add(container.getId())
remainingContainers = null
@@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler(
if (rack != null){
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+ val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
rackLocalContainers.get(rack).getOrElse(List()).size
@@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(rack, remainingContainers)
// All consumed
remainingContainers = null
- }
- else if (requiredRackCount > 0) {
+ } else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredRackCount) and rest as remainingContainer
@@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler(
}
}
- // Now that we have split the containers into various groups, go through them in order :
+ // Now that we have split the containers into various groups, go through them in order :
// first host local, then rack local and then off rack (everything else).
// Note that the list we create below tries to ensure that not all containers end up within a
// host if there are sufficiently large number of hosts/containers.
@@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler(
releasedContainerList.add(containerId)
// reset counter back to old value.
numExecutorsRunning.decrementAndGet()
- }
- else {
+ } else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
@@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler(
// Was this released by us ? If yes, then simply remove from containerSet and move on.
if (pendingReleaseContainers.containsKey(containerId)) {
pendingReleaseContainers.remove(containerId)
- }
- else {
+ } else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
@@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler(
assert (containerSet != null)
containerSet -= containerId
- if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
- else allocatedHostToContainersMap.update(host, containerSet)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
allocatedContainerToHostMap -= containerId
@@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler(
val rack = YarnAllocationHandler.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
- else allocatedRackCount.remove(rack)
+ if (rackCount > 0) {
+ allocatedRackCount.put(rack, rackCount)
+ } else {
+ allocatedRackCount.remove(rack)
+ }
}
}
}
@@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler(
}
}
- val requestedContainers: ArrayBuffer[ResourceRequest] =
+ val requestedContainers: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](rackToCounts.size)
for ((rack, count) <- rackToCounts){
- requestedContainers +=
+ requestedContainers +=
createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
}
@@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler(
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
- }
- else {
- // request for all hosts in preferred nodes and for numExecutors -
+ } else {
+ // request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
- val hostContainerRequests: ArrayBuffer[ResourceRequest] =
+ val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler(
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- }
- else {
+ } else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
- requestType: AllocationType.AllocationType,
+ requestType: AllocationType.AllocationType,
resource:String,
numExecutors: Int,
priority: Int): ResourceRequest = {
@@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler(
if (! retval.isEmpty) {
releasedContainerList.removeAll(retval)
for (v <- retval) pendingReleaseContainers.put(v, true)
- logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
+ logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
pendingReleaseContainers)
}
@@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler(
object YarnAllocationHandler {
val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
+ // All requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val PRIORITY = 1
@@ -548,7 +549,7 @@ object YarnAllocationHandler {
// Host to rack map - saved from allocation requests
// We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
+ // Note that it is possible for this to change : and RM will indicate that to us via update
// response to allocate. But we are punting on handling that for now.
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
@@ -565,7 +566,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numExecutors,
+ args.numExecutors,
args.executorMemory,
args.executorCores,
Map[String, Int](),
@@ -587,7 +588,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numExecutors,
+ args.numExecutors,
args.executorMemory,
args.executorCores,
hostToCount,