author     Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 12:34:46 +0530
committer  Prashant Sharma <prashant.s@imaginea.com>  2013-11-21 12:34:46 +0530
commit     95d8dbce91f49467050250d5cf3671aaaa648d76 (patch)
tree       06e2583c63cdf39d6d15d36a3189c2e6db0148ba /yarn
parent     199e9cf02dfaa372c1f067bca54556e1f6ce787d (diff)
parent     2fead510f74b962b293de4d724136c24a9825271 (diff)
Merge branch 'master' of github.com:apache/incubator-spark into scala-2.10-temp
Conflicts:
    core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
    streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala                 53
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala                            63
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala      6
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala                     13
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala              16
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala                 5
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala  2
7 files changed, 90 insertions(+), 68 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 4302ef4cda..a7baf0c36c 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,11 +17,11 @@
package org.apache.spark.deploy.yarn
-import java.io.IOException;
+import java.io.IOException
import java.net.Socket
-import java.security.PrivilegedExceptionAction
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
+
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.net.NetUtils
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
import org.apache.spark.{SparkContext, Logging}
import org.apache.spark.util.Utils
-import org.apache.hadoop.security.UserGroupInformation
+
import scala.collection.JavaConversions._
class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
@@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
private var isLastAMRetry: Boolean = true
-
+ // default to numWorkers * 2, with minimum of 3
+ private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures",
+ math.max(args.numWorkers * 2, 3).toString()).toInt
def run() {
// setup the directories so things go to yarn approved directories rather
@@ -65,7 +67,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
appAttemptId = getApplicationAttemptId()
- isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
+ isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts
resourceManager = registerWithResourceManager()
// Workaround until hadoop moves to something which has
@@ -186,8 +188,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
var successed = false
try {
// Copy
- var mainArgs: Array[String] = new Array[String](args.userArgs.size())
- args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size())
+ var mainArgs: Array[String] = new Array[String](args.userArgs.size)
+ args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size)
mainMethod.invoke(null, mainArgs)
// some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR
// userThread will stop here unless it has uncaught exception thrown out
@@ -195,7 +197,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
successed = true
} finally {
logDebug("finishing main")
- isLastAMRetry = true;
+ isLastAMRetry = true
if (successed) {
ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
} else {
@@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
if (null != sparkContext) {
uiAddress = sparkContext.ui.appUIAddress
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args,
- sparkContext.preferredNodeLocationData)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args, sparkContext.preferredNodeLocationData)
} else {
logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime +
- ", numTries = " + numTries)
- this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args)
+ ", numTries = " + numTries)
+ this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager,
+ appAttemptId, args)
}
}
} finally {
@@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
while(yarnAllocator.getNumWorkersRunning < args.numWorkers &&
// If user thread exists, then quit !
userThread.isAlive) {
-
- this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
+ yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0))
ApplicationMaster.incrementAllocatorLoop(1)
Thread.sleep(100)
}
@@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
// ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse.
val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
- // must be <= timeoutInterval/ 2.
- // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM.
- // so atleast 1 minute or timeoutInterval / 10 - whichever is higher.
- val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L))
+
+ // we want to be reasonably responsive without causing too many requests to RM.
+ val schedulerInterval =
+ System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
+
+ // must be <= timeoutInterval / 2.
+ val interval = math.min(timeoutInterval / 2, schedulerInterval)
launchReporterThread(interval)
}
}
- // TODO: We might want to extend this to allocate more containers in case they die !
private def launchReporterThread(_sleepTime: Long): Thread = {
val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime
val t = new Thread {
override def run() {
while (userThread.isAlive) {
+ if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) {
+ finishApplicationMaster(FinalApplicationStatus.FAILED,
+ "max number of worker failures reached")
+ }
val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning
if (missingWorkerCount > 0) {
logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers")
@@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
}
*/
- def finishApplicationMaster(status: FinalApplicationStatus) {
+ def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") {
synchronized {
if (isFinished) {
@@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
.asInstanceOf[FinishApplicationMasterRequest]
finishReq.setAppAttemptId(appAttemptId)
finishReq.setFinishApplicationStatus(status)
+ finishReq.setDiagnostics(diagnostics)
// set tracking url to empty since we don't have a history server
finishReq.setTrackingUrl("")
resourceManager.finishApplicationMaster(finishReq)
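
The ApplicationMaster hunks above introduce two tunables — spark.yarn.max.worker.failures (default max(numWorkers * 2, 3)) and spark.yarn.scheduler.heartbeat.interval-ms (default 5000 ms, capped at half the RM expiry interval) — and pass a diagnostics string when finishing the AM. The following is a minimal standalone sketch of how those defaults are derived, assuming hypothetical example values in place of ApplicationMasterArguments and YarnConfiguration; it is not the actual Spark code.

    object YarnTunablesSketch {
      def main(args: Array[String]): Unit = {
        val numWorkers = 4                    // hypothetical stand-in for args.numWorkers
        // default to numWorkers * 2, with a minimum of 3
        val maxNumWorkerFailures = System.getProperty(
          "spark.yarn.max.worker.failures",
          math.max(numWorkers * 2, 3).toString).toInt

        val timeoutInterval = 120000L         // stand-in for YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS
        val schedulerInterval = System.getProperty(
          "spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong
        // the reporter must heartbeat at most every timeoutInterval / 2 so the RM never expires the AM
        val interval = math.min(timeoutInterval / 2, schedulerInterval)

        println(s"maxNumWorkerFailures=$maxNumWorkerFailures, reporterInterval=${interval}ms")
      }
    }

With the previous hard-coded formula the reporter slept for at least a minute between rounds; reading the interval from a property (default 5 seconds) lets the AM notice lost containers and re-allocate them much sooner.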
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4e0e060ddc..94e353af2e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,14 +17,13 @@
package org.apache.spark.deploy.yarn
-import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI}
+import java.net.{InetAddress, UnknownHostException, URI}
import java.nio.ByteBuffer
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil}
-import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsPermission
import org.apache.hadoop.mapred.Master
-import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.api._
@@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap
import scala.collection.mutable.Map
import scala.collection.JavaConversions._
-import org.apache.spark.Logging
-import org.apache.spark.util.Utils
-import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.Logging
class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging {
@@ -60,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short)
def run() {
+ validateArgs()
+
init(yarnConf)
start()
logClusterResourceDetails()
@@ -84,6 +83,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
System.exit(0)
}
+ def validateArgs() = {
+ Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!",
+ (args.userJar == null) -> "Error: You must specify a user jar!",
+ (args.userClass == null) -> "Error: You must specify a user class!",
+ (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!",
+ (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD),
+ (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) ->
+ ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString()))
+ .foreach { case(cond, errStr) =>
+ if (cond) {
+ logError(errStr)
+ args.printUsageAndExit(1)
+ }
+ }
+ }
+
def getAppStagingDir(appId: ApplicationId): String = {
SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR
}
@@ -97,7 +113,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size +
", queueChildQueueCount=" + queueInfo.getChildQueues.size)
}
-
def verifyClusterResources(app: GetNewApplicationResponse) = {
val maxMem = app.getMaximumResourceCapability().getMemory()
@@ -105,7 +120,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// if we have requested more then the clusters max for a single resource then exit.
if (args.workerMemory > maxMem) {
- logError("the worker size is to large to run on this cluster " + args.workerMemory);
+ logError("the worker size is to large to run on this cluster " + args.workerMemory)
System.exit(1)
}
val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD
@@ -142,8 +157,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
var dstHost = dstUri.getHost()
if ((srcHost != null) && (dstHost != null)) {
try {
- srcHost = InetAddress.getByName(srcHost).getCanonicalHostName();
- dstHost = InetAddress.getByName(dstHost).getCanonicalHostName();
+ srcHost = InetAddress.getByName(srcHost).getCanonicalHostName()
+ dstHost = InetAddress.getByName(dstHost).getCanonicalHostName()
} catch {
case e: UnknownHostException =>
return false
@@ -160,7 +175,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (srcUri.getPort() != dstUri.getPort()) {
return false
}
- return true;
+ return true
}
/**
@@ -172,13 +187,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
replication: Short,
setPerms: Boolean = false): Path = {
val fs = FileSystem.get(conf)
- val remoteFs = originalPath.getFileSystem(conf);
+ val remoteFs = originalPath.getFileSystem(conf)
var newPath = originalPath
if (! compareFs(remoteFs, fs)) {
newPath = new Path(dstDir, originalPath.getName())
logInfo("Uploading " + originalPath + " to " + newPath)
- FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf);
- fs.setReplication(newPath, replication);
+ FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf)
+ fs.setReplication(newPath, replication)
if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION))
}
// resolve any symlinks in the URI path so using a "current" symlink
@@ -196,7 +211,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
// Add them as local resources to the AM
val fs = FileSystem.get(conf)
- val delegTokenRenewer = Master.getMasterPrincipal(conf);
+ val delegTokenRenewer = Master.getMasterPrincipal(conf)
if (UserGroupInformation.isSecurityEnabled()) {
if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) {
logError("Can't get Master Kerberos principal for use as renewer")
@@ -208,18 +223,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
if (UserGroupInformation.isSecurityEnabled()) {
val dstFs = dst.getFileSystem(conf)
- dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+ dstFs.addDelegationTokens(delegTokenRenewer, credentials)
}
val localResources = HashMap[String, LocalResource]()
FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
- if (System.getenv("SPARK_JAR") == null || args.userJar == null) {
- logError("Error: You must set SPARK_JAR environment variable and specify a user jar!")
- System.exit(1)
- }
-
Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar,
Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF"))
.foreach { case(destName, _localPath) =>
@@ -273,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
}
- UserGroupInformation.getCurrentUser().addCredentials(credentials);
+ UserGroupInformation.getCurrentUser().addCredentials(credentials)
return localResources
}
@@ -334,7 +344,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
JAVA_OPTS += " -Djava.io.tmpdir=" +
new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " "
-
// Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out.
// The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same
// node, spark gc effects all other containers performance (which can also be other spark containers)
@@ -354,17 +363,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
}
// Command for the ApplicationMaster
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
}
- if (args.userClass == null) {
- logError("Error: You must specify a user class!")
- System.exit(1)
- }
-
val commands = List[String](javaCommand +
" -server " +
JAVA_OPTS +
@@ -442,6 +446,7 @@ object Client {
System.setProperty("SPARK_YARN_MODE", "true")
val args = new ClientArguments(argStrings)
+
new Client(args).run
}
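
The new Client.validateArgs() gathers each precondition and its error message into a Map of Boolean -> String and exits on the first violated entry, replacing the scattered checks removed further down in this file. Below is a self-contained sketch of that pattern with hypothetical argument values; printUsageAndExit is stood in for by sys.exit, and the memory overhead constant is assumed.

    object ValidateArgsSketch {
      val memoryOverhead = 384   // assumed stand-in for YarnAllocationHandler.MEMORY_OVERHEAD

      def validate(userJar: String, userClass: String, numWorkers: Int, amMemory: Int): Unit = {
        Map(
          (userJar == null)            -> "Error: You must specify a user jar!",
          (userClass == null)          -> "Error: You must specify a user class!",
          (numWorkers <= 0)            -> "Error: You must specify at least 1 worker!",
          (amMemory <= memoryOverhead) ->
            ("Error: AM memory size must be greater than: " + memoryOverhead)
        ).foreach { case (cond, errStr) =>
          if (cond) {
            Console.err.println(errStr)
            sys.exit(1)                // the real code calls args.printUsageAndExit(1)
          }
        }
      }

      def main(args: Array[String]): Unit =
        validate("app.jar", "org.example.Main", 2, 1024)
    }

Because the keys are Booleans, conditions that evaluate to the same value collapse into a single Map entry; any surviving true key still triggers the exit, but this is worth keeping in mind if you adapt the pattern to report every violation at once.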
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
index 07686fefd7..5f159b073f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
@@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging {
*/
def checkPermissionOfOther(fs: FileSystem, path: Path,
action: FsAction, statCache: Map[URI, FileStatus]): Boolean = {
- val status = getFileStatus(fs, path.toUri(), statCache);
+ val status = getFileStatus(fs, path.toUri(), statCache)
val perms = status.getPermission()
val otherAction = perms.getOtherAction()
if (otherAction.implies(action)) {
- return true;
+ return true
}
return false
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 7a66532254..a4d6e1d87d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -22,7 +22,7 @@ import java.nio.ByteBuffer
import java.security.PrivilegedExceptionAction
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
+import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.DataOutputBuffer
import org.apache.hadoop.net.NetUtils
import org.apache.hadoop.security.UserGroupInformation
@@ -38,7 +38,6 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.HashMap
import org.apache.spark.Logging
-import org.apache.spark.util.Utils
class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String,
slaveId: String, hostname: String, workerMemory: Int, workerCores: Int)
@@ -108,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
credentials.writeTokenStorageToStream(dob)
ctx.setContainerTokens(ByteBuffer.wrap(dob.getData()))
- var javaCommand = "java";
+ var javaCommand = "java"
val javaHome = System.getenv("JAVA_HOME")
if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) {
javaCommand = Environment.JAVA_HOME.$() + "/bin/java"
@@ -204,8 +203,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
// use doAs and remoteUser here so we can add the container token and not
// pollute the current users credentials with all of the individual container tokens
- val user = UserGroupInformation.createRemoteUser(container.getId().toString());
- val containerToken = container.getContainerToken();
+ val user = UserGroupInformation.createRemoteUser(container.getId().toString())
+ val containerToken = container.getContainerToken()
if (containerToken != null) {
user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress))
}
@@ -216,8 +215,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
return rpc.getProxy(classOf[ContainerManager],
cmAddress, conf).asInstanceOf[ContainerManager]
}
- });
- return proxy;
+ })
+ proxy
}
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 4beb5229fe..baa030b4a4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
// Used to generate a unique id per worker
private val workerIdCounter = new AtomicInteger()
private val lastResponseId = new AtomicInteger()
+ private val numWorkersFailed = new AtomicInteger()
def getNumWorkersRunning: Int = numWorkersRunning.intValue
+ def getNumWorkersFailed: Int = numWorkersFailed.intValue
def isResourceConstraintSatisfied(container: Container): Boolean = {
container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD)
@@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
else {
// simply decrement count - next iteration of ReporterThread will take care of allocating !
numWorkersRunning.decrementAndGet()
- logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState +
- " httpaddress: " + completedContainer.getDiagnostics)
+ logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState +
+ " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus())
+
+ // Hadoop 2.2.X added a ContainerExitStatus we should switch to use
+ // there are some exit status' we shouldn't necessarily count against us, but for
+ // now I think its ok as none of the containers are expected to exit
+ if (completedContainer.getExitStatus() != 0) {
+ logInfo("Container marked as failed: " + containerId)
+ numWorkersFailed.incrementAndGet()
+ }
}
allocatedHostToContainersMap.synchronized {
@@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM
val releasedContainerList = createReleasedContainerList()
req.addAllReleases(releasedContainerList)
-
-
if (numWorkers > 0) {
logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.")
}
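
The allocator now counts failures in an AtomicInteger, treating any completed container with a non-zero exit status as a failed worker; the AM's allocation loop and reporter thread compare that count against maxNumWorkerFailures. A minimal thread-safe sketch of this bookkeeping, using a plain case class as a stand-in for YARN's ContainerStatus rather than the real API:

    import java.util.concurrent.atomic.AtomicInteger

    // CompletedContainer is a hypothetical stand-in for YARN's ContainerStatus.
    case class CompletedContainer(containerId: String, exitStatus: Int, diagnostics: String)

    class WorkerFailureTracker(maxNumWorkerFailures: Int) {
      private val numWorkersRunning = new AtomicInteger()
      private val numWorkersFailed  = new AtomicInteger()

      def getNumWorkersRunning: Int = numWorkersRunning.intValue
      def getNumWorkersFailed: Int  = numWorkersFailed.intValue

      def containerCompleted(c: CompletedContainer): Unit = {
        // the container is gone either way; the reporter thread will re-allocate
        numWorkersRunning.decrementAndGet()
        // a non-zero exit status counts against the application; Hadoop 2.2+
        // exposes ContainerExitStatus for distinguishing exit causes
        if (c.exitStatus != 0) {
          println(s"Container marked as failed: ${c.containerId} (${c.diagnostics})")
          numWorkersFailed.incrementAndGet()
        }
      }

      // checked by the allocation loop and the reporter thread before each round
      def tooManyFailures: Boolean = getNumWorkersFailed >= maxNumWorkerFailures
    }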
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index ca2f1e2565..2ba2366ead 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -18,13 +18,10 @@
package org.apache.spark.deploy.yarn
import org.apache.spark.deploy.SparkHadoopUtil
-import collection.mutable.HashMap
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.yarn.api.ApplicationConstants.Environment
-import java.security.PrivilegedExceptionAction
/**
* Contains util methods to interact with Hadoop from spark.
@@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
// add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster
override def addCredentials(conf: JobConf) {
- val jobCreds = conf.getCredentials();
+ val jobCreds = conf.getCredentials()
jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials())
}
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
index c0a2af0c6f..2941356bc5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.deploy.yarn
-import java.net.URI;
+import java.net.URI
import org.scalatest.FunSuite
import org.scalatest.mock.MockitoSugar