diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 12:34:46 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 12:34:46 +0530 |
commit | 95d8dbce91f49467050250d5cf3671aaaa648d76 (patch) | |
tree | 06e2583c63cdf39d6d15d36a3189c2e6db0148ba /yarn | |
parent | 199e9cf02dfaa372c1f067bca54556e1f6ce787d (diff) | |
parent | 2fead510f74b962b293de4d724136c24a9825271 (diff) | |
download | spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.gz spark-95d8dbce91f49467050250d5cf3671aaaa648d76.tar.bz2 spark-95d8dbce91f49467050250d5cf3671aaaa648d76.zip |
Merge branch 'master' of github.com:apache/incubator-spark into scala-2.10-temp
Conflicts:
core/src/main/scala/org/apache/spark/util/collection/PrimitiveVector.scala
streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
Diffstat (limited to 'yarn')
7 files changed, 90 insertions, 68 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 4302ef4cda..a7baf0c36c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,11 +17,11 @@ package org.apache.spark.deploy.yarn -import java.io.IOException; +import java.io.IOException import java.net.Socket -import java.security.PrivilegedExceptionAction import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils @@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils -import org.apache.hadoop.security.UserGroupInformation + import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -54,7 +54,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) private var isLastAMRetry: Boolean = true - + // default to numWorkers * 2, with minimum of 3 + private val maxNumWorkerFailures = System.getProperty("spark.yarn.max.worker.failures", + math.max(args.numWorkers * 2, 3).toString()).toInt def run() { // setup the directories so things go to yarn approved directories rather @@ -65,7 +67,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() - isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has @@ -186,8 +188,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e var successed = false try { // Copy - var mainArgs: Array[String] = new Array[String](args.userArgs.size()) - args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size()) + var mainArgs: Array[String] = new Array[String](args.userArgs.size) + args.userArgs.copyToArray(mainArgs, 0, args.userArgs.size) mainMethod.invoke(null, mainArgs) // some job script has "System.exit(0)" at the end, for example SparkPi, SparkLR // userThread will stop here unless it has uncaught exception thrown out @@ -195,7 +197,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e successed = true } finally { logDebug("finishing main") - isLastAMRetry = true; + isLastAMRetry = true if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { @@ -227,12 +229,13 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e if (null != sparkContext) { uiAddress = sparkContext.ui.appUIAddress - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args, - sparkContext.preferredNodeLocationData) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args, sparkContext.preferredNodeLocationData) } else { logWarning("Unable to retrieve sparkContext inspite of waiting for " + count * waitTime + - ", numTries = " + numTries) - this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, appAttemptId, args) + ", numTries = " + numTries) + this.yarnAllocator = YarnAllocationHandler.newAllocator(yarnConf, resourceManager, + appAttemptId, args) } } } finally { @@ -251,8 +254,11 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e while(yarnAllocator.getNumWorkersRunning < args.numWorkers && // If user thread exists, then quit ! userThread.isAlive) { - - this.yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } + yarnAllocator.allocateContainers(math.max(args.numWorkers - yarnAllocator.getNumWorkersRunning, 0)) ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(100) } @@ -268,21 +274,27 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // ensure that progress is sent before YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS elapse. val timeoutInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000) - // must be <= timeoutInterval/ 2. - // On other hand, also ensure that we are reasonably responsive without causing too many requests to RM. - // so atleast 1 minute or timeoutInterval / 10 - whichever is higher. - val interval = math.min(timeoutInterval / 2, math.max(timeoutInterval/ 10, 60000L)) + + // we want to be reasonably responsive without causing too many requests to RM. + val schedulerInterval = + System.getProperty("spark.yarn.scheduler.heartbeat.interval-ms", "5000").toLong + + // must be <= timeoutInterval / 2. + val interval = math.min(timeoutInterval / 2, schedulerInterval) launchReporterThread(interval) } } - // TODO: We might want to extend this to allocate more containers in case they die ! private def launchReporterThread(_sleepTime: Long): Thread = { val sleepTime = if (_sleepTime <= 0 ) 0 else _sleepTime val t = new Thread { override def run() { while (userThread.isAlive) { + if (yarnAllocator.getNumWorkersFailed >= maxNumWorkerFailures) { + finishApplicationMaster(FinalApplicationStatus.FAILED, + "max number of worker failures reached") + } val missingWorkerCount = args.numWorkers - yarnAllocator.getNumWorkersRunning if (missingWorkerCount > 0) { logInfo("Allocating " + missingWorkerCount + " containers to make up for (potentially ?) lost containers") @@ -321,7 +333,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } */ - def finishApplicationMaster(status: FinalApplicationStatus) { + def finishApplicationMaster(status: FinalApplicationStatus, diagnostics: String = "") { synchronized { if (isFinished) { @@ -335,6 +347,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e .asInstanceOf[FinishApplicationMasterRequest] finishReq.setAppAttemptId(appAttemptId) finishReq.setFinishApplicationStatus(status) + finishReq.setDiagnostics(diagnostics) // set tracking url to empty since we don't have a history server finishReq.setTrackingUrl("") resourceManager.finishApplicationMaster(finishReq) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 4e0e060ddc..94e353af2e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,14 +17,13 @@ package org.apache.spark.deploy.yarn -import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI} +import java.net.{InetAddress, UnknownHostException, URI} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} -import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.fs.permission.FsPermission import org.apache.hadoop.mapred.Master -import org.apache.hadoop.net.NetUtils import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ @@ -40,9 +39,7 @@ import scala.collection.mutable.HashMap import scala.collection.mutable.Map import scala.collection.JavaConversions._ -import org.apache.spark.Logging -import org.apache.spark.util.Utils -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.Logging class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging { @@ -60,6 +57,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) def run() { + validateArgs() + init(yarnConf) start() logClusterResourceDetails() @@ -84,6 +83,23 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(0) } + def validateArgs() = { + Map((System.getenv("SPARK_JAR") == null) -> "Error: You must set SPARK_JAR environment variable!", + (args.userJar == null) -> "Error: You must specify a user jar!", + (args.userClass == null) -> "Error: You must specify a user class!", + (args.numWorkers <= 0) -> "Error: You must specify atleast 1 worker!", + (args.amMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: AM memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD), + (args.workerMemory <= YarnAllocationHandler.MEMORY_OVERHEAD) -> + ("Error: Worker memory size must be greater then: " + YarnAllocationHandler.MEMORY_OVERHEAD.toString())) + .foreach { case(cond, errStr) => + if (cond) { + logError(errStr) + args.printUsageAndExit(1) + } + } + } + def getAppStagingDir(appId: ApplicationId): String = { SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR } @@ -97,7 +113,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl ", queueMaxCapacity=" + queueInfo.getMaximumCapacity + ", queueApplicationCount=" + queueInfo.getApplications.size + ", queueChildQueueCount=" + queueInfo.getChildQueues.size) } - def verifyClusterResources(app: GetNewApplicationResponse) = { val maxMem = app.getMaximumResourceCapability().getMemory() @@ -105,7 +120,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // if we have requested more then the clusters max for a single resource then exit. if (args.workerMemory > maxMem) { - logError("the worker size is to large to run on this cluster " + args.workerMemory); + logError("the worker size is to large to run on this cluster " + args.workerMemory) System.exit(1) } val amMem = args.amMemory + YarnAllocationHandler.MEMORY_OVERHEAD @@ -142,8 +157,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var dstHost = dstUri.getHost() if ((srcHost != null) && (dstHost != null)) { try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() } catch { case e: UnknownHostException => return false @@ -160,7 +175,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (srcUri.getPort() != dstUri.getPort()) { return false } - return true; + return true } /** @@ -172,13 +187,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl replication: Short, setPerms: Boolean = false): Path = { val fs = FileSystem.get(conf) - val remoteFs = originalPath.getFileSystem(conf); + val remoteFs = originalPath.getFileSystem(conf) var newPath = originalPath if (! compareFs(remoteFs, fs)) { newPath = new Path(dstDir, originalPath.getName()) logInfo("Uploading " + originalPath + " to " + newPath) - FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf); - fs.setReplication(newPath, replication); + FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf) + fs.setReplication(newPath, replication) if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) } // resolve any symlinks in the URI path so using a "current" symlink @@ -196,7 +211,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add them as local resources to the AM val fs = FileSystem.get(conf) - val delegTokenRenewer = Master.getMasterPrincipal(conf); + val delegTokenRenewer = Master.getMasterPrincipal(conf) if (UserGroupInformation.isSecurityEnabled()) { if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { logError("Can't get Master Kerberos principal for use as renewer") @@ -208,18 +223,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl if (UserGroupInformation.isSecurityEnabled()) { val dstFs = dst.getFileSystem(conf) - dstFs.addDelegationTokens(delegTokenRenewer, credentials); + dstFs.addDelegationTokens(delegTokenRenewer, credentials) } val localResources = HashMap[String, LocalResource]() FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - if (System.getenv("SPARK_JAR") == null || args.userJar == null) { - logError("Error: You must set SPARK_JAR environment variable and specify a user jar!") - System.exit(1) - } - Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => @@ -273,7 +283,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } - UserGroupInformation.getCurrentUser().addCredentials(credentials); + UserGroupInformation.getCurrentUser().addCredentials(credentials) return localResources } @@ -334,7 +344,6 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " - // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same // node, spark gc effects all other containers performance (which can also be other spark containers) @@ -354,17 +363,12 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } // Command for the ApplicationMaster - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" } - if (args.userClass == null) { - logError("Error: You must specify a user class!") - System.exit(1) - } - val commands = List[String](javaCommand + " -server " + JAVA_OPTS + @@ -442,6 +446,7 @@ object Client { System.setProperty("SPARK_YARN_MODE", "true") val args = new ClientArguments(argStrings) + new Client(args).run } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 07686fefd7..5f159b073f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.net.URI; +import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileStatus @@ -197,11 +197,11 @@ class ClientDistributedCacheManager() extends Logging { */ def checkPermissionOfOther(fs: FileSystem, path: Path, action: FsAction, statCache: Map[URI, FileStatus]): Boolean = { - val status = getFileStatus(fs, path.toUri(), statCache); + val status = getFileStatus(fs, path.toUri(), statCache) val perms = status.getPermission() val otherAction = perms.getOtherAction() if (otherAction.implies(action)) { - return true; + return true } return false } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 7a66532254..a4d6e1d87d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -22,7 +22,7 @@ import java.nio.ByteBuffer import java.security.PrivilegedExceptionAction import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.Path import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.net.NetUtils import org.apache.hadoop.security.UserGroupInformation @@ -38,7 +38,6 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.HashMap import org.apache.spark.Logging -import org.apache.spark.util.Utils class WorkerRunnable(container: Container, conf: Configuration, masterAddress: String, slaveId: String, hostname: String, workerMemory: Int, workerCores: Int) @@ -108,7 +107,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S credentials.writeTokenStorageToStream(dob) ctx.setContainerTokens(ByteBuffer.wrap(dob.getData())) - var javaCommand = "java"; + var javaCommand = "java" val javaHome = System.getenv("JAVA_HOME") if ((javaHome != null && !javaHome.isEmpty()) || env.isDefinedAt("JAVA_HOME")) { javaCommand = Environment.JAVA_HOME.$() + "/bin/java" @@ -204,8 +203,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // use doAs and remoteUser here so we can add the container token and not // pollute the current users credentials with all of the individual container tokens - val user = UserGroupInformation.createRemoteUser(container.getId().toString()); - val containerToken = container.getContainerToken(); + val user = UserGroupInformation.createRemoteUser(container.getId().toString()) + val containerToken = container.getContainerToken() if (containerToken != null) { user.addToken(ProtoUtils.convertFromProtoFormat(containerToken, cmAddress)) } @@ -216,8 +215,8 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S return rpc.getProxy(classOf[ContainerManager], cmAddress, conf).asInstanceOf[ContainerManager] } - }); - return proxy; + }) + proxy } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 4beb5229fe..baa030b4a4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -72,9 +72,11 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM // Used to generate a unique id per worker private val workerIdCounter = new AtomicInteger() private val lastResponseId = new AtomicInteger() + private val numWorkersFailed = new AtomicInteger() def getNumWorkersRunning: Int = numWorkersRunning.intValue + def getNumWorkersFailed: Int = numWorkersFailed.intValue def isResourceConstraintSatisfied(container: Container): Boolean = { container.getResource.getMemory >= (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) @@ -253,8 +255,16 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM else { // simply decrement count - next iteration of ReporterThread will take care of allocating ! numWorkersRunning.decrementAndGet() - logInfo("Container completed ? nodeId: " + containerId + ", state " + completedContainer.getState + - " httpaddress: " + completedContainer.getDiagnostics) + logInfo("Container completed not by us ? nodeId: " + containerId + ", state " + completedContainer.getState + + " httpaddress: " + completedContainer.getDiagnostics + " exit status: " + completedContainer.getExitStatus()) + + // Hadoop 2.2.X added a ContainerExitStatus we should switch to use + // there are some exit status' we shouldn't necessarily count against us, but for + // now I think its ok as none of the containers are expected to exit + if (completedContainer.getExitStatus() != 0) { + logInfo("Container marked as failed: " + containerId) + numWorkersFailed.incrementAndGet() + } } allocatedHostToContainersMap.synchronized { @@ -378,8 +388,6 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val releasedContainerList = createReleasedContainerList() req.addAllReleases(releasedContainerList) - - if (numWorkers > 0) { logInfo("Allocating " + numWorkers + " worker containers with " + (workerMemory + YarnAllocationHandler.MEMORY_OVERHEAD) + " of memory each.") } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index ca2f1e2565..2ba2366ead 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -18,13 +18,10 @@ package org.apache.spark.deploy.yarn import org.apache.spark.deploy.SparkHadoopUtil -import collection.mutable.HashMap import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment -import java.security.PrivilegedExceptionAction /** * Contains util methods to interact with Hadoop from spark. @@ -40,7 +37,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // add any user credentials to the job conf which are necessary for running on a secure Hadoop cluster override def addCredentials(conf: JobConf) { - val jobCreds = conf.getCredentials(); + val jobCreds = conf.getCredentials() jobCreds.mergeAll(UserGroupInformation.getCurrentUser().getCredentials()) } } diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala index c0a2af0c6f..2941356bc5 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy.yarn -import java.net.URI; +import java.net.URI import org.scalatest.FunSuite import org.scalatest.mock.MockitoSugar |