author     Harvey Feng <harvey@databricks.com>      2013-11-21 03:55:03 -0800
committer  Harvey Feng <harvey@databricks.com>      2013-11-23 17:08:30 -0800
commit     a67ebf43776d9b66c077c48f2c9d1976791ca4e8 (patch)
tree       ea7165044887a3be4e015c1c931cd97bb6be0a8d /yarn
parent     9eae80f11157c81169e2b396017a6b85967e6ad5 (diff)
A few more style fixes in `yarn` package.
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala     |  4
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                | 17
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 95
3 files changed, 71 insertions, 45 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 9c43a7287d..240ed8b32a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -176,8 +176,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
driverUp = true
} catch {
case e: Exception => {
- logWarning("Failed to connect to driver at %s:%s, retrying ...").
- format(driverHost, driverPort)
+ logWarning("Failed to connect to driver at %s:%s, retrying ...".
+ format(driverHost, driverPort))
Thread.sleep(100)
tries = tries + 1
}
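
The hunk above fixes a misplaced closing parenthesis: format() was previously applied outside the logWarning call, so the host and port were never substituted into the message that logWarning received. A minimal, self-contained sketch of the corrected pattern, using hypothetical values and println in place of Spark's logWarning:

    val driverHost = "localhost"  // hypothetical values for illustration
    val driverPort = 7077
    // The message string is formatted first, then handed to the logging call.
    println("Failed to connect to driver at %s:%s, retrying ...".format(driverHost, driverPort))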
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 68527fbdc7..b3a6d2b8eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -88,15 +88,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
def validateArgs() = {
- Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ Map(
+ (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
(args.userJar == null) -> "Error: You must specify a user jar!",
(args.userClass == null) -> "Error: You must specify a user class!",
(args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
- (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
- ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
- (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
- ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
- .foreach { case(cond, errStr) =>
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: AM memory size must be " +
+ "greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> ("Error: Worker memory size " +
+ "must be greater than: " + YarnAllocationHandler.MEMORY_OVERHEAD)
+ ).foreach { case(cond, errStr) =>
if (cond) {
logError(errStr)
args.printUsageAndExit(1)
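
The validateArgs hunk above only reflows a Map of (condition -> error message) pairs that is traversed with foreach. A minimal sketch of the same pattern, assuming a hypothetical userJar value and println in place of logError/printUsageAndExit; note that because the keys are Booleans, a Scala Map keeps at most one entry per key, so at most one failing message per truth value survives:

    val userJar: String = null  // hypothetical argument value
    val checks = Map(
      (System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
      (userJar == null) -> "Error: You must specify a user jar!"
    )
    checks.foreach { case (cond, errStr) =>
      if (cond) {
        println(errStr)  // the real code calls logError(errStr) and args.printUsageAndExit(1)
      }
    }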
@@ -120,7 +121,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
queueInfo.getCurrentCapacity,
queueInfo.getMaximumCapacity,
queueInfo.getApplications.size,
- queueInfo.getChildQueues.size)
+ queueInfo.getChildQueues.size))
}
def verifyClusterResources(app: GetNewApplicationResponse) = {
@@ -242,7 +243,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var localURI = new URI(localPath)
// if not specified assume these are in the local filesystem to keep behavior like Hadoop
if (localURI.getScheme() == null) {
- localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString())
+ localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
}
val setPermissions = if (destName.equals(Client.APP_JAR)) true else false
val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions)
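
The change above only drops redundant parentheses on toString, but the surrounding step is worth a sketch: a path with no URI scheme is assumed to live on the local filesystem and is qualified through Hadoop's local FileSystem. A hedged, self-contained sketch of that qualification, with a hypothetical path and the standard Hadoop APIs:

    import java.net.URI
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    val conf = new Configuration()
    val localPath = "/tmp/app.jar"  // hypothetical, scheme-less path
    var localURI = new URI(localPath)
    if (localURI.getScheme() == null) {
      // Qualify against the local filesystem, yielding e.g. file:/tmp/app.jar
      localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString)
    }
    println(localURI)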
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2a08255bf3..f15f3c7c11 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -52,8 +52,8 @@ object AllocationType extends Enumeration ("HOST", "RACK", "ANY") {
// make it more proactive and decoupled.
// Note that right now, we assume all node asks as uniform in terms of capabilities and priority
-// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for more info
-// on how we are requesting for containers.
+// Refer to http://developer.yahoo.com/blogs/hadoop/posts/2011/03/mapreduce-nextgen-scheduler/ for
+// more info on how we are requesting for containers.
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
@@ -105,13 +105,20 @@ private[yarn] class YarnAllocationHandler(
val amResp = allocateWorkerResources(workersToRequest).getAMResponse
val _allocatedContainers = amResp.getAllocatedContainers()
- if (_allocatedContainers.size > 0) {
-
- logDebug("Allocated " + _allocatedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
- logDebug("Cluster Resources: " + amResp.getAvailableResources)
+ if (_allocatedContainers.size > 0) {
+ logDebug("""
+ Allocated containers: %d
+ Current worker count: %d
+ Containers released: %s
+ Containers to be released: %s
+ Cluster resources: %s
+ """.format(
+ _allocatedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers,
+ amResp.getAvailableResources))
val hostToContainers = new HashMap[String, ArrayBuffer[Container]]()
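
The rewritten logDebug above builds its message from a triple-quoted string whose %-placeholders are filled by format. A minimal sketch of the same idiom, with stripMargin shown as one common way to keep the block's indentation out of the output (hypothetical counts):

    val allocatedCount = 3   // hypothetical values
    val runningCount = 12
    val msg = """
      |Allocated containers: %d
      |Current worker count: %d
      |""".stripMargin.format(allocatedCount, runningCount)
    println(msg)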
@@ -150,9 +157,10 @@ private[yarn] class YarnAllocationHandler(
}
else if (requiredHostCount > 0) {
// Container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredHostCount)
- // and rest as remainingContainer
- val (dataLocal, remaining) = remainingContainers.splitAt(remainingContainers.size - requiredHostCount)
+ // Split into two : data local container count of (remainingContainers.size -
+ // requiredHostCount) and rest as remainingContainer
+ val (dataLocal, remaining) = remainingContainers.splitAt(
+ remainingContainers.size - requiredHostCount)
dataLocalContainers.put(candidateHost, dataLocal)
// remainingContainers = remaining
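
The comment reflowed above describes splitting a host's container list so that only the needed number remain for data locality; splitAt does exactly that. A minimal, self-contained sketch with hypothetical container names:

    import scala.collection.mutable.ArrayBuffer

    val remainingContainers = ArrayBuffer("c1", "c2", "c3", "c4", "c5")  // hypothetical containers
    val requiredHostCount = 2
    // Keep (size - requiredHostCount) containers as data-local; the rest are handled later.
    val (dataLocal, remaining) = remainingContainers.splitAt(
      remainingContainers.size - requiredHostCount)
    println(dataLocal)   // ArrayBuffer(c1, c2, c3)
    println(remaining)   // ArrayBuffer(c4, c5)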
@@ -181,8 +189,8 @@ private[yarn] class YarnAllocationHandler(
}
else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
- // Split into two : data local container count of (remainingContainers.size - requiredRackCount)
- // and rest as remainingContainer
+ // Split into two : data local container count of (remainingContainers.size -
+ // requiredRackCount) and rest as remainingContainer
val (rackLocal, remaining) = remainingContainers.splitAt(
remainingContainers.size - requiredRackCount)
val existingRackLocal = rackLocalContainers.getOrElseUpdate(rack,
@@ -216,11 +224,12 @@ private[yarn] class YarnAllocationHandler(
val workerHostname = container.getNodeId.getHost
val containerId = container.getId
- assert(container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ assert(
+ container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
if (numWorkersRunningNow > maxWorkers) {
- logInfo("Ignoring container " + containerId + " at host " + workerHostname +
- " .. we already have required number of containers")
+ logInfo("""Ignoring container %s at host %s, since we already have the required number of
+ containers for it.""".format(containerId, workerHostname))
releasedContainerList.add(containerId)
// reset counter back to old value.
numWorkersRunning.decrementAndGet()
@@ -245,7 +254,9 @@ private[yarn] class YarnAllocationHandler(
containerSet += containerId
allocatedContainerToHostMap.put(containerId, workerHostname)
- if (rack != null) allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ if (rack != null) {
+ allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
+ }
}
new Thread(
@@ -254,17 +265,23 @@ private[yarn] class YarnAllocationHandler(
).start()
}
}
- logDebug("After allocated " + allocatedContainers.size + " containers (orig : " +
- _allocatedContainers.size + "), current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
+ logDebug("""
+ Finished processing %d containers.
+ Current number of workers running: %d,
+ releasedContainerList: %s,
+ pendingReleaseContainers: %s
+ """.format(
+ allocatedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers))
}
val completedContainers = amResp.getCompletedContainersStatuses()
if (completedContainers.size > 0){
- logDebug("Completed " + completedContainers.size + " containers, current count " + numWorkersRunning.get() +
- ", to-be-released " + releasedContainerList + ", pendingReleaseContainers : " + pendingReleaseContainers)
-
+ logDebug("Completed %d containers, to-be-released: %s".format(
+ completedContainers.size, releasedContainerList))
for (completedContainer <- completedContainers){
val containerId = completedContainer.getContainerId
@@ -275,9 +292,10 @@ private[yarn] class YarnAllocationHandler(
else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numWorkersRunning.decrementAndGet()
- logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
-
+ logInfo("Completed container %s (state: %s, exit status: %s)".format(
+ containerId,
+ completedContainer.getState,
+ completedContainer.getExitStatus()))
// Hadoop 2.2.X added a ContainerExitStatus we should switch to use
// there are some exit status' we shouldn't necessarily count against us, but for
// now I think its ok as none of the containers are expected to exit
@@ -311,9 +329,16 @@ private[yarn] class YarnAllocationHandler(
}
}
}
- logDebug("After completed " + completedContainers.size + " containers, current count " +
- numWorkersRunning.get() + ", to-be-released " + releasedContainerList +
- ", pendingReleaseContainers : " + pendingReleaseContainers)
+ logDebug("""
+ Finished processing %d completed containers.
+ Current number of workers running: %d,
+ releasedContainerList: %s,
+ pendingReleaseContainers: %s
+ """.format(
+ completedContainers.size,
+ numWorkersRunning.get(),
+ releasedContainerList,
+ pendingReleaseContainers))
}
}
@@ -367,7 +392,7 @@ private[yarn] class YarnAllocationHandler(
// default.
if (numWorkers <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numWorkers: " + numWorkers + ", host preferences ? " + preferredHostToCount.isEmpty)
+ logDebug("numWorkers: " + numWorkers + ", host preferences: " + preferredHostToCount.isEmpty)
resourceRequests = List(
createResourceRequest(AllocationType.ANY, null, numWorkers, YarnAllocationHandler.PRIORITY))
}
@@ -397,7 +422,7 @@ private[yarn] class YarnAllocationHandler(
YarnAllocationHandler.PRIORITY)
val containerRequests: ArrayBuffer[ResourceRequest] = new ArrayBuffer[ResourceRequest](
- hostContainerRequests.size() + rackContainerRequests.size() + 1)
+ hostContainerRequests.size + rackContainerRequests.size + 1)
containerRequests ++= hostContainerRequests
containerRequests ++= rackContainerRequests
@@ -416,20 +441,20 @@ private[yarn] class YarnAllocationHandler(
req.addAllReleases(releasedContainerList)
if (numWorkers > 0) {
- logInfo("Allocating %d worker containers with %d of memory each.").format(numWorkers,
- workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
+ logInfo("Allocating %d worker containers with %d of memory each.".format(numWorkers,
+ workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
for (request <- resourceRequests) {
- logInfo("ResourceRequest (host : %s, num containers: %d, priority = %d , capability : %s)").
+ logInfo("ResourceRequest (host : %s, num containers: %d, priority = %s , capability : %s)".
format(
request.getHostName,
request.getNumContainers,
request.getPriority,
- request.getCapability)
+ request.getCapability))
}
resourceManager.allocate(req)
}