diff options
4 files changed, 22 insertions, 24 deletions
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e58a926b59..71a64ecf58 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -110,15 +110,15 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa appContext } - def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { - val minResMemory: Int = newApp.getMinimumResourceCapability().getMemory() + def calculateAMMemory(newApp: GetNewApplicationResponse): Int = { + val minResMemory = newApp.getMinimumResourceCapability().getMemory() val amMemory = ((args.amMemory / minResMemory) * minResMemory) + ((if ((args.amMemory % minResMemory) == 0) 0 else minResMemory) - - YarnAllocationHandler.MEMORY_OVERHEAD) + YarnAllocationHandler.MEMORY_OVERHEAD) amMemory } - def setupSecurityToken(amContainer :ContainerLaunchContext) = { + def setupSecurityToken(amContainer: ContainerLaunchContext) = { // Setup security tokens. val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) @@ -154,7 +154,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa ) val state = report.getYarnApplicationState() - val dsStatus = report.getFinalApplicationStatus() if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 96e998ab61..2db5744be1 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -54,8 +54,6 @@ trait ClientBase extends Logging { val args: ClientArguments val conf: Configuration val sparkConf: SparkConf - - //var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration val credentials = UserGroupInformation.getCurrentUser().getCredentials() private val SPARK_STAGING: String = ".sparkStaging" @@ -140,9 +138,10 @@ trait ClientBase extends Logging { } //check for ports if (srcUri.getPort() != dstUri.getPort()) { - return false + false + } else { + true } - return true } /** Copy the file into HDFS if needed. */ @@ -169,7 +168,7 @@ trait ClientBase extends Logging { destPath } - def qualifyForLocal(localURI : URI): Path = { + def qualifyForLocal(localURI: URI): Path = { var qualifiedURI = localURI // If not specified assume these are in the local filesystem to keep behavior like Hadoop if (qualifiedURI.getScheme() == null) { @@ -296,9 +295,9 @@ trait ClientBase extends Logging { retval.toString } - def calculateAMMemory(newApp: GetNewApplicationResponse) :Int + def calculateAMMemory(newApp: GetNewApplicationResponse): Int - def setupSecurityToken(amContainer :ContainerLaunchContext) + def setupSecurityToken(amContainer: ContainerLaunchContext) def createContainerLaunchContext( newApp: GetNewApplicationResponse, diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala index 3c9379de84..bfa8f84bf7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnableUtil.scala @@ -45,11 +45,12 @@ trait WorkerRunnableUtil extends Logging { val sparkConf: SparkConf lazy val env = prepareEnvironment - def prepareCommand(masterAddress: String, - slaveId: String, - hostname: String, - workerMemory: Int, - workerCores: Int) = { + def prepareCommand( + masterAddress: String, + slaveId: String, + hostname: String, + workerMemory: Int, + workerCores: Int) = { // Extra options for the JVM var JAVA_OPTS = "" // Set the JVM memory diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index fef4702c66..837b7e12cb 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -108,11 +108,11 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa val queueInfo: QueueInfo = super.getQueueInfo(args.amQueue) logInfo( """Queue info ... queueName: %s, queueCurrentCapacity: %s, queueMaxCapacity: %s, queueApplicationCount = %s, queueChildQueueCount = %s""".format( - queueInfo.getQueueName, - queueInfo.getCurrentCapacity, - queueInfo.getMaximumCapacity, - queueInfo.getApplications.size, - queueInfo.getChildQueues.size)) + queueInfo.getQueueName, + queueInfo.getCurrentCapacity, + queueInfo.getMaximumCapacity, + queueInfo.getApplications.size, + queueInfo.getChildQueues.size)) } def calculateAMMemory(newApp: GetNewApplicationResponse) :Int = { @@ -124,7 +124,7 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa args.amMemory } - def setupSecurityToken(amContainer :ContainerLaunchContext) = { + def setupSecurityToken(amContainer: ContainerLaunchContext) = { // Setup security tokens. val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) @@ -160,7 +160,6 @@ class Client(clientArgs: ClientArguments, hadoopConf: Configuration, spConf: Spa ) val state = report.getYarnApplicationState() - val dsStatus = report.getFinalApplicationStatus() if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) { |