aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSandeep <sandeep@techaddict.me>2014-04-24 15:07:23 -0700
committerReynold Xin <rxin@apache.org>2014-04-24 15:07:23 -0700
commita03ac222d84025a1036750e1179136a13f75dea7 (patch)
treecc7f041b35b7804b7d62520f279cc6e53e40d73c
parentc5c1916dd1b77e22759d58b5b361c56672983e3e (diff)
downloadspark-a03ac222d84025a1036750e1179136a13f75dea7.tar.gz
spark-a03ac222d84025a1036750e1179136a13f75dea7.tar.bz2
spark-a03ac222d84025a1036750e1179136a13f75dea7.zip
Fix Scala Style
Any comments are welcome Author: Sandeep <sandeep@techaddict.me> Closes #531 from techaddict/stylefix-1 and squashes the following commits: 7492730 [Sandeep] Pass 4 98b2428 [Sandeep] fix rxin suggestions b5e2e6f [Sandeep] Pass 3 05932d7 [Sandeep] fix if else styling 2 08690e5 [Sandeep] fix if else styling
-rw-r--r--core/src/main/scala/org/apache/spark/Accumulators.scala7
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/master/Master.scala3
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala16
-rw-r--r--core/src/main/scala/org/apache/spark/storage/BlockManager.scala8
-rw-r--r--core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala12
-rw-r--r--core/src/main/scala/org/apache/spark/util/FileLogger.scala4
-rw-r--r--core/src/main/scala/org/apache/spark/util/Utils.scala3
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/examples/LogQuery.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala7
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala7
-rw-r--r--repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala13
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala5
-rw-r--r--sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala7
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala3
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala9
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala3
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala67
20 files changed, 109 insertions, 83 deletions
diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala
index d5f3e3f6ec..6d652faae1 100644
--- a/core/src/main/scala/org/apache/spark/Accumulators.scala
+++ b/core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -104,8 +104,11 @@ class Accumulable[R, T] (
* Set the accumulator's value; only allowed on master.
*/
def value_= (newValue: R) {
- if (!deserialized) value_ = newValue
- else throw new UnsupportedOperationException("Can't assign accumulator value in task")
+ if (!deserialized) {
+ value_ = newValue
+ } else {
+ throw new UnsupportedOperationException("Can't assign accumulator value in task")
+ }
}
/**
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index cc976565cc..c3e8c6b8c6 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (k.startsWith("spark")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
- }
- else {
+ } else {
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 81f990bfa6..fdb633bd33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -237,8 +237,7 @@ private[spark] class Master(
if (waitingDrivers.contains(d)) {
waitingDrivers -= d
self ! DriverStateChanged(driverId, DriverState.KILLED, None)
- }
- else {
+ } else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index f918b42c83..662d37871e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -91,9 +91,11 @@ private[spark] class DriverRunner(
}
val state =
- if (killed) { DriverState.KILLED }
- else if (finalException.isDefined) { DriverState.ERROR }
- else {
+ if (killed) {
+ DriverState.KILLED
+ } else if (finalException.isDefined) {
+ DriverState.ERROR
+ } else {
finalExitCode match {
case Some(0) => DriverState.FINISHED
case _ => DriverState.FAILED
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
index fec1207948..8381f59672 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/LogPage.scala
@@ -89,8 +89,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
</a>
- }
- else {
+ } else {
<button type="button" class="btn btn-default" disabled="disabled">
Previous 0 B
</button>
@@ -104,8 +103,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
</button>
</a>
- }
- else {
+ } else {
<button type="button" class="btn btn-default" disabled="disabled">
Next 0 B
</button>
@@ -137,9 +135,13 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val logLength = file.length()
val getOffset = offset.getOrElse(logLength - defaultBytes)
val startByte =
- if (getOffset < 0) 0L
- else if (getOffset > logLength) logLength
- else getOffset
+ if (getOffset < 0) {
+ 0L
+ } else if (getOffset > logLength) {
+ logLength
+ } else {
+ getOffset
+ }
val logPageLength = math.min(byteLength, maxBytes)
val endByte = math.min(startByte + logPageLength, logLength)
(startByte, endByte)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index ccd5c5320a..02ba5ecf52 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -281,7 +281,9 @@ private[spark] class BlockManager(
val onDiskSize = status.diskSize
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
- } else true
+ } else {
+ true
+ }
}
/**
@@ -676,7 +678,7 @@ private[spark] class BlockManager(
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
- case ByteBufferValues(bytes) =>
+ case ByteBufferValues(bytes) =>
bytes.rewind()
tachyonStore.putBytes(blockId, bytes, level)
}
@@ -695,7 +697,7 @@ private[spark] class BlockManager(
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
- case ByteBufferValues(bytes) =>
+ case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}
diff --git a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
index b9f4a5d720..1b2b1932e0 100644
--- a/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
+++ b/core/src/main/scala/org/apache/spark/util/BoundedPriorityQueue.scala
@@ -43,8 +43,11 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
}
override def +=(elem: A): this.type = {
- if (size < maxSize) underlying.offer(elem)
- else maybeReplaceLowest(elem)
+ if (size < maxSize) {
+ underlying.offer(elem)
+ } else {
+ maybeReplaceLowest(elem)
+ }
this
}
@@ -59,7 +62,8 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
if (head != null && ord.gt(a, head)) {
underlying.poll()
underlying.offer(a)
- } else false
+ } else {
+ false
+ }
}
}
-
diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
index 7d47b2a72a..1ed3b70bb2 100644
--- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala
+++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -113,7 +113,9 @@ private[spark] class FileLogger(
* @param withTime Whether to prepend message with a timestamp
*/
def log(msg: String, withTime: Boolean = false) {
- val writeInfo = if (!withTime) msg else {
+ val writeInfo = if (!withTime) {
+ msg
+ } else {
val date = new Date(System.currentTimeMillis())
dateFormat.get.format(date) + ": " + msg
}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index a3af4e7b91..d333e2a88c 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -811,8 +811,7 @@ private[spark] object Utils extends Logging {
} else {
el.getMethodName
}
- }
- else {
+ } else {
firstUserLine = el.getLineNumber
firstUserFile = el.getFileName
firstUserClass = el.getClassName
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index d7c90346d8..2676558bfc 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -381,8 +381,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val prng42 = new Random(42)
val prng43 = new Random(43)
Array(1, 2, 3, 4, 5, 6).filter{i =>
- if (i < 4) 0 == prng42.nextInt(3)
- else 0 == prng43.nextInt(3)}
+ if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
+ }
}
assert(sample.size === checkSample.size)
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))
diff --git a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
index 820e87d04f..f77a444ff7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LogQuery.scala
@@ -49,8 +49,7 @@ object LogQuery {
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
val dataSet =
- if (args.length == 2) sc.textFile(args(1))
- else sc.parallelize(exampleApacheLogs)
+ if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs)
// scalastyle:off
val apacheLogRegex =
"""^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
index edc769c597..673013f7cf 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -69,8 +69,11 @@ object PageViewStream {
val normalCount = statuses.filter(_ == 200).size
val errorCount = statuses.size - normalCount
val errorRatio = errorCount.toFloat / statuses.size
- if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
- else {"%s: %s".format(zip, errorRatio)}
+ if (errorRatio > 0.05) {
+ "%s: **%s**".format(zip, errorRatio)
+ } else {
+ "%s: %s".format(zip, errorRatio)
+ }
}
// Return the number unique users in last 15 seconds
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
index a467ca1ae7..ea94d4accb 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
@@ -165,8 +165,11 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
// not have any edges in the specified direction.
assert(edges.count === 50)
edges.collect.foreach {
- case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2)
- else assert(edges.size == 1)
+ case (vid, edges) => if (vid > 0 && vid < 49) {
+ assert(edges.size == 2)
+ } else {
+ assert(edges.size == 1)
+ }
}
edges.collect.foreach {
case (vid, edges) =>
diff --git a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
index dcc139544e..f8432c8af6 100644
--- a/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/SparkExprTyper.scala
@@ -47,9 +47,13 @@ trait SparkExprTyper extends Logging {
var isIncomplete = false
reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
val trees = codeParser.stmts(line)
- if (reporter.hasErrors) Some(Nil)
- else if (isIncomplete) None
- else Some(trees)
+ if (reporter.hasErrors) {
+ Some(Nil)
+ } else if (isIncomplete) {
+ None
+ } else {
+ Some(trees)
+ }
}
}
// def parsesAsExpr(line: String) = {
@@ -70,8 +74,7 @@ trait SparkExprTyper extends Logging {
val sym0 = symbolOfTerm(name)
// drop NullaryMethodType
val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
- if (sym.info.typeSymbol eq UnitClass) NoSymbol
- else sym
+ if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
case _ => NoSymbol
}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
index d5846baa72..f825ca3c02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableOperations.scala
@@ -203,8 +203,9 @@ case class InsertIntoParquetTable(
val stageId = sc.newRddId()
val taskIdOffset =
- if (overwrite) 1
- else {
+ if (overwrite) {
+ 1
+ } else {
FileSystemHelper
.findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 84b1b46094..71ba0fecce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -158,8 +158,11 @@ private[parquet] class CatalystGroupConverter(
a => a.dataType match {
case ctype: NativeType =>
// note: for some reason matching for StringType fails so use this ugly if instead
- if (ctype == StringType) new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
- else new CatalystPrimitiveConverter(this, schema.indexOf(a))
+ if (ctype == StringType) {
+ new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
+ } else {
+ new CatalystPrimitiveConverter(this, schema.indexOf(a))
+ }
case _ => throw new RuntimeException(
s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 3d2537f6f2..557e0961d5 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -240,8 +240,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
- }
- else {
+ } else {
ssc.sc.makeRDD(receivers, receivers.size)
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 2f74965900..fc13dbecb4 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))
-
+
localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
- }
+ }
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
@@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.allocateContainers(missingExecutorCount)
+ } else {
+ sendProgress()
}
- else sendProgress()
Thread.sleep(sleepTime)
}
}
@@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
-
+
logInfo("finishApplicationMaster with " + status)
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index ea356f33eb..65b7215afb 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
logInfo("Allocating " + missingExecutorCount +
" containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
+ } else {
+ sendProgress()
}
- else sendProgress()
Thread.sleep(sleepTime)
}
}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 95f0f9d0ff..856391e52b 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -60,12 +60,12 @@ object AllocationType extends Enumeration {
*/
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
- val resourceManager: AMRMProtocol,
+ val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
val maxExecutors: Int,
val executorMemory: Int,
val executorCores: Int,
- val preferredHostToCount: Map[String, Int],
+ val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
extends Logging {
@@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler(
val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
containers += container
+ } else {
+ // Add all ignored containers to released list
+ releasedContainerList.add(container.getId())
}
- // Add all ignored containers to released list
- else releasedContainerList.add(container.getId())
}
// Find the appropriate containers to use. Slightly non trivial groupBy ...
@@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(candidateHost, remainingContainers)
// all consumed
remainingContainers = null
- }
- else if (requiredHostCount > 0) {
+ } else if (requiredHostCount > 0) {
// Container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredHostCount) and rest as remainingContainer
@@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler(
// remainingContainers = remaining
// yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
- // add remaining to release list. If we have insufficient containers, next allocation
+ // add remaining to release list. If we have insufficient containers, next allocation
// cycle will reallocate (but wont treat it as data local)
for (container <- remaining) releasedContainerList.add(container.getId())
remainingContainers = null
@@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler(
if (rack != null){
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
- val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
+ val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
rackLocalContainers.get(rack).getOrElse(List()).size
@@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(rack, remainingContainers)
// All consumed
remainingContainers = null
- }
- else if (requiredRackCount > 0) {
+ } else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredRackCount) and rest as remainingContainer
@@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler(
}
}
- // Now that we have split the containers into various groups, go through them in order :
+ // Now that we have split the containers into various groups, go through them in order :
// first host local, then rack local and then off rack (everything else).
// Note that the list we create below tries to ensure that not all containers end up within a
// host if there are sufficiently large number of hosts/containers.
@@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler(
releasedContainerList.add(containerId)
// reset counter back to old value.
numExecutorsRunning.decrementAndGet()
- }
- else {
+ } else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
@@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler(
// Was this released by us ? If yes, then simply remove from containerSet and move on.
if (pendingReleaseContainers.containsKey(containerId)) {
pendingReleaseContainers.remove(containerId)
- }
- else {
+ } else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
@@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler(
assert (containerSet != null)
containerSet -= containerId
- if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
- else allocatedHostToContainersMap.update(host, containerSet)
+ if (containerSet.isEmpty) {
+ allocatedHostToContainersMap.remove(host)
+ } else {
+ allocatedHostToContainersMap.update(host, containerSet)
+ }
allocatedContainerToHostMap -= containerId
@@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler(
val rack = YarnAllocationHandler.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
- if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
- else allocatedRackCount.remove(rack)
+ if (rackCount > 0) {
+ allocatedRackCount.put(rack, rackCount)
+ } else {
+ allocatedRackCount.remove(rack)
+ }
}
}
}
@@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler(
}
}
- val requestedContainers: ArrayBuffer[ResourceRequest] =
+ val requestedContainers: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](rackToCounts.size)
for ((rack, count) <- rackToCounts){
- requestedContainers +=
+ requestedContainers +=
createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
}
@@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler(
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
- }
- else {
- // request for all hosts in preferred nodes and for numExecutors -
+ } else {
+ // request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
- val hostContainerRequests: ArrayBuffer[ResourceRequest] =
+ val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler(
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
- }
- else {
+ } else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
- requestType: AllocationType.AllocationType,
+ requestType: AllocationType.AllocationType,
resource:String,
numExecutors: Int,
priority: Int): ResourceRequest = {
@@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler(
if (! retval.isEmpty) {
releasedContainerList.removeAll(retval)
for (v <- retval) pendingReleaseContainers.put(v, true)
- logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
+ logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
pendingReleaseContainers)
}
@@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler(
object YarnAllocationHandler {
val ANY_HOST = "*"
- // All requests are issued with same priority : we do not (yet) have any distinction between
+ // All requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val PRIORITY = 1
@@ -548,7 +549,7 @@ object YarnAllocationHandler {
// Host to rack map - saved from allocation requests
// We are expecting this not to change.
- // Note that it is possible for this to change : and RM will indicate that to us via update
+ // Note that it is possible for this to change : and RM will indicate that to us via update
// response to allocate. But we are punting on handling that for now.
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
@@ -565,7 +566,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numExecutors,
+ args.numExecutors,
args.executorMemory,
args.executorCores,
Map[String, Int](),
@@ -587,7 +588,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
- args.numExecutors,
+ args.numExecutors,
args.executorMemory,
args.executorCores,
hostToCount,