From a03ac222d84025a1036750e1179136a13f75dea7 Mon Sep 17 00:00:00 2001
From: Sandeep
Date: Thu, 24 Apr 2014 15:07:23 -0700
Subject: Fix Scala Style

Any comments are welcome

Author: Sandeep

Closes #531 from techaddict/stylefix-1 and squashes the following commits:

7492730 [Sandeep] Pass 4
98b2428 [Sandeep] fix rxin suggestions
b5e2e6f [Sandeep] Pass 3
05932d7 [Sandeep] fix if else styling 2
08690e5 [Sandeep] fix if else styling
---
 .../main/scala/org/apache/spark/Accumulators.scala |  7 ++-
 .../apache/spark/deploy/SparkSubmitArguments.scala |  3 +-
 .../org/apache/spark/deploy/master/Master.scala    |  3 +-
 .../apache/spark/deploy/worker/DriverRunner.scala  |  8 ++-
 .../apache/spark/deploy/worker/ui/LogPage.scala    | 16 +++---
 .../org/apache/spark/storage/BlockManager.scala    |  8 ++-
 .../apache/spark/util/BoundedPriorityQueue.scala   | 12 ++--
 .../scala/org/apache/spark/util/FileLogger.scala   |  4 +-
 .../main/scala/org/apache/spark/util/Utils.scala   |  3 +-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |  4 +-
 .../scala/org/apache/spark/examples/LogQuery.scala |  3 +-
 .../examples/clickstream/PageViewStream.scala      |  7 ++-
 .../org/apache/spark/graphx/GraphOpsSuite.scala    |  7 ++-
 .../org/apache/spark/repl/SparkExprTyper.scala     | 13 +++--
 .../spark/sql/parquet/ParquetTableOperations.scala |  5 +-
 .../spark/sql/parquet/ParquetTableSupport.scala    |  7 ++-
 .../streaming/scheduler/ReceiverTracker.scala      |  3 +-
 .../spark/deploy/yarn/ApplicationMaster.scala      |  9 +--
 .../spark/deploy/yarn/ExecutorLauncher.scala       |  3 +-
 .../spark/deploy/yarn/YarnAllocationHandler.scala  | 67 +++++++++++-----------
 20 files changed, 109 insertions(+), 83 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d5f3e3f6ec..6d652faae1 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -104,8 +104,11 @@ class Accumulable[R, T] (
    * Set the accumulator's value; only allowed on master.
    */
   def value_= (newValue: R) {
-    if (!deserialized) value_ = newValue
-    else throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    if (!deserialized) {
+      value_ = newValue
+    } else {
+      throw new UnsupportedOperationException("Can't assign accumulator value in task")
+    }
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index cc976565cc..c3e8c6b8c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
       if (k.startsWith("spark")) {
         defaultProperties(k) = v
         if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-      }
-      else {
+      } else {
         SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 81f990bfa6..fdb633bd33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -237,8 +237,7 @@ private[spark] class Master(
         if (waitingDrivers.contains(d)) {
           waitingDrivers -= d
           self ! DriverStateChanged(driverId, DriverState.KILLED, None)
-        }
-        else {
+        } else {
           // We just notify the worker to kill the driver here. The final bookkeeping occurs
           // on the return path when the worker submits a state change back to the master
           // to notify it that the driver was successfully killed.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index f918b42c83..662d37871e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -91,9 +91,11 @@ private[spark] class DriverRunner(
       }
 
       val state =
-        if (killed) { DriverState.KILLED }
-        else if (finalException.isDefined) { DriverState.ERROR }
-        else {
+        if (killed) {
+          DriverState.KILLED
+        } else if (finalException.isDefined) {
+          DriverState.ERROR
+        } else {
           finalExitCode match {
             case Some(0) => DriverState.FINISHED
             case _ => DriverState.FAILED
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index fec1207948..8381f59672 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -89,8 +89,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
             Previous {Utils.bytesToString(math.min(byteLength, startByte))}
           </button>
         </a>
-      }
-      else {
+      } else {
         <button type="button" class="btn btn-default" disabled="disabled">
           Previous
         </button>
@@ -104,8 +103,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
             Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
           </button>
         </a>
-      }
-      else {
+      } else {
         <button type="button" class="btn btn-default" disabled="disabled">
           Next
         </button>
@@ -137,9 +135,13 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
       val logLength = file.length()
       val getOffset = offset.getOrElse(logLength - defaultBytes)
       val startByte =
-        if (getOffset < 0) 0L
-        else if (getOffset > logLength) logLength
-        else getOffset
+        if (getOffset < 0) {
+          0L
+        } else if (getOffset > logLength) {
+          logLength
+        } else {
+          getOffset
+        }
       val logPageLength = math.min(byteLength, maxBytes)
       val endByte = math.min(startByte + logPageLength, logLength)
       (startByte, endByte)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccd5c5320a..02ba5ecf52 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -281,7 +281,9 @@ private[spark] class BlockManager(
       val onDiskSize = status.diskSize
       master.updateBlockInfo(
         blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
-    } else true
+    } else {
+      true
+    }
   }
 
   /**
@@ -676,7 +678,7 @@ private[spark] class BlockManager(
           tachyonStore.putValues(blockId, iterator, level, false)
         case ArrayBufferValues(array) =>
           tachyonStore.putValues(blockId, array, level, false)
-        case ByteBufferValues(bytes) => 
+        case ByteBufferValues(bytes) =>
           bytes.rewind()
           tachyonStore.putBytes(blockId, bytes, level)
       }
@@ -695,7 +697,7 @@ private[spark] class BlockManager(
           diskStore.putValues(blockId, iterator, level, askForBytes)
         case ArrayBufferValues(array) =>
           diskStore.putValues(blockId, array, level, askForBytes)
-        case ByteBufferValues(bytes) => 
+        case ByteBufferValues(bytes) =>
           bytes.rewind()
           diskStore.putBytes(blockId, bytes, level)
       }
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index b9f4a5d720..1b2b1932e0 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -43,8 +43,11 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
   }
 
   override def +=(elem: A): this.type = {
-    if (size < maxSize) underlying.offer(elem)
-    else maybeReplaceLowest(elem)
+    if (size < maxSize) {
+      underlying.offer(elem)
+    } else {
+      maybeReplaceLowest(elem)
+    }
     this
   }
 
@@ -59,7 +62,8 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
     if (head != null && ord.gt(a, head)) {
       underlying.poll()
       underlying.offer(a)
-    } else false
+    } else {
+      false
+    }
   }
 }
-
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d47b2a72a..1ed3b70bb2 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -113,7 +113,9 @@ private[spark] class FileLogger(
    * @param withTime Whether to prepend message with a timestamp
    */
   def log(msg: String, withTime: Boolean = false) {
-    val writeInfo = if (!withTime) msg else {
+    val writeInfo = if (!withTime) {
+      msg
+    } else {
       val date = new Date(System.currentTimeMillis())
       dateFormat.get.format(date) + ": " + msg
     }
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3af4e7b91..d333e2a88c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -811,8 +811,7 @@ private[spark] object Utils extends Logging {
         } else {
           el.getMethodName
         }
-      }
-      else {
+      } else {
         firstUserLine = el.getLineNumber
         firstUserFile = el.getFileName
         firstUserClass = el.getClassName
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d7c90346d8..2676558bfc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -381,8 +381,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
       val prng42 = new Random(42)
       val prng43 = new Random(43)
       Array(1, 2, 3, 4, 5, 6).filter{i =>
-        if (i < 4) 0 == prng42.nextInt(3)
-        else 0 == prng43.nextInt(3)}
+        if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
+      }
     }
     assert(sample.size === checkSample.size)
     for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 820e87d04f..f77a444ff7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -49,8 +49,7 @@ object LogQuery {
       System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
 
     val dataSet =
-      if (args.length == 2) sc.textFile(args(1))
-      else sc.parallelize(exampleApacheLogs)
+      if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs)
     // scalastyle:off
     val apacheLogRegex =
       """^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index edc769c597..673013f7cf 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -69,8 +69,11 @@ object PageViewStream {
       val normalCount = statuses.filter(_ == 200).size
       val errorCount = statuses.size - normalCount
       val errorRatio = errorCount.toFloat / statuses.size
-      if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
-      else {"%s: %s".format(zip, errorRatio)}
+      if (errorRatio > 0.05) {
+        "%s: **%s**".format(zip, errorRatio)
+      } else {
+        "%s: %s".format(zip, errorRatio)
+      }
     }
 
     // Return the number unique users in last 15 seconds
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index a467ca1ae7..ea94d4accb 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -165,8 +165,11 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
       // not have any edges in the specified direction.
       assert(edges.count === 50)
       edges.collect.foreach {
-        case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2)
-          else assert(edges.size == 1)
+        case (vid, edges) => if (vid > 0 && vid < 49) {
+            assert(edges.size == 2)
+          } else {
+            assert(edges.size == 1)
+          }
       }
 
       edges.collect.foreach { case (vid, edges) =>
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index dcc139544e..f8432c8af6 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -47,9 +47,13 @@ trait SparkExprTyper extends Logging {
       var isIncomplete = false
       reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
         val trees = codeParser.stmts(line)
-        if (reporter.hasErrors) Some(Nil)
-        else if (isIncomplete) None
-        else Some(trees)
+        if (reporter.hasErrors) {
+          Some(Nil)
+        } else if (isIncomplete) {
+          None
+        } else {
+          Some(trees)
+        }
       }
     }
     // def parsesAsExpr(line: String) = {
@@ -70,8 +74,7 @@ trait SparkExprTyper extends Logging {
           val sym0 = symbolOfTerm(name)
           // drop NullaryMethodType
           val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
-          if (sym.info.typeSymbol eq UnitClass) NoSymbol
-          else sym
+          if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
         case _ => NoSymbol
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index d5846baa72..f825ca3c02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -203,8 +203,9 @@ case class InsertIntoParquetTable(
     val stageId = sc.newRddId()
 
     val taskIdOffset =
-      if (overwrite) 1
-      else {
+      if (overwrite) {
+        1
+      } else {
         FileSystemHelper
           .findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 84b1b46094..71ba0fecce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -158,8 +158,11 @@ private[parquet] class CatalystGroupConverter(
     a => a.dataType match {
       case ctype: NativeType =>
         // note: for some reason matching for StringType fails so use this ugly if instead
-        if (ctype == StringType) new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
-        else new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        if (ctype == StringType) {
+          new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
+        } else {
+          new CatalystPrimitiveConverter(this, schema.indexOf(a))
+        }
       case _ => throw new RuntimeException(
         s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d2537f6f2..557e0961d5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,8 +240,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
       if (hasLocationPreferences) {
         val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
         ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
-      }
-      else {
+      } else {
         ssc.sc.makeRDD(receivers, receivers.size)
       }
 
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f74965900..fc13dbecb4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
     // LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
     val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
       .orElse(Option(System.getenv("LOCAL_DIRS")))
- 
+
     localDirs match {
       case None => throw new Exception("Yarn Local dirs can't be empty")
       case Some(l) => l
     }
-  } 
+  }
 
   private def getApplicationAttemptId(): ApplicationAttemptId = {
     val envs = System.getenv()
@@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
           logInfo("Allocating %d containers to make up for (potentially) lost containers".
             format(missingExecutorCount))
           yarnAllocator.allocateContainers(missingExecutorCount)
+        } else {
+          sendProgress()
         }
-        else sendProgress()
         Thread.sleep(sleepTime)
       }
     }
@@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
       return
     }
     isFinished = true
- 
+
     logInfo("finishApplicationMaster with " + status)
     if (registered) {
       val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ea356f33eb..65b7215afb 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
           logInfo("Allocating " + missingExecutorCount +
lost containers") yarnAllocator.allocateContainers(missingExecutorCount) + } else { + sendProgress() } - else sendProgress() Thread.sleep(sleepTime) } } diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 95f0f9d0ff..856391e52b 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -60,12 +60,12 @@ object AllocationType extends Enumeration { */ private[yarn] class YarnAllocationHandler( val conf: Configuration, - val resourceManager: AMRMProtocol, + val resourceManager: AMRMProtocol, val appAttemptId: ApplicationAttemptId, val maxExecutors: Int, val executorMemory: Int, val executorCores: Int, - val preferredHostToCount: Map[String, Int], + val preferredHostToCount: Map[String, Int], val preferredRackToCount: Map[String, Int], val sparkConf: SparkConf) extends Logging { @@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler( val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]()) containers += container + } else { + // Add all ignored containers to released list + releasedContainerList.add(container.getId()) } - // Add all ignored containers to released list - else releasedContainerList.add(container.getId()) } // Find the appropriate containers to use. Slightly non trivial groupBy ... @@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler( dataLocalContainers.put(candidateHost, remainingContainers) // all consumed remainingContainers = null - } - else if (requiredHostCount > 0) { + } else if (requiredHostCount > 0) { // Container list has more containers than we need for data locality. // Split into two : data local container count of (remainingContainers.size - // requiredHostCount) and rest as remainingContainer @@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler( // remainingContainers = remaining // yarn has nasty habit of allocating a tonne of containers on a host - discourage this : - // add remaining to release list. If we have insufficient containers, next allocation + // add remaining to release list. If we have insufficient containers, next allocation // cycle will reallocate (but wont treat it as data local) for (container <- remaining) releasedContainerList.add(container.getId()) remainingContainers = null @@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler( if (rack != null){ val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0) - val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - + val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) - rackLocalContainers.get(rack).getOrElse(List()).size @@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler( dataLocalContainers.put(rack, remainingContainers) // All consumed remainingContainers = null - } - else if (requiredRackCount > 0) { + } else if (requiredRackCount > 0) { // container list has more containers than we need for data locality. 
           // Split into two : data local container count of (remainingContainers.size -
           // requiredRackCount) and rest as remainingContainer
@@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler(
       }
     }
 
-    // Now that we have split the containers into various groups, go through them in order : 
+    // Now that we have split the containers into various groups, go through them in order :
     // first host local, then rack local and then off rack (everything else).
     // Note that the list we create below tries to ensure that not all containers end up within a
     // host if there are sufficiently large number of hosts/containers.
@@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler(
         releasedContainerList.add(containerId)
         // reset counter back to old value.
         numExecutorsRunning.decrementAndGet()
-      }
-      else {
+      } else {
         // Deallocate + allocate can result in reusing id's wrongly - so use a different counter
         // (executorIdCounter)
         val executorId = executorIdCounter.incrementAndGet().toString
@@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler(
       // Was this released by us ? If yes, then simply remove from containerSet and move on.
       if (pendingReleaseContainers.containsKey(containerId)) {
         pendingReleaseContainers.remove(containerId)
-      }
-      else {
+      } else {
         // Simply decrement count - next iteration of ReporterThread will take care of allocating.
         numExecutorsRunning.decrementAndGet()
         logInfo("Completed container %s (state: %s, exit status: %s)".format(
@@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler(
 
           assert (containerSet != null)
           containerSet -= containerId
-          if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
-          else allocatedHostToContainersMap.update(host, containerSet)
+          if (containerSet.isEmpty) {
+            allocatedHostToContainersMap.remove(host)
+          } else {
+            allocatedHostToContainersMap.update(host, containerSet)
+          }
 
           allocatedContainerToHostMap -= containerId
 
@@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler(
           val rack = YarnAllocationHandler.lookupRack(conf, host)
           if (rack != null) {
             val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
-            if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
-            else allocatedRackCount.remove(rack)
+            if (rackCount > 0) {
+              allocatedRackCount.put(rack, rackCount)
+            } else {
+              allocatedRackCount.remove(rack)
+            }
           }
         }
       }
@@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler(
         }
       }
 
-      val requestedContainers: ArrayBuffer[ResourceRequest] = 
+      val requestedContainers: ArrayBuffer[ResourceRequest] =
         new ArrayBuffer[ResourceRequest](rackToCounts.size)
       for ((rack, count) <- rackToCounts){
-        requestedContainers += 
+        requestedContainers +=
           createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
       }
 
@@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler(
         preferredHostToCount.isEmpty)
       resourceRequests = List(createResourceRequest(
         AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
-    }
-    else {
+    } else {
       // request for all hosts in preferred nodes and for numExecutors -
       // candidates.size, request by default allocation policy.
-      val hostContainerRequests: ArrayBuffer[ResourceRequest] = 
+      val hostContainerRequests: ArrayBuffer[ResourceRequest] =
         new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
       for ((candidateHost, candidateCount) <- preferredHostToCount) {
         val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler(
     if (numExecutors > 0) {
       logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
         executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
-    }
-    else {
+    } else {
       logDebug("Empty allocation req .. release : " + releasedContainerList)
     }
 
@@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler(
   }
 
   private def createResourceRequest(
-    requestType: AllocationType.AllocationType, 
+    requestType: AllocationType.AllocationType,
     resource:String,
     numExecutors: Int,
     priority: Int): ResourceRequest = {
@@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler(
     if (! retval.isEmpty) {
       releasedContainerList.removeAll(retval)
       for (v <- retval) pendingReleaseContainers.put(v, true)
-      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " + 
+      logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
         pendingReleaseContainers)
     }
 
@@ -539,7 +540,7 @@ object YarnAllocationHandler {
 
   val ANY_HOST = "*"
 
-  // All requests are issued with same priority : we do not (yet) have any distinction between 
+  // All requests are issued with same priority : we do not (yet) have any distinction between
   // request types (like map/reduce in hadoop for example)
   val PRIORITY = 1
 
@@ -548,7 +549,7 @@ object YarnAllocationHandler {
 
   // Host to rack map - saved from allocation requests
   // We are expecting this not to change.
-  // Note that it is possible for this to change : and RM will indicate that to us via update 
+  // Note that it is possible for this to change : and RM will indicate that to us via update
   // response to allocate. But we are punting on handling that for now.
   private val hostToRack = new ConcurrentHashMap[String, String]()
   private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
@@ -565,7 +566,7 @@ object YarnAllocationHandler {
       conf,
       resourceManager,
       appAttemptId,
-      args.numExecutors, 
+      args.numExecutors,
       args.executorMemory,
       args.executorCores,
       Map[String, Int](),
@@ -587,7 +588,7 @@ object YarnAllocationHandler {
       conf,
      resourceManager,
       appAttemptId,
-      args.numExecutors, 
+      args.numExecutors,
       args.executorMemory,
       args.executorCores,
       hostToCount,
-- 
cgit v1.2.3
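
The convention the patch applies throughout, shown as a standalone sketch (hypothetical names, not code from the patch): when an `if`/`else` spans multiple lines, every branch gets braces and `} else {` is joined on one line; an `if`/`else` short enough for a single line may stay unbraced.

```scala
object IfElseStyle {
  // Discouraged (the form the patch removes): a bare `else` on its own line,
  // or multi-line branches without braces:
  //   if (errorRatio > 0.05) "%s: **%s**".format(zip, errorRatio)
  //   else "%s: %s".format(zip, errorRatio)

  // Preferred: brace both branches, with "} else {" on a single line.
  def label(zip: String, errorRatio: Float): String = {
    if (errorRatio > 0.05) {
      "%s: **%s**".format(zip, errorRatio)
    } else {
      "%s: %s".format(zip, errorRatio)
    }
  }

  // Also acceptable: an if/else that fits on one line needs no braces.
  def pick(i: Int): Int = if (i < 4) 0 else 1
}
```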