aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorSean Owen <sowen@cloudera.com>2014-04-16 09:34:59 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-16 09:34:59 -0700
commit77f836799639ea939a1773cef2f4828b381f5ca2 (patch)
tree9b6bae7218d4a12f99269da88cf05487c5ab5395 /yarn
parentc3527a333a0877f4b49614f3fd1f041b01749651 (diff)
downloadspark-77f836799639ea939a1773cef2f4828b381f5ca2.tar.gz
spark-77f836799639ea939a1773cef2f4828b381f5ca2.tar.bz2
spark-77f836799639ea939a1773cef2f4828b381f5ca2.zip
SPARK-1497. Fix scalastyle warnings in YARN, Hive code
(I wasn't sure how to automatically set `SPARK_YARN=true` and `SPARK_HIVE=true` when running scalastyle, but these are the errors that turn up.) Author: Sean Owen <sowen@cloudera.com> Closes #413 from srowen/SPARK-1497 and squashes the following commits: f0c9318 [Sean Owen] Fix more scalastyle warnings in yarn 80bf4c3 [Sean Owen] Add YARN alpha / YARN profile to scalastyle check 026319c [Sean Owen] Fix scalastyle warnings in YARN, Hive code
Diffstat (limited to 'yarn')
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala21
-rw-r--r--yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala11
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala3
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala8
-rw-r--r--yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala7
5 files changed, 30 insertions, 20 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index 7b0e020263..21f14576ef 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -37,7 +37,8 @@ import org.apache.spark.scheduler.SplitInfo
class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sparkConf: SparkConf)
extends Logging {
- def this(args: ApplicationMasterArguments, sparkConf: SparkConf) = this(args, new Configuration(), sparkConf)
+ def this(args: ApplicationMasterArguments, sparkConf: SparkConf) =
+ this(args, new Configuration(), sparkConf)
def this(args: ApplicationMasterArguments) = this(args, new SparkConf())
@@ -63,7 +64,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+ // Send a hello message thus the connection is actually established, thus we can
+ // monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -104,8 +106,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+ // Launch a progress reporter thread, else app will get killed after expiration
+ // (def: 10mins) timeout ensure that progress is sent before
+ // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
@@ -163,8 +166,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
val appMasterRequest = Records.newRecord(classOf[RegisterApplicationMasterRequest])
.asInstanceOf[RegisterApplicationMasterRequest]
appMasterRequest.setApplicationAttemptId(appAttemptId)
- // Setting this to master host,port - so that the ApplicationReport at client has some sensible info.
- // Users can then monitor stderr/stdout on that node if required.
+ // Setting this to master host,port - so that the ApplicationReport at client has
+ // some sensible info. Users can then monitor stderr/stdout on that node if required.
appMasterRequest.setHost(Utils.localHostName())
appMasterRequest.setRpcPort(0)
// What do we provide here ? Might make sense to expose something sensible later ?
@@ -213,7 +216,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// TODO: This is a bit ugly. Can we make it nicer?
// TODO: Handle container failure
while ((yarnAllocator.getNumExecutorsRunning < args.numExecutors) && (!driverClosed)) {
- yarnAllocator.allocateContainers(math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
+ yarnAllocator.allocateContainers(
+ math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0))
Thread.sleep(100)
}
@@ -230,7 +234,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
while (!driverClosed) {
val missingExecutorCount = args.numExecutors - yarnAllocator.getNumExecutorsRunning
if (missingExecutorCount > 0) {
- logInfo("Allocating " + missingExecutorCount + " containers to make up for (potentially ?) lost containers")
+ logInfo("Allocating " + missingExecutorCount +
+ " containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
}
else sendProgress()
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2056667af5..d6d46a5f6c 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -225,8 +225,8 @@ private[yarn] class YarnAllocationHandler(
val executorHostname = container.getNodeId.getHost
val containerId = container.getId
- assert(
- container.getResource.getMemory >= (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
+ assert( container.getResource.getMemory >=
+ (executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
if (numExecutorsRunningNow > maxExecutors) {
logInfo("""Ignoring container %s at host %s, since we already have the required number of
@@ -393,9 +393,10 @@ private[yarn] class YarnAllocationHandler(
// default.
if (numExecutors <= 0 || preferredHostToCount.isEmpty) {
- logDebug("numExecutors: " + numExecutors + ", host preferences: " + preferredHostToCount.isEmpty)
- resourceRequests = List(
- createResourceRequest(AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
+ logDebug("numExecutors: " + numExecutors + ", host preferences: " +
+ preferredHostToCount.isEmpty)
+ resourceRequests = List(createResourceRequest(
+ AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
// request for all hosts in preferred nodes and for numExecutors -
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 61af0f9ac5..581cfe43b6 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -137,7 +137,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
System.getenv(ApplicationConstants.APPLICATION_WEB_PROXY_BASE_ENV)
val params = "PROXY_HOST=" + parts(0) + "," + "PROXY_URI_BASE=" + uriBase
- System.setProperty("spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
+ System.setProperty(
+ "spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.params", params)
}
/** Get the Yarn approved local directories. */
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
index b697f10391..67ed591c78 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorLauncher.scala
@@ -65,7 +65,8 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
override def preStart() {
logInfo("Listen to driver: " + driverUrl)
driver = context.actorSelection(driverUrl)
- // Send a hello message thus the connection is actually established, thus we can monitor Lifecycle Events.
+ // Send a hello message thus the connection is actually established,
+ // thus we can monitor Lifecycle Events.
driver ! "Hello"
context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
}
@@ -95,8 +96,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
// Allocate all containers
allocateExecutors()
- // Launch a progress reporter thread, else app will get killed after expiration (def: 10mins) timeout
- // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
+ // Launch a progress reporter thread, else app will get killed after expiration
+ // (def: 10mins) timeout ensure that progress is sent before
+ // YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
// we want to be reasonably responsive without causing too many requests to RM.
diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index e31c4060e8..4fafae1aff 100644
--- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -276,7 +276,8 @@ private[yarn] class YarnAllocationHandler(
allocatedRackCount.put(rack, allocatedRackCount.getOrElse(rack, 0) + 1)
}
}
- logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(driverUrl, executorHostname))
+ logInfo("Launching ExecutorRunnable. driverUrl: %s, executorHostname: %s".format(
+ driverUrl, executorHostname))
val executorRunnable = new ExecutorRunnable(
container,
conf,
@@ -314,8 +315,8 @@ private[yarn] class YarnAllocationHandler(
// `pendingReleaseContainers`.
pendingReleaseContainers.remove(containerId)
} else {
- // Decrement the number of executors running. The next iteration of the ApplicationMaster's
- // reporting thread will take care of allocating.
+ // Decrement the number of executors running. The next iteration of
+ // the ApplicationMaster's reporting thread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
containerId,