From 2250c7acbd857efeda99d9e850cb0faff34e19f0 Mon Sep 17 00:00:00 2001 From: Sandeep Date: Thu, 24 Apr 2014 15:07:23 -0700 Subject: Fix Scala Style Any comments are welcome Author: Sandeep Closes #531 from techaddict/stylefix-1 and squashes the following commits: 7492730 [Sandeep] Pass 4 98b2428 [Sandeep] fix rxin suggestions b5e2e6f [Sandeep] Pass 3 05932d7 [Sandeep] fix if else styling 2 08690e5 [Sandeep] fix if else styling (cherry picked from commit a03ac222d84025a1036750e1179136a13f75dea7) Signed-off-by: Reynold Xin --- .../spark/deploy/yarn/ApplicationMaster.scala | 9 +-- .../spark/deploy/yarn/ExecutorLauncher.scala | 3 +- .../spark/deploy/yarn/YarnAllocationHandler.scala | 67 +++++++++++----------- 3 files changed, 41 insertions(+), 38 deletions(-) (limited to 'yarn') diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 2f74965900..fc13dbecb4 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X val localDirs = Option(System.getenv("YARN_LOCAL_DIRS")) .orElse(Option(System.getenv("LOCAL_DIRS"))) - + localDirs match { case None => throw new Exception("Yarn Local dirs can't be empty") case Some(l) => l } - } + } private def getApplicationAttemptId(): ApplicationAttemptId = { val envs = System.getenv() @@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, logInfo("Allocating %d containers to make up for (potentially) lost containers". format(missingExecutorCount)) yarnAllocator.allocateContainers(missingExecutorCount) + } else { + sendProgress() } - else sendProgress() Thread.sleep(sleepTime) } } @@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, return } isFinished = true - + logInfo("finishApplicationMaster with " + status) if (registered) { val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest]) diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala index ea356f33eb..65b7215afb 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala @@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers") yarnAllocator.allocateContainers(missingExecutorCount) + } else { + sendProgress() } - else sendProgress() Thread.sleep(sleepTime) } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 95f0f9d0ff..856391e52b 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -60,12 +60,12 @@ object AllocationType extends Enumeration { */ private[yarn] class YarnAllocationHandler( val conf: Configuration, - val resourceManager: AMRMProtocol, + val resourceManager: AMRMProtocol, val appAttemptId: ApplicationAttemptId, val maxExecutors: Int, val executorMemory: Int, val executorCores: Int, - val preferredHostToCount: Map[String, Int], + val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) extends Logging { @@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler( val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]()) containers += container + } else { + // Add all ignored containers to released list + releasedContainerList.add(container.getId()) } - // Add all ignored containers to released list - else releasedContainerList.add(container.getId()) } // Find the appropriate containers to use. Slightly non trivial groupBy ... @@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler( dataLocalContainers.put(candidateHost, remainingContainers) // all consumed remainingContainers = null - } - else if (requiredHostCount > 0) { + } else if (requiredHostCount > 0) { // Container list has more containers than we need for data locality. // Split into two : data local container count of (remainingContainers.size - // requiredHostCount) and rest as remainingContainer @@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler( // remainingContainers = remaining // yarn has nasty habit of allocating a tonne of containers on a host - discourage this : - // add remaining to release list. If we have insufficient containers, next allocation + // add remaining to release list. If we have insufficient containers, next allocation // cycle will reallocate (but wont treat it as data local) for (container <- remaining) releasedContainerList.add(container.getId()) remainingContainers = null @@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler( if (rack != null){ val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) - val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - + val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - rackLocalContainers.get(rack).getOrElse(List()).size @@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler( dataLocalContainers.put(rack, remainingContainers) // All consumed remainingContainers = null - } - else if (requiredRackCount > 0) { + } else if (requiredRackCount > 0) { // container list has more containers than we need for data locality. // Split into two : data local container count of (remainingContainers.size - // requiredRackCount) and rest as remainingContainer @@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler( } } - // Now that we have split the containers into various groups, go through them in order : + // Now that we have split the containers into various groups, go through them in order : // first host local, then rack local and then off rack (everything else). // Note that the list we create below tries to ensure that not all containers end up within a // host if there are sufficiently large number of hosts/containers. @@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler( releasedContainerList.add(containerId) // reset counter back to old value. numExecutorsRunning.decrementAndGet() - } - else { + } else { // Deallocate + allocate can result in reusing id's wrongly - so use a different counter // (executorIdCounter) val executorId = executorIdCounter.incrementAndGet().toString @@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler( // Was this released by us ? If yes, then simply remove from containerSet and move on. if (pendingReleaseContainers.containsKey(containerId)) { pendingReleaseContainers.remove(containerId) - } - else { + } else { // Simply decrement count - next iteration of ReporterThread will take care of allocating. numExecutorsRunning.decrementAndGet() logInfo("Completed container %s (state: %s, exit status: %s)".format( @@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler( assert (containerSet != null) containerSet -= containerId - if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host) - else allocatedHostToContainersMap.update(host, containerSet) + if (containerSet.isEmpty) { + allocatedHostToContainersMap.remove(host) + } else { + allocatedHostToContainersMap.update(host, containerSet) + } allocatedContainerToHostMap -= containerId @@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler( val rack = YarnAllocationHandler.lookupRack(conf, host) if (rack != null) { val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1 - if (rackCount > 0) allocatedRackCount.put(rack, rackCount) - else allocatedRackCount.remove(rack) + if (rackCount > 0) { + allocatedRackCount.put(rack, rackCount) + } else { + allocatedRackCount.remove(rack) + } } } } @@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler( } } - val requestedContainers: ArrayBuffer[ResourceRequest] = + val requestedContainers: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](rackToCounts.size) for ((rack, count) <- rackToCounts){ - requestedContainers += + requestedContainers += createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY) } @@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler( preferredHostToCount.isEmpty) resourceRequests = List(createResourceRequest( AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY)) - } - else { - // request for all hosts in preferred nodes and for numExecutors - + } else { + // request for all hosts in preferred nodes and for numExecutors - // candidates.size, request by default allocation policy. - val hostContainerRequests: ArrayBuffer[ResourceRequest] = + val hostContainerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](preferredHostToCount.size) for ((candidateHost, candidateCount) <- preferredHostToCount) { val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost) @@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler( if (numExecutors > 0) { logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors, executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD)) - } - else { + } else { logDebug("Empty allocation req .. release : " + releasedContainerList) } @@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler( private def createResourceRequest( - requestType: AllocationType.AllocationType, + requestType: AllocationType.AllocationType, resource:String, numExecutors: Int, priority: Int): ResourceRequest = { @@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler( if (! retval.isEmpty) { releasedContainerList.removeAll(retval) for (v <- retval) pendingReleaseContainers.put(v, true) - logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " + + logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " + pendingReleaseContainers) } @@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler( object YarnAllocationHandler { val ANY_HOST = "*" - // All requests are issued with same priority : we do not (yet) have any distinction between + // All requests are issued with same priority : we do not (yet) have any distinction between // request types (like map/reduce in hadoop for example) val PRIORITY = 1 @@ -548,7 +549,7 @@ object YarnAllocationHandler { // Host to rack map - saved from allocation requests // We are expecting this not to change. - // Note that it is possible for this to change : and RM will indicate that to us via update + // Note that it is possible for this to change : and RM will indicate that to us via update // response to allocate. But we are punting on handling that for now. private val hostToRack = new ConcurrentHashMap[String, String]() private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() @@ -565,7 +566,7 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numExecutors, + args.numExecutors, args.executorMemory, args.executorCores, Map[String, Int](), @@ -587,7 +588,7 @@ object YarnAllocationHandler { conf, resourceManager, appAttemptId, - args.numExecutors, + args.numExecutors, args.executorMemory, args.executorCores, hostToCount, -- cgit v1.2.3