diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 11:55:48 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-11-21 11:55:48 +0530 |
commit | 199e9cf02dfaa372c1f067bca54556e1f6ce787d (patch) | |
tree | 7ea8ac8abb51f3e3cb918b7ba147cb4d909c5f99 /yarn | |
parent | 6860b79f6e4cc0d38b08848f19127c259d9b5069 (diff) | |
parent | f6b2e590b1ef35611f68c3ff7eb5c632d31a0dcc (diff) | |
download | spark-199e9cf02dfaa372c1f067bca54556e1f6ce787d.tar.gz spark-199e9cf02dfaa372c1f067bca54556e1f6ce787d.tar.bz2 spark-199e9cf02dfaa372c1f067bca54556e1f6ce787d.zip |
Merge branch 'scala210-master' of github.com:colorant/incubator-spark into scala-2.10
Conflicts:
core/src/main/scala/org/apache/spark/deploy/client/Client.scala
core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Diffstat (limited to 'yarn')
8 files changed, 812 insertions, 111 deletions
diff --git a/yarn/pom.xml b/yarn/pom.xml index 7770cbb0cc..12bc97da8a 100644 --- a/yarn/pom.xml +++ b/yarn/pom.xml @@ -61,6 +61,16 @@ <groupId>org.apache.avro</groupId> <artifactId>avro-ipc</artifactId> </dependency> + <dependency> + <groupId>org.scalatest</groupId> + <artifactId>scalatest_2.9.3</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.mockito</groupId> + <artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> @@ -106,6 +116,46 @@ </execution> </executions> </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-antrun-plugin</artifactId> + <executions> + <execution> + <phase>test</phase> + <goals> + <goal>run</goal> + </goals> + <configuration> + <exportAntProperties>true</exportAntProperties> + <tasks> + <property name="spark.classpath" refid="maven.test.classpath" /> + <property environment="env" /> + <fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry."> + <condition> + <not> + <or> + <isset property="env.SCALA_HOME" /> + <isset property="env.SCALA_LIBRARY_PATH" /> + </or> + </not> + </condition> + </fail> + </tasks> + </configuration> + </execution> + </executions> + </plugin> + <plugin> + <groupId>org.scalatest</groupId> + <artifactId>scalatest-maven-plugin</artifactId> + <configuration> + <environmentVariables> + <SPARK_HOME>${basedir}/..</SPARK_HOME> + <SPARK_TESTING>1</SPARK_TESTING> + <SPARK_CLASSPATH>${spark.classpath}</SPARK_CLASSPATH> + </environmentVariables> + </configuration> + </plugin> </plugins> </build> </project> diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 858b58d338..4302ef4cda 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,22 +17,25 @@ package org.apache.spark.deploy.yarn +import java.io.IOException; import java.net.Socket +import java.security.PrivilegedExceptionAction import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import scala.collection.JavaConversions._ import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils import org.apache.hadoop.security.UserGroupInformation -import java.security.PrivilegedExceptionAction +import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var appAttemptId: ApplicationAttemptId = null private var userThread: Thread = null private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + private val fs = FileSystem.get(yarnConf) private var yarnAllocator: YarnAllocationHandler = null private var isFinished:Boolean = false private var uiAddress: String = "" + private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, + YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) + private var isLastAMRetry: Boolean = true def run() { // setup the directories so things go to yarn approved directories rather // then user specified and /tmp System.setProperty("spark.local.dir", getLocalDirs()) + + // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using + ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has @@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // It need shutdown hook to set SUCCEEDED successed = true } finally { + logDebug("finishing main") + isLastAMRetry = true; if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { @@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } - - private def allocateWorkers() { try { logInfo("Allocating " + args.numWorkers + " workers.") @@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.finishApplicationMaster(finishReq) } + + /** + * clean up the staging directory. + */ + private def cleanupStagingDir() { + var stagingDirPath: Path = null + try { + val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_STAGING_DIR")) + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case e: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, e) + } + } + + // The shutdown hook that runs when a signal is received AND during normal + // close of the JVM. + class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { + + def run() { + logInfo("AppMaster received a signal.") + // we need to clean up staging dir before HDFS is shut down + // make sure we don't delete it until this is the last AM + if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() + } + } } @@ -368,6 +413,8 @@ object ApplicationMaster { // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit // Should not really have to do this, but it helps yarn to evict resources earlier. // not to mention, prevent Client declaring failure even though we exit'ed properly. + // Note that this will unfortunately not properly clean up the staging files because it gets called to + // late and the filesystem is already shutdown. if (modified) { Runtime.getRuntime().addShutdownHook(new Thread with Logging { // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run' diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 076dd3c9b0..4e0e060ddc 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -17,26 +17,31 @@ package org.apache.spark.deploy.yarn -import java.net.{InetSocketAddress, URI} +import java.net.{InetAddress, InetSocketAddress, UnknownHostException, URI} import java.nio.ByteBuffer + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} +import org.apache.hadoop.fs.{FileContext, FileStatus, FileSystem, Path, FileUtil} +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.mapred.Master import org.apache.hadoop.net.NetUtils import org.apache.hadoop.io.DataOutputBuffer import org.apache.hadoop.security.UserGroupInformation import org.apache.hadoop.yarn.api._ +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.client.YarnClientImpl import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC +import org.apache.hadoop.yarn.util.{Apps, Records} + import scala.collection.mutable.HashMap +import scala.collection.mutable.Map import scala.collection.JavaConversions._ + import org.apache.spark.Logging import org.apache.spark.util.Utils -import org.apache.hadoop.yarn.util.{Apps, Records, ConverterUtils} -import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.spark.deploy.SparkHadoopUtil class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl with Logging { @@ -45,8 +50,15 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - val credentials = UserGroupInformation.getCurrentUser().getCredentials(); - + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + private val SPARK_STAGING: String = ".sparkStaging" + private val distCacheMgr = new ClientDistributedCacheManager() + + // staging directory is private! -> rwx-------- + val STAGING_DIR_PERMISSION: FsPermission = FsPermission.createImmutable(0700:Short) + // app files are world-wide readable and owner writable -> rw-r--r-- + val APP_FILE_PERMISSION: FsPermission = FsPermission.createImmutable(0644:Short) + def run() { init(yarnConf) start() @@ -57,8 +69,9 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl verifyClusterResources(newApp) val appContext = createApplicationSubmissionContext(appId) - val localResources = prepareLocalResources(appId, "spark") - val env = setupLaunchEnv(localResources) + val appStagingDir = getAppStagingDir(appId) + val localResources = prepareLocalResources(appStagingDir) + val env = setupLaunchEnv(localResources, appStagingDir) val amContainer = createContainerLaunchContext(newApp, localResources, env) appContext.setQueue(args.amQueue) @@ -70,7 +83,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl monitorApplication(appId) System.exit(0) } - + + def getAppStagingDir(appId: ApplicationId): String = { + SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR + } def logClusterResourceDetails() { val clusterMetrics: YarnClusterMetrics = super.getYarnClusterMetrics @@ -109,11 +125,74 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setApplicationName(args.appName) return appContext } - - def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = { + + /* + * see if two file systems are the same or not. + */ + private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { + val srcUri = srcFs.getUri() + val dstUri = destFs.getUri() + if (srcUri.getScheme() == null) { + return false + } + if (!srcUri.getScheme().equals(dstUri.getScheme())) { + return false + } + var srcHost = srcUri.getHost() + var dstHost = dstUri.getHost() + if ((srcHost != null) && (dstHost != null)) { + try { + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName(); + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName(); + } catch { + case e: UnknownHostException => + return false + } + if (!srcHost.equals(dstHost)) { + return false + } + } else if (srcHost == null && dstHost != null) { + return false + } else if (srcHost != null && dstHost == null) { + return false + } + //check for ports + if (srcUri.getPort() != dstUri.getPort()) { + return false + } + return true; + } + + /** + * Copy the file into HDFS if needed. + */ + private def copyRemoteFile( + dstDir: Path, + originalPath: Path, + replication: Short, + setPerms: Boolean = false): Path = { + val fs = FileSystem.get(conf) + val remoteFs = originalPath.getFileSystem(conf); + var newPath = originalPath + if (! compareFs(remoteFs, fs)) { + newPath = new Path(dstDir, originalPath.getName()) + logInfo("Uploading " + originalPath + " to " + newPath) + FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf); + fs.setReplication(newPath, replication); + if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) + } + // resolve any symlinks in the URI path so using a "current" symlink + // to point to a specific version shows the specific version + // in the distributed cache configuration + val qualPath = fs.makeQualified(newPath) + val fc = FileContext.getFileContext(qualPath.toUri(), conf) + val destPath = fc.resolvePath(qualPath) + destPath + } + + def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() - // Upload Spark and the application JAR to the remote file system + // Upload Spark and the application JAR to the remote file system if necessary // Add them as local resources to the AM val fs = FileSystem.get(conf) @@ -124,67 +203,95 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl System.exit(1) } } + val dst = new Path(fs.getHomeDirectory(), appStagingDir) + val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort + + if (UserGroupInformation.isSecurityEnabled()) { + val dstFs = dst.getFileSystem(conf) + dstFs.addDelegationTokens(delegTokenRenewer, credentials); + } + val localResources = HashMap[String, LocalResource]() + FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION)) + + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() - Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF")) + if (System.getenv("SPARK_JAR") == null || args.userJar == null) { + logError("Error: You must set SPARK_JAR environment variable and specify a user jar!") + System.exit(1) + } + + Map(Client.SPARK_JAR -> System.getenv("SPARK_JAR"), Client.APP_JAR -> args.userJar, + Client.LOG4J_PROP -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (! localPath.isEmpty()) { - val src = new Path(localPath) - val pathSuffix = appName + "/" + appId.getId() + destName - val dst = new Path(fs.getHomeDirectory(), pathSuffix) - logInfo("Uploading " + src + " to " + dst) - fs.copyFromLocalFile(false, true, src, dst) - val destStatus = fs.getFileStatus(dst) - - // get tokens for anything we upload to hdfs - if (UserGroupInformation.isSecurityEnabled()) { - fs.addDelegationTokens(delegTokenRenewer, credentials); + var localURI = new URI(localPath) + // if not specified assume these are in the local filesystem to keep behavior like Hadoop + if (localURI.getScheme() == null) { + localURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(localPath)).toString()) } + val setPermissions = if (destName.equals(Client.APP_JAR)) true else false + val destPath = copyRemoteFile(dst, new Path(localURI), replication, setPermissions) + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + destName, statCache) + } + } + + // handle any add jars + if ((args.addJars != null) && (!args.addJars.isEmpty())){ + args.addJars.split(',').foreach { case file: String => + val localURI = new URI(file.trim()) + val localPath = new Path(localURI) + val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val destPath = copyRemoteFile(dst, localPath, replication) + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + linkname, statCache, true) + } + } + + // handle any distributed cache files + if ((args.files != null) && (!args.files.isEmpty())){ + args.files.split(',').foreach { case file: String => + val localURI = new URI(file.trim()) + val localPath = new Path(localURI) + val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val destPath = copyRemoteFile(dst, localPath, replication) + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, + linkname, statCache) + } + } - val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - amJarRsrc.setType(LocalResourceType.FILE) - amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)) - amJarRsrc.setTimestamp(destStatus.getModificationTime()) - amJarRsrc.setSize(destStatus.getLen()) - locaResources(destName) = amJarRsrc + // handle any distributed cache archives + if ((args.archives != null) && (!args.archives.isEmpty())) { + args.archives.split(',').foreach { case file:String => + val localURI = new URI(file.trim()) + val localPath = new Path(localURI) + val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) + val destPath = copyRemoteFile(dst, localPath, replication) + distCacheMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, + linkname, statCache) } } + UserGroupInformation.getCurrentUser().addCredentials(credentials); - return locaResources + return localResources } - def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = { + def setupLaunchEnv( + localResources: HashMap[String, LocalResource], + stagingDir: String): HashMap[String, String] = { logInfo("Setting up the launch environment") - val log4jConfLocalRes = localResources.getOrElse("log4j.properties", null) + val log4jConfLocalRes = localResources.getOrElse(Client.LOG4J_PROP, null) val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") - - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env) env("SPARK_YARN_MODE") = "true" - env("SPARK_YARN_JAR_PATH") = - localResources("spark.jar").getResource().getScheme.toString() + "://" + - localResources("spark.jar").getResource().getFile().toString() - env("SPARK_YARN_JAR_TIMESTAMP") = localResources("spark.jar").getTimestamp().toString() - env("SPARK_YARN_JAR_SIZE") = localResources("spark.jar").getSize().toString() - - env("SPARK_YARN_USERJAR_PATH") = - localResources("app.jar").getResource().getScheme.toString() + "://" + - localResources("app.jar").getResource().getFile().toString() - env("SPARK_YARN_USERJAR_TIMESTAMP") = localResources("app.jar").getTimestamp().toString() - env("SPARK_YARN_USERJAR_SIZE") = localResources("app.jar").getSize().toString() - - if (log4jConfLocalRes != null) { - env("SPARK_YARN_LOG4J_PATH") = - log4jConfLocalRes.getResource().getScheme.toString() + "://" + log4jConfLocalRes.getResource().getFile().toString() - env("SPARK_YARN_LOG4J_TIMESTAMP") = log4jConfLocalRes.getTimestamp().toString() - env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() - } + env("SPARK_YARN_STAGING_DIR") = stagingDir + + // set the environment variables to be passed on to the Workers + distCacheMgr.setDistFilesEnv(env) + distCacheMgr.setDistArchivesEnv(env) // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) @@ -253,6 +360,11 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl javaCommand = Environment.JAVA_HOME.$() + "/bin/java" } + if (args.userClass == null) { + logError("Error: You must specify a user class!") + System.exit(1) + } + val commands = List[String](javaCommand + " -server " + JAVA_OPTS + @@ -320,6 +432,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } object Client { + val SPARK_JAR: String = "spark.jar" + val APP_JAR: String = "app.jar" + val LOG4J_PROP: String = "log4j.properties" + def main(argStrings: Array[String]) { // Set an env variable indicating we are running in YARN mode. // Note that anything with SPARK prefix gets propagated to all (remote) processes @@ -335,4 +451,30 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim) } } + + def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + // If log4j present, ensure ours overrides all others + if (addLog4j) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + LOG4J_PROP) + } + // normally the users app.jar is last in case conflicts with spark jars + val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false") + .toBoolean + if (userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + APP_JAR) + } + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + SPARK_JAR) + Client.populateHadoopClasspath(conf, env) + + if (!userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + APP_JAR) + } + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "*") + } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index c56dbd99ba..852dbd7dab 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,6 +24,9 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! class ClientArguments(val args: Array[String]) { + var addJars: String = null + var files: String = null + var archives: String = null var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() @@ -81,6 +84,17 @@ class ClientArguments(val args: Array[String]) { case ("--name") :: value :: tail => appName = value + + case ("--addJars") :: value :: tail => + addJars = value + args = tail + + case ("--files") :: value :: tail => + files = value + args = tail + + case ("--archives") :: value :: tail => + archives = value args = tail case Nil => @@ -97,7 +111,7 @@ class ClientArguments(val args: Array[String]) { inputFormatInfo = inputFormatMap.values.toList } - + def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { if (unknownParam != null) { System.err.println("Unknown/unsupported param " + unknownParam) @@ -113,10 +127,13 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --name NAME The name of your application (Default: Spark)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" + " --name NAME The name of your application (Default: Spark)\n" + + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + + " --files files Comma separated list of files to be distributed with the job.\n" + + " --archives archives Comma separated list of archives to be distributed with the job." ) System.exit(exitCode) } - + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala new file mode 100644 index 0000000000..07686fefd7 --- /dev/null +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI; + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.yarn.api.records.LocalResource +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility +import org.apache.hadoop.yarn.api.records.LocalResourceType +import org.apache.hadoop.yarn.util.{Records, ConverterUtils} + +import org.apache.spark.Logging + +import scala.collection.mutable.HashMap +import scala.collection.mutable.LinkedHashMap +import scala.collection.mutable.Map + + +/** Client side methods to setup the Hadoop distributed cache */ +class ClientDistributedCacheManager() extends Logging { + private val distCacheFiles: Map[String, Tuple3[String, String, String]] = + LinkedHashMap[String, Tuple3[String, String, String]]() + private val distCacheArchives: Map[String, Tuple3[String, String, String]] = + LinkedHashMap[String, Tuple3[String, String, String]]() + + + /** + * Add a resource to the list of distributed cache resources. This list can + * be sent to the ApplicationMaster and possibly the workers so that it can + * be downloaded into the Hadoop distributed cache for use by this application. + * Adds the LocalResource to the localResources HashMap passed in and saves + * the stats of the resources to they can be sent to the workers and verified. + * + * @param fs FileSystem + * @param conf Configuration + * @param destPath path to the resource + * @param localResources localResource hashMap to insert the resource into + * @param resourceType LocalResourceType + * @param link link presented in the distributed cache to the destination + * @param statCache cache to store the file/directory stats + * @param appMasterOnly Whether to only add the resource to the app master + */ + def addResource( + fs: FileSystem, + conf: Configuration, + destPath: Path, + localResources: HashMap[String, LocalResource], + resourceType: LocalResourceType, + link: String, + statCache: Map[URI, FileStatus], + appMasterOnly: Boolean = false) = { + val destStatus = fs.getFileStatus(destPath) + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(resourceType) + val visibility = getVisibility(conf, destPath.toUri(), statCache) + amJarRsrc.setVisibility(visibility) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(destPath)) + amJarRsrc.setTimestamp(destStatus.getModificationTime()) + amJarRsrc.setSize(destStatus.getLen()) + if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name") + localResources(link) = amJarRsrc + + if (appMasterOnly == false) { + val uri = destPath.toUri() + val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link) + if (resourceType == LocalResourceType.FILE) { + distCacheFiles(pathURI.toString()) = (destStatus.getLen().toString(), + destStatus.getModificationTime().toString(), visibility.name()) + } else { + distCacheArchives(pathURI.toString()) = (destStatus.getLen().toString(), + destStatus.getModificationTime().toString(), visibility.name()) + } + } + } + + /** + * Adds the necessary cache file env variables to the env passed in + * @param env + */ + def setDistFilesEnv(env: Map[String, String]) = { + val (keys, tupleValues) = distCacheFiles.unzip + val (sizes, timeStamps, visibilities) = tupleValues.unzip3 + + if (keys.size > 0) { + env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = + timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = + sizes.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_FILES_VISIBILITIES") = + visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } + } + } + + /** + * Adds the necessary cache archive env variables to the env passed in + * @param env + */ + def setDistArchivesEnv(env: Map[String, String]) = { + val (keys, tupleValues) = distCacheArchives.unzip + val (sizes, timeStamps, visibilities) = tupleValues.unzip3 + + if (keys.size > 0) { + env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = + timeStamps.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = + sizes.reduceLeft[String] { (acc,n) => acc + "," + n } + env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") = + visibilities.reduceLeft[String] { (acc,n) => acc + "," + n } + } + } + + /** + * Returns the local resource visibility depending on the cache file permissions + * @param conf + * @param uri + * @param statCache + * @return LocalResourceVisibility + */ + def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): + LocalResourceVisibility = { + if (isPublic(conf, uri, statCache)) { + return LocalResourceVisibility.PUBLIC + } + return LocalResourceVisibility.PRIVATE + } + + /** + * Returns a boolean to denote whether a cache file is visible to all(public) + * or not + * @param conf + * @param uri + * @param statCache + * @return true if the path in the uri is visible to all, false otherwise + */ + def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { + val fs = FileSystem.get(uri, conf) + val current = new Path(uri.getPath()) + //the leaf level file should be readable by others + if (!checkPermissionOfOther(fs, current, FsAction.READ, statCache)) { + return false + } + return ancestorsHaveExecutePermissions(fs, current.getParent(), statCache) + } + + /** + * Returns true if all ancestors of the specified path have the 'execute' + * permission set for all users (i.e. that other users can traverse + * the directory heirarchy to the given path) + * @param fs + * @param path + * @param statCache + * @return true if all ancestors have the 'execute' permission set for all users + */ + def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, + statCache: Map[URI, FileStatus]): Boolean = { + var current = path + while (current != null) { + //the subdirs in the path should have execute permissions for others + if (!checkPermissionOfOther(fs, current, FsAction.EXECUTE, statCache)) { + return false + } + current = current.getParent() + } + return true + } + + /** + * Checks for a given path whether the Other permissions on it + * imply the permission in the passed FsAction + * @param fs + * @param path + * @param action + * @param statCache + * @return true if the path in the uri is visible to all, false otherwise + */ + def checkPermissionOfOther(fs: FileSystem, path: Path, + action: FsAction, statCache: Map[URI, FileStatus]): Boolean = { + val status = getFileStatus(fs, path.toUri(), statCache); + val perms = status.getPermission() + val otherAction = perms.getOtherAction() + if (otherAction.implies(action)) { + return true; + } + return false + } + + /** + * Checks to see if the given uri exists in the cache, if it does it + * returns the existing FileStatus, otherwise it stats the uri, stores + * it in the cache, and returns the FileStatus. + * @param fs + * @param uri + * @param statCache + * @return FileStatus + */ + def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = { + val stat = statCache.get(uri) match { + case Some(existstat) => existstat + case None => + val newStat = fs.getFileStatus(new Path(uri)) + statCache.put(uri, newStat) + newStat + } + return stat + } +} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index a60e8a3007..7a66532254 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ? " -XX:OnOutOfMemoryError='kill %p' " + JAVA_OPTS + - " org.apache.spark.executor.StandaloneExecutorBackend " + + " org.apache.spark.executor.CoarseGrainedExecutorBackend " + masterAddress + " " + slaveId + " " + hostname + " " + @@ -137,61 +137,58 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S startReq.setContainerLaunchContext(ctx) cm.startContainer(startReq) } + + private def setupDistributedCache(file: String, + rtype: LocalResourceType, + localResources: HashMap[String, LocalResource], + timestamp: String, + size: String, + vis: String) = { + val uri = new URI(file) + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(rtype) + amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) + amJarRsrc.setTimestamp(timestamp.toLong) + amJarRsrc.setSize(size.toLong) + localResources(uri.getFragment()) = amJarRsrc + } def prepareLocalResources: HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() + val localResources = HashMap[String, LocalResource]() - // Spark JAR - val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - sparkJarResource.setType(LocalResourceType.FILE) - sparkJarResource.setVisibility(LocalResourceVisibility.APPLICATION) - sparkJarResource.setResource(ConverterUtils.getYarnUrlFromURI( - new URI(System.getenv("SPARK_YARN_JAR_PATH")))) - sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong) - sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong) - locaResources("spark.jar") = sparkJarResource - // User JAR - val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - userJarResource.setType(LocalResourceType.FILE) - userJarResource.setVisibility(LocalResourceVisibility.APPLICATION) - userJarResource.setResource(ConverterUtils.getYarnUrlFromURI( - new URI(System.getenv("SPARK_YARN_USERJAR_PATH")))) - userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong) - userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong) - locaResources("app.jar") = userJarResource - - // Log4j conf - if available - if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { - val log4jConfResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] - log4jConfResource.setType(LocalResourceType.FILE) - log4jConfResource.setVisibility(LocalResourceVisibility.APPLICATION) - log4jConfResource.setResource(ConverterUtils.getYarnUrlFromURI( - new URI(System.getenv("SPARK_YARN_LOG4J_PATH")))) - log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong) - log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong) - locaResources("log4j.properties") = log4jConfResource + if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') + val visibilities = System.getenv("SPARK_YARN_CACHE_FILES_VISIBILITIES").split(',') + for( i <- 0 to distFiles.length - 1) { + setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), + fileSizes(i), visibilities(i)) + } } + if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') + val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') + val visibilities = System.getenv("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES").split(',') + for( i <- 0 to distArchives.length - 1) { + setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, + timeStamps(i), fileSizes(i), visibilities(i)) + } + } - logInfo("Prepared Local resources " + locaResources) - return locaResources + logInfo("Prepared Local resources " + localResources) + return localResources } def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { - // Which is correct ? - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") - } - - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index d222f412a0..4beb5229fe 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -22,7 +22,7 @@ import org.apache.spark.util.Utils import org.apache.spark.scheduler.SplitInfo import scala.collection import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container} -import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse} import org.apache.hadoop.yarn.util.{RackResolver, Records} import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} @@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but .. diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala new file mode 100644 index 0000000000..c0a2af0c6f --- /dev/null +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManagerSuite.scala @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import java.net.URI; + +import org.scalatest.FunSuite +import org.scalatest.mock.MockitoSugar +import org.mockito.Mockito.when + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.FileStatus +import org.apache.hadoop.fs.FileSystem +import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.permission.FsAction +import org.apache.hadoop.yarn.api.records.LocalResource +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility +import org.apache.hadoop.yarn.api.records.LocalResourceType +import org.apache.hadoop.yarn.util.{Records, ConverterUtils} + +import scala.collection.mutable.HashMap +import scala.collection.mutable.Map + + +class ClientDistributedCacheManagerSuite extends FunSuite with MockitoSugar { + + class MockClientDistributedCacheManager extends ClientDistributedCacheManager { + override def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): + LocalResourceVisibility = { + return LocalResourceVisibility.PRIVATE + } + } + + test("test getFileStatus empty") { + val distMgr = new ClientDistributedCacheManager() + val fs = mock[FileSystem] + val uri = new URI("/tmp/testing") + when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val stat = distMgr.getFileStatus(fs, uri, statCache) + assert(stat.getPath() === null) + } + + test("test getFileStatus cached") { + val distMgr = new ClientDistributedCacheManager() + val fs = mock[FileSystem] + val uri = new URI("/tmp/testing") + val realFileStatus = new FileStatus(10, false, 1, 1024, 10, 10, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(new Path(uri))).thenReturn(new FileStatus()) + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus](uri -> realFileStatus) + val stat = distMgr.getFileStatus(fs, uri, statCache) + assert(stat.getPath().toString() === "/tmp/testing") + } + + test("test addResource") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, "link", + statCache, false) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 0) + assert(resource.getSize() === 0) + assert(resource.getType() === LocalResourceType.FILE) + + val env = new HashMap[String, String]() + distMgr.setDistFilesEnv(env) + assert(env("SPARK_YARN_CACHE_FILES") === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === "0") + assert(env("SPARK_YARN_CACHE_FILES_FILE_SIZES") === "0") + assert(env("SPARK_YARN_CACHE_FILES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) + + distMgr.setDistArchivesEnv(env) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + + //add another one and verify both there and order correct + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing2")) + val destPath2 = new Path("file:///foo.invalid.com:8080/tmp/testing2") + when(fs.getFileStatus(destPath2)).thenReturn(realFileStatus) + distMgr.addResource(fs, conf, destPath2, localResources, LocalResourceType.FILE, "link2", + statCache, false) + val resource2 = localResources("link2") + assert(resource2.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource2.getResource()) === destPath2) + assert(resource2.getTimestamp() === 10) + assert(resource2.getSize() === 20) + assert(resource2.getType() === LocalResourceType.FILE) + + val env2 = new HashMap[String, String]() + distMgr.setDistFilesEnv(env2) + val timestamps = env2("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val files = env2("SPARK_YARN_CACHE_FILES").split(',') + val sizes = env2("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val visibilities = env2("SPARK_YARN_CACHE_FILES_VISIBILITIES") .split(',') + assert(files(0) === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(timestamps(0) === "0") + assert(sizes(0) === "0") + assert(visibilities(0) === LocalResourceVisibility.PRIVATE.name()) + + assert(files(1) === "file:/foo.invalid.com:8080/tmp/testing2#link2") + assert(timestamps(1) === "10") + assert(sizes(1) === "20") + assert(visibilities(1) === LocalResourceVisibility.PRIVATE.name()) + } + + test("test addResource link null") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + when(fs.getFileStatus(destPath)).thenReturn(new FileStatus()) + + intercept[Exception] { + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.FILE, null, + statCache, false) + } + assert(localResources.get("link") === None) + assert(localResources.size === 0) + } + + test("test addResource appmaster only") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + statCache, true) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 10) + assert(resource.getSize() === 20) + assert(resource.getType() === LocalResourceType.ARCHIVE) + + val env = new HashMap[String, String]() + distMgr.setDistFilesEnv(env) + assert(env.get("SPARK_YARN_CACHE_FILES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) + + distMgr.setDistArchivesEnv(env) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === None) + } + + test("test addResource archive") { + val distMgr = new MockClientDistributedCacheManager() + val fs = mock[FileSystem] + val conf = new Configuration() + val destPath = new Path("file:///foo.invalid.com:8080/tmp/testing") + val localResources = HashMap[String, LocalResource]() + val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]() + val realFileStatus = new FileStatus(20, false, 1, 1024, 10, 30, null, "testOwner", + null, new Path("/tmp/testing")) + when(fs.getFileStatus(destPath)).thenReturn(realFileStatus) + + distMgr.addResource(fs, conf, destPath, localResources, LocalResourceType.ARCHIVE, "link", + statCache, false) + val resource = localResources("link") + assert(resource.getVisibility() === LocalResourceVisibility.PRIVATE) + assert(ConverterUtils.getPathFromYarnURL(resource.getResource()) === destPath) + assert(resource.getTimestamp() === 10) + assert(resource.getSize() === 20) + assert(resource.getType() === LocalResourceType.ARCHIVE) + + val env = new HashMap[String, String]() + + distMgr.setDistArchivesEnv(env) + assert(env("SPARK_YARN_CACHE_ARCHIVES") === "file:/foo.invalid.com:8080/tmp/testing#link") + assert(env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") === "10") + assert(env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") === "20") + assert(env("SPARK_YARN_CACHE_ARCHIVES_VISIBILITIES") === LocalResourceVisibility.PRIVATE.name()) + + distMgr.setDistFilesEnv(env) + assert(env.get("SPARK_YARN_CACHE_FILES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_TIME_STAMPS") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_FILE_SIZES") === None) + assert(env.get("SPARK_YARN_CACHE_FILES_VISIBILITIES") === None) + } + + +} |