diff options
Diffstat (limited to 'yarn/common')
7 files changed, 613 insertions, 475 deletions
diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 40d8d6d6e6..201b742736 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -17,15 +17,14 @@ package org.apache.spark.deploy.yarn -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkConf -import org.apache.spark.scheduler.InputFormatInfo import org.apache.spark.util.{Utils, IntParam, MemoryParam} // TODO: Add code and support for ensuring that yarn resource 'tasks' are location aware ! -class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { +private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf) { var addJars: String = null var files: String = null var archives: String = null @@ -35,28 +34,56 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { var executorMemory = 1024 // MB var executorCores = 1 var numExecutors = 2 - var amQueue = sparkConf.get("QUEUE", "default") + var amQueue = sparkConf.get("spark.yarn.queue", "default") var amMemory: Int = 512 // MB var appName: String = "Spark" var priority = 0 - parseArgs(args.toList) + // Additional memory to allocate to containers + // For now, use driver's memory overhead as our AM container's memory overhead + val amMemoryOverhead = sparkConf.getInt( + "spark.yarn.driver.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) + val executorMemoryOverhead = sparkConf.getInt( + "spark.yarn.executor.memoryOverhead", YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) - // env variable SPARK_YARN_DIST_ARCHIVES/SPARK_YARN_DIST_FILES set in yarn-client then - // it should default to hdfs:// - files = Option(files).getOrElse(sys.env.get("SPARK_YARN_DIST_FILES").orNull) - archives = Option(archives).getOrElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES").orNull) + parseArgs(args.toList) + loadEnvironmentArgs() + validateArgs() + + /** Load any default arguments provided through environment variables and Spark properties. */ + private def loadEnvironmentArgs(): Unit = { + // For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://, + // while spark.yarn.dist.{archives/files} should be resolved to file:// (SPARK-2051). + files = Option(files) + .orElse(sys.env.get("SPARK_YARN_DIST_FILES")) + .orElse(sparkConf.getOption("spark.yarn.dist.files").map(p => Utils.resolveURIs(p))) + .orNull + archives = Option(archives) + .orElse(sys.env.get("SPARK_YARN_DIST_ARCHIVES")) + .orElse(sparkConf.getOption("spark.yarn.dist.archives").map(p => Utils.resolveURIs(p))) + .orNull + } - // spark.yarn.dist.archives/spark.yarn.dist.files defaults to use file:// if not specified, - // for both yarn-client and yarn-cluster - files = Option(files).getOrElse(sparkConf.getOption("spark.yarn.dist.files"). - map(p => Utils.resolveURIs(p)).orNull) - archives = Option(archives).getOrElse(sparkConf.getOption("spark.yarn.dist.archives"). - map(p => Utils.resolveURIs(p)).orNull) + /** + * Fail fast if any arguments provided are invalid. + * This is intended to be called only after the provided arguments have been parsed. + */ + private def validateArgs(): Unit = { + // TODO: memory checks are outdated (SPARK-3476) + Map[Boolean, String]( + (numExecutors <= 0) -> "You must specify at least 1 executor!", + (amMemory <= amMemoryOverhead) -> s"AM memory must be > $amMemoryOverhead MB", + (executorMemory <= executorMemoryOverhead) -> + s"Executor memory must be > $executorMemoryOverhead MB" + ).foreach { case (errorCondition, errorMessage) => + if (errorCondition) { + throw new IllegalArgumentException(errorMessage + "\n" + getUsageMessage()) + } + } + } private def parseArgs(inputArgs: List[String]): Unit = { - val userArgsBuffer: ArrayBuffer[String] = new ArrayBuffer[String]() - + val userArgsBuffer = new ArrayBuffer[String]() var args = inputArgs while (!args.isEmpty) { @@ -138,16 +165,14 @@ class ClientArguments(val args: Array[String], val sparkConf: SparkConf) { userArgs = userArgsBuffer.readOnly } - - def getUsageMessage(unknownParam: Any = null): String = { + private def getUsageMessage(unknownParam: List[String] = null): String = { val message = if (unknownParam != null) s"Unknown/unsupported param $unknownParam\n" else "" - message + "Usage: org.apache.spark.deploy.yarn.Client [options] \n" + "Options:\n" + " --jar JAR_PATH Path to your application's JAR file (required in yarn-cluster mode)\n" + " --class CLASS_NAME Name of your application's main class (required)\n" + - " --arg ARGS Argument to be passed to your application's main class.\n" + + " --arg ARG Argument to be passed to your application's main class.\n" + " Multiple invocations are possible, each will be passed in order.\n" + " --num-executors NUM Number of executors to start (Default: 2)\n" + " --executor-cores NUM Number of cores for the executors (Default: 1).\n" + diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala index 6ae4d49622..4870b0cb3d 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.yarn -import java.io.File import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException} import scala.collection.JavaConversions._ @@ -37,154 +36,107 @@ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.util.Records + import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} /** - * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The - * Client submits an application to the YARN ResourceManager. + * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. + * The Client submits an application to the YARN ResourceManager. */ -trait ClientBase extends Logging { - val args: ClientArguments - val conf: Configuration - val sparkConf: SparkConf - val yarnConf: YarnConfiguration - val credentials = UserGroupInformation.getCurrentUser().getCredentials() - private val SPARK_STAGING: String = ".sparkStaging" +private[spark] trait ClientBase extends Logging { + import ClientBase._ + + protected val args: ClientArguments + protected val hadoopConf: Configuration + protected val sparkConf: SparkConf + protected val yarnConf: YarnConfiguration + protected val credentials = UserGroupInformation.getCurrentUser.getCredentials + protected val amMemoryOverhead = args.amMemoryOverhead // MB + protected val executorMemoryOverhead = args.executorMemoryOverhead // MB private val distCacheMgr = new ClientDistributedCacheManager() - // Staging directory is private! -> rwx-------- - val STAGING_DIR_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) - // App files are world-wide readable and owner writable -> rw-r--r-- - val APP_FILE_PERMISSION: FsPermission = - FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) - - // Additional memory overhead - in mb. - protected def memoryOverhead: Int = sparkConf.getInt("spark.yarn.driver.memoryOverhead", - YarnSparkHadoopUtil.DEFAULT_MEMORY_OVERHEAD) - - // TODO(harvey): This could just go in ClientArguments. - def validateArgs() = { - Map( - (args.numExecutors <= 0) -> "Error: You must specify at least 1 executor!", - (args.amMemory <= memoryOverhead) -> ("Error: AM memory size must be" + - "greater than: " + memoryOverhead), - (args.executorMemory <= memoryOverhead) -> ("Error: Executor memory size" + - "must be greater than: " + memoryOverhead.toString) - ).foreach { case(cond, errStr) => - if (cond) { - logError(errStr) - throw new IllegalArgumentException(args.getUsageMessage()) - } - } - } - - def getAppStagingDir(appId: ApplicationId): String = { - SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR - } - - def verifyClusterResources(app: GetNewApplicationResponse) = { - val maxMem = app.getMaximumResourceCapability().getMemory() - logInfo("Max mem capabililty of a single resource in this cluster " + maxMem) - - // If we have requested more then the clusters max for a single resource then exit. - if (args.executorMemory > maxMem) { - val errorMessage = - "Required executor memory (%d MB), is above the max threshold (%d MB) of this cluster." - .format(args.executorMemory, maxMem) - - logError(errorMessage) - throw new IllegalArgumentException(errorMessage) - } - val amMem = args.amMemory + memoryOverhead + /** + * Fail fast if we have requested more resources per container than is available in the cluster. + */ + protected def verifyClusterResources(newAppResponse: GetNewApplicationResponse): Unit = { + val maxMem = newAppResponse.getMaximumResourceCapability().getMemory() + logInfo("Verifying our application has not requested more than the maximum " + + s"memory capability of the cluster ($maxMem MB per container)") + val executorMem = args.executorMemory + executorMemoryOverhead + if (executorMem > maxMem) { + throw new IllegalArgumentException(s"Required executor memory ($executorMem MB) " + + s"is above the max threshold ($maxMem MB) of this cluster!") + } + val amMem = args.amMemory + amMemoryOverhead if (amMem > maxMem) { - - val errorMessage = "Required AM memory (%d) is above the max threshold (%d) of this cluster." - .format(amMem, maxMem) - logError(errorMessage) - throw new IllegalArgumentException(errorMessage) + throw new IllegalArgumentException(s"Required AM memory ($amMem MB) " + + s"is above the max threshold ($maxMem MB) of this cluster!") } - // We could add checks to make sure the entire cluster has enough resources but that involves // getting all the node reports and computing ourselves. } - /** See if two file systems are the same or not. */ - private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { - val srcUri = srcFs.getUri() - val dstUri = destFs.getUri() - if (srcUri.getScheme() == null) { - return false - } - if (!srcUri.getScheme().equals(dstUri.getScheme())) { - return false - } - var srcHost = srcUri.getHost() - var dstHost = dstUri.getHost() - if ((srcHost != null) && (dstHost != null)) { - try { - srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() - dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() - } catch { - case e: UnknownHostException => - return false - } - if (!srcHost.equals(dstHost)) { - return false - } - } else if (srcHost == null && dstHost != null) { - return false - } else if (srcHost != null && dstHost == null) { - return false - } - if (srcUri.getPort() != dstUri.getPort()) { - false - } else { - true - } - } - - /** Copy the file into HDFS if needed. */ - private[yarn] def copyRemoteFile( - dstDir: Path, - originalPath: Path, + /** + * Copy the given file to a remote file system (e.g. HDFS) if needed. + * The file is only copied if the source and destination file systems are different. This is used + * for preparing resources for launching the ApplicationMaster container. Exposed for testing. + */ + def copyFileToRemote( + destDir: Path, + srcPath: Path, replication: Short, setPerms: Boolean = false): Path = { - val fs = FileSystem.get(conf) - val remoteFs = originalPath.getFileSystem(conf) - var newPath = originalPath - if (!compareFs(remoteFs, fs)) { - newPath = new Path(dstDir, originalPath.getName()) - logInfo("Uploading " + originalPath + " to " + newPath) - FileUtil.copy(remoteFs, originalPath, fs, newPath, false, conf) - fs.setReplication(newPath, replication) - if (setPerms) fs.setPermission(newPath, new FsPermission(APP_FILE_PERMISSION)) + val destFs = destDir.getFileSystem(hadoopConf) + val srcFs = srcPath.getFileSystem(hadoopConf) + var destPath = srcPath + if (!compareFs(srcFs, destFs)) { + destPath = new Path(destDir, srcPath.getName()) + logInfo(s"Uploading resource $srcPath -> $destPath") + FileUtil.copy(srcFs, srcPath, destFs, destPath, false, hadoopConf) + destFs.setReplication(destPath, replication) + if (setPerms) { + destFs.setPermission(destPath, new FsPermission(APP_FILE_PERMISSION)) + } + } else { + logInfo(s"Source and destination file systems are the same. Not copying $srcPath") } // Resolve any symlinks in the URI path so using a "current" symlink to point to a specific // version shows the specific version in the distributed cache configuration - val qualPath = fs.makeQualified(newPath) - val fc = FileContext.getFileContext(qualPath.toUri(), conf) - val destPath = fc.resolvePath(qualPath) - destPath + val qualifiedDestPath = destFs.makeQualified(destPath) + val fc = FileContext.getFileContext(qualifiedDestPath.toUri(), hadoopConf) + fc.resolvePath(qualifiedDestPath) } - private def qualifyForLocal(localURI: URI): Path = { - var qualifiedURI = localURI - // If not specified, assume these are in the local filesystem to keep behavior like Hadoop - if (qualifiedURI.getScheme() == null) { - qualifiedURI = new URI(FileSystem.getLocal(conf).makeQualified(new Path(qualifiedURI)).toString) - } + /** + * Given a local URI, resolve it and return a qualified local path that corresponds to the URI. + * This is used for preparing local resources to be included in the container launch context. + */ + private def getQualifiedLocalPath(localURI: URI): Path = { + val qualifiedURI = + if (localURI.getScheme == null) { + // If not specified, assume this is in the local filesystem to keep the behavior + // consistent with that of Hadoop + new URI(FileSystem.getLocal(hadoopConf).makeQualified(new Path(localURI)).toString) + } else { + localURI + } new Path(qualifiedURI) } + /** + * Upload any resources to the distributed cache if needed. If a resource is intended to be + * consumed locally, set up the appropriate config for downstream code to handle it properly. + * This is used for setting up a container launch context for our ApplicationMaster. + * Exposed for testing. + */ def prepareLocalResources(appStagingDir: String): HashMap[String, LocalResource] = { - logInfo("Preparing Local resources") - // Upload Spark and the application JAR to the remote file system if necessary. Add them as - // local resources to the application master. - val fs = FileSystem.get(conf) + logInfo("Preparing resources for our AM container") + // Upload Spark and the application JAR to the remote file system if necessary, + // and add them as local resources to the application master. + val fs = FileSystem.get(hadoopConf) val dst = new Path(fs.getHomeDirectory(), appStagingDir) - val nns = ClientBase.getNameNodesToAccess(sparkConf) + dst - ClientBase.obtainTokensForNamenodes(nns, conf, credentials) + val nns = getNameNodesToAccess(sparkConf) + dst + obtainTokensForNamenodes(nns, hadoopConf, credentials) val replication = sparkConf.getInt("spark.yarn.submit.file.replication", 3).toShort val localResources = HashMap[String, LocalResource]() @@ -200,73 +152,84 @@ trait ClientBase extends Logging { "for alternatives.") } + /** + * Copy the given main resource to the distributed cache if the scheme is not "local". + * Otherwise, set the corresponding key in our SparkConf to handle it downstream. + * Each resource is represented by a 4-tuple of: + * (1) destination resource name, + * (2) local path to the resource, + * (3) Spark property key to set if the scheme is not local, and + * (4) whether to set permissions for this resource + */ List( - (ClientBase.SPARK_JAR, ClientBase.sparkJar(sparkConf), ClientBase.CONF_SPARK_JAR), - (ClientBase.APP_JAR, args.userJar, ClientBase.CONF_SPARK_USER_JAR), - ("log4j.properties", oldLog4jConf.getOrElse(null), null) - ).foreach { case(destName, _localPath, confKey) => + (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR, false), + (APP_JAR, args.userJar, CONF_SPARK_USER_JAR, true), + ("log4j.properties", oldLog4jConf.orNull, null, false) + ).foreach { case (destName, _localPath, confKey, setPermissions) => val localPath: String = if (_localPath != null) _localPath.trim() else "" - if (! localPath.isEmpty()) { + if (!localPath.isEmpty()) { val localURI = new URI(localPath) - if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) { - val setPermissions = destName.equals(ClientBase.APP_JAR) - val destPath = copyRemoteFile(dst, qualifyForLocal(localURI), replication, setPermissions) - val destFs = FileSystem.get(destPath.toUri(), conf) - distCacheMgr.addResource(destFs, conf, destPath, localResources, LocalResourceType.FILE, - destName, statCache) + if (localURI.getScheme != LOCAL_SCHEME) { + val src = getQualifiedLocalPath(localURI) + val destPath = copyFileToRemote(dst, src, replication, setPermissions) + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) + distCacheMgr.addResource(destFs, hadoopConf, destPath, + localResources, LocalResourceType.FILE, destName, statCache) } else if (confKey != null) { + // If the resource is intended for local use only, handle this downstream + // by setting the appropriate property sparkConf.set(confKey, localPath) } } } + /** + * Do the same for any additional resources passed in through ClientArguments. + * Each resource category is represented by a 3-tuple of: + * (1) comma separated list of resources in this category, + * (2) resource type, and + * (3) whether to add these resources to the classpath + */ val cachedSecondaryJarLinks = ListBuffer.empty[String] - val fileLists = List( (args.addJars, LocalResourceType.FILE, true), + List( + (args.addJars, LocalResourceType.FILE, true), (args.files, LocalResourceType.FILE, false), - (args.archives, LocalResourceType.ARCHIVE, false) ) - fileLists.foreach { case (flist, resType, addToClasspath) => + (args.archives, LocalResourceType.ARCHIVE, false) + ).foreach { case (flist, resType, addToClasspath) => if (flist != null && !flist.isEmpty()) { - flist.split(',').foreach { case file: String => + flist.split(',').foreach { file => val localURI = new URI(file.trim()) - if (!ClientBase.LOCAL_SCHEME.equals(localURI.getScheme())) { + if (localURI.getScheme != LOCAL_SCHEME) { val localPath = new Path(localURI) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) - val destPath = copyRemoteFile(dst, localPath, replication) - distCacheMgr.addResource(fs, conf, destPath, localResources, resType, - linkname, statCache) + val destPath = copyFileToRemote(dst, localPath, replication) + distCacheMgr.addResource( + fs, hadoopConf, destPath, localResources, resType, linkname, statCache) if (addToClasspath) { cachedSecondaryJarLinks += linkname } } else if (addToClasspath) { + // Resource is intended for local use only and should be added to the class path cachedSecondaryJarLinks += file.trim() } } } } - logInfo("Prepared Local resources " + localResources) - sparkConf.set(ClientBase.CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) + if (cachedSecondaryJarLinks.nonEmpty) { + sparkConf.set(CONF_SPARK_YARN_SECONDARY_JARS, cachedSecondaryJarLinks.mkString(",")) + } - UserGroupInformation.getCurrentUser().addCredentials(credentials) localResources } - /** Get all application master environment variables set on this SparkConf */ - def getAppMasterEnv: Seq[(String, String)] = { - val prefix = "spark.yarn.appMasterEnv." - sparkConf.getAll.filter{case (k, v) => k.startsWith(prefix)} - .map{case (k, v) => (k.substring(prefix.length), v)} - } - - - def setupLaunchEnv( - localResources: HashMap[String, LocalResource], - stagingDir: String): HashMap[String, String] = { - logInfo("Setting up the launch environment") - + /** + * Set up the environment for launching our ApplicationMaster container. + */ + private def setupLaunchEnv(stagingDir: String): HashMap[String, String] = { + logInfo("Setting up the launch environment for our AM container") val env = new HashMap[String, String]() - val extraCp = sparkConf.getOption("spark.driver.extraClassPath") - ClientBase.populateClasspath(args, yarnConf, sparkConf, env, extraCp) + populateClasspath(args, yarnConf, sparkConf, env, extraCp) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_STAGING_DIR") = stagingDir env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() @@ -275,42 +238,20 @@ trait ClientBase extends Logging { distCacheMgr.setDistFilesEnv(env) distCacheMgr.setDistArchivesEnv(env) - getAppMasterEnv.foreach { case (key, value) => - YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator) - } + // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.* + val amEnvPrefix = "spark.yarn.appMasterEnv." + sparkConf.getAll + .filter { case (k, v) => k.startsWith(amEnvPrefix) } + .map { case (k, v) => (k.substring(amEnvPrefix.length), v) } + .foreach { case (k, v) => YarnSparkHadoopUtil.addPathToEnvironment(env, k, v) } // Keep this for backwards compatibility but users should move to the config sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => // Allow users to specify some environment variables. - YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs, File.pathSeparator) - + YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) // Pass SPARK_YARN_USER_ENV itself to the AM so it can use it to set up executor environments. env("SPARK_YARN_USER_ENV") = userEnvs } - env - } - - def userArgsToString(clientArgs: ClientArguments): String = { - val prefix = " --arg " - val args = clientArgs.userArgs - val retval = new StringBuilder() - for (arg <- args) { - retval.append(prefix).append(" ").append(YarnSparkHadoopUtil.escapeForShell(arg)) - } - retval.toString - } - - def setupSecurityToken(amContainer: ContainerLaunchContext) - - def createContainerLaunchContext( - newApp: GetNewApplicationResponse, - localResources: HashMap[String, LocalResource], - env: HashMap[String, String]): ContainerLaunchContext = { - logInfo("Setting up container launch context") - val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) - amContainer.setLocalResources(localResources) - - val isLaunchingDriver = args.userClass != null // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's @@ -320,6 +261,7 @@ trait ClientBase extends Logging { // Note that to warn the user about the deprecation in cluster mode, some code from // SparkConf#validateSettings() is duplicated here (to avoid triggering the condition // described above). + val isLaunchingDriver = args.userClass != null if (isLaunchingDriver) { sys.env.get("SPARK_JAVA_OPTS").foreach { value => val warning = @@ -342,14 +284,30 @@ trait ClientBase extends Logging { env("SPARK_JAVA_OPTS") = value } } - amContainer.setEnvironment(env) - val amMemory = args.amMemory + env + } + + /** + * Set up a ContainerLaunchContext to launch our ApplicationMaster container. + * This sets up the launch environment, java options, and the command for launching the AM. + */ + protected def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse) + : ContainerLaunchContext = { + logInfo("Setting up container launch context for our AM") + + val appId = newAppResponse.getApplicationId + val appStagingDir = getAppStagingDir(appId) + val localResources = prepareLocalResources(appStagingDir) + val launchEnv = setupLaunchEnv(appStagingDir) + val amContainer = Records.newRecord(classOf[ContainerLaunchContext]) + amContainer.setLocalResources(localResources) + amContainer.setEnvironment(launchEnv) val javaOpts = ListBuffer[String]() // Add Xmx for AM memory - javaOpts += "-Xmx" + amMemory + "m" + javaOpts += "-Xmx" + args.amMemory + "m" val tmpDir = new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) javaOpts += "-Djava.io.tmpdir=" + tmpDir @@ -361,8 +319,7 @@ trait ClientBase extends Logging { // Instead of using this, rely on cpusets by YARN to enforce "proper" Spark behavior in // multi-tenant environments. Not sure how default Java GC behaves if it is limited to subset // of cores on a node. - val useConcurrentAndIncrementalGC = env.isDefinedAt("SPARK_USE_CONC_INCR_GC") && - java.lang.Boolean.parseBoolean(env("SPARK_USE_CONC_INCR_GC")) + val useConcurrentAndIncrementalGC = launchEnv.get("SPARK_USE_CONC_INCR_GC").exists(_.toBoolean) if (useConcurrentAndIncrementalGC) { // In our expts, using (default) throughput collector has severe perf ramifications in // multi-tenant machines @@ -380,6 +337,8 @@ trait ClientBase extends Logging { javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } + // Include driver-specific java options if we are launching a driver + val isLaunchingDriver = args.userClass != null if (isLaunchingDriver) { sparkConf.getOption("spark.driver.extraJavaOptions") .orElse(sys.env.get("SPARK_JAVA_OPTS")) @@ -397,19 +356,27 @@ trait ClientBase extends Logging { } else { Nil } + val userJar = + if (args.userJar != null) { + Seq("--jar", args.userJar) + } else { + Nil + } val amClass = if (isLaunchingDriver) { - classOf[ApplicationMaster].getName() + Class.forName("org.apache.spark.deploy.yarn.ApplicationMaster").getName } else { - classOf[ApplicationMaster].getName().replace("ApplicationMaster", "ExecutorLauncher") + Class.forName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName } + val userArgs = args.userArgs.flatMap { arg => + Seq("--arg", YarnSparkHadoopUtil.escapeForShell(arg)) + } val amArgs = - Seq(amClass) ++ userClass ++ - (if (args.userJar != null) Seq("--jar", args.userJar) else Nil) ++ - Seq("--executor-memory", args.executorMemory.toString, + Seq(amClass) ++ userClass ++ userJar ++ userArgs ++ + Seq( + "--executor-memory", args.executorMemory.toString, "--executor-cores", args.executorCores.toString, - "--num-executors ", args.numExecutors.toString, - userArgsToString(args)) + "--num-executors ", args.numExecutors.toString) // Command for the ApplicationMaster val commands = Seq(Environment.JAVA_HOME.$() + "/bin/java", "-server") ++ @@ -418,41 +385,153 @@ trait ClientBase extends Logging { "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout", "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") - logInfo("Yarn AM launch context:") - logInfo(s" user class: ${args.userClass}") - logInfo(s" env: $env") - logInfo(s" command: ${commands.mkString(" ")}") - // TODO: it would be nicer to just make sure there are no null commands here val printableCommands = commands.map(s => if (s == null) "null" else s).toList amContainer.setCommands(printableCommands) - setupSecurityToken(amContainer) + logDebug("===============================================================================") + logDebug("Yarn AM launch context:") + logDebug(s" user class: ${Option(args.userClass).getOrElse("N/A")}") + logDebug(" env:") + launchEnv.foreach { case (k, v) => logDebug(s" $k -> $v") } + logDebug(" resources:") + localResources.foreach { case (k, v) => logDebug(s" $k -> $v")} + logDebug(" command:") + logDebug(s" ${printableCommands.mkString(" ")}") + logDebug("===============================================================================") // send the acl settings into YARN to control who has access via YARN interfaces val securityManager = new SecurityManager(sparkConf) amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager)) + setupSecurityToken(amContainer) + UserGroupInformation.getCurrentUser().addCredentials(credentials) amContainer } + + /** + * Report the state of an application until it has exited, either successfully or + * due to some failure, then return the application state. + * + * @param appId ID of the application to monitor. + * @param returnOnRunning Whether to also return the application state when it is RUNNING. + * @param logApplicationReport Whether to log details of the application report every iteration. + * @return state of the application, one of FINISHED, FAILED, KILLED, and RUNNING. + */ + def monitorApplication( + appId: ApplicationId, + returnOnRunning: Boolean = false, + logApplicationReport: Boolean = true): YarnApplicationState = { + val interval = sparkConf.getLong("spark.yarn.report.interval", 1000) + var lastState: YarnApplicationState = null + while (true) { + Thread.sleep(interval) + val report = getApplicationReport(appId) + val state = report.getYarnApplicationState + + if (logApplicationReport) { + logInfo(s"Application report for $appId (state: $state)") + val details = Seq[(String, String)]( + ("client token", getClientToken(report)), + ("diagnostics", report.getDiagnostics), + ("ApplicationMaster host", report.getHost), + ("ApplicationMaster RPC port", report.getRpcPort.toString), + ("queue", report.getQueue), + ("start time", report.getStartTime.toString), + ("final status", report.getFinalApplicationStatus.toString), + ("tracking URL", report.getTrackingUrl), + ("user", report.getUser) + ) + + // Use more loggable format if value is null or empty + val formattedDetails = details + .map { case (k, v) => + val newValue = Option(v).filter(_.nonEmpty).getOrElse("N/A") + s"\n\t $k: $newValue" } + .mkString("") + + // If DEBUG is enabled, log report details every iteration + // Otherwise, log them every time the application changes state + if (log.isDebugEnabled) { + logDebug(formattedDetails) + } else if (lastState != state) { + logInfo(formattedDetails) + } + } + + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + return state + } + + if (returnOnRunning && state == YarnApplicationState.RUNNING) { + return state + } + + lastState = state + } + + // Never reached, but keeps compiler happy + throw new SparkException("While loop is depleted! This should never happen...") + } + + /** + * Submit an application to the ResourceManager and monitor its state. + * This continues until the application has exited for any reason. + */ + def run(): Unit = monitorApplication(submitApplication()) + + /* --------------------------------------------------------------------------------------- * + | Methods that cannot be implemented here due to API differences across hadoop versions | + * --------------------------------------------------------------------------------------- */ + + /** Submit an application running our ApplicationMaster to the ResourceManager. */ + def submitApplication(): ApplicationId + + /** Set up security tokens for launching our ApplicationMaster container. */ + protected def setupSecurityToken(containerContext: ContainerLaunchContext): Unit + + /** Get the application report from the ResourceManager for an application we have submitted. */ + protected def getApplicationReport(appId: ApplicationId): ApplicationReport + + /** + * Return the security token used by this client to communicate with the ApplicationMaster. + * If no security is enabled, the token returned by the report is null. + */ + protected def getClientToken(report: ApplicationReport): String } -object ClientBase extends Logging { +private[spark] object ClientBase extends Logging { + + // Alias for the Spark assembly jar and the user jar val SPARK_JAR: String = "__spark__.jar" val APP_JAR: String = "__app__.jar" + + // URI scheme that identifies local resources val LOCAL_SCHEME = "local" + + // Staging directory for any temporary jars or files + val SPARK_STAGING: String = ".sparkStaging" + + // Location of any user-defined Spark jars val CONF_SPARK_JAR = "spark.yarn.jar" - /** - * This is an internal config used to propagate the location of the user's jar file to the - * driver/executors. - */ + val ENV_SPARK_JAR = "SPARK_JAR" + + // Internal config to propagate the location of the user's jar to the driver/executors val CONF_SPARK_USER_JAR = "spark.yarn.user.jar" - /** - * This is an internal config used to propagate the list of extra jars to add to the classpath - * of executors. - */ + + // Internal config to propagate the locations of any extra jars to add to the classpath + // of the executors val CONF_SPARK_YARN_SECONDARY_JARS = "spark.yarn.secondary.jars" - val ENV_SPARK_JAR = "SPARK_JAR" + + // Staging directory is private! -> rwx-------- + val STAGING_DIR_PERMISSION: FsPermission = + FsPermission.createImmutable(Integer.parseInt("700", 8).toShort) + + // App files are world-wide readable and owner writable -> rw-r--r-- + val APP_FILE_PERMISSION: FsPermission = + FsPermission.createImmutable(Integer.parseInt("644", 8).toShort) /** * Find the user-defined Spark jar if configured, or return the jar containing this @@ -461,7 +540,7 @@ object ClientBase extends Logging { * This method first looks in the SparkConf object for the CONF_SPARK_JAR key, and in the * user environment if that is not found (for backwards compatibility). */ - def sparkJar(conf: SparkConf) = { + private def sparkJar(conf: SparkConf): String = { if (conf.contains(CONF_SPARK_JAR)) { conf.get(CONF_SPARK_JAR) } else if (System.getenv(ENV_SPARK_JAR) != null) { @@ -474,16 +553,22 @@ object ClientBase extends Logging { } } - def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]) = { + /** + * Return the path to the given application's staging directory. + */ + private def getAppStagingDir(appId: ApplicationId): String = { + SPARK_STAGING + Path.SEPARATOR + appId.toString() + Path.SEPARATOR + } + + /** + * Populate the classpath entry in the given environment map with any application + * classpath specified through the Hadoop and Yarn configurations. + */ + def populateHadoopClasspath(conf: Configuration, env: HashMap[String, String]): Unit = { val classPathElementsToAdd = getYarnAppClasspath(conf) ++ getMRAppClasspath(conf) for (c <- classPathElementsToAdd.flatten) { - YarnSparkHadoopUtil.addToEnvironment( - env, - Environment.CLASSPATH.name, - c.trim, - File.pathSeparator) + YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, c.trim) } - classPathElementsToAdd } private def getYarnAppClasspath(conf: Configuration): Option[Seq[String]] = @@ -519,7 +604,7 @@ object ClientBase extends Logging { /** * In Hadoop 0.23, the MR application classpath comes with the YARN application - * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. + * classpath. In Hadoop 2.0, it's an array of Strings, and in 2.2+ it's a String. * So we need to use reflection to retrieve it. */ def getDefaultMRApplicationClasspath: Option[Seq[String]] = { @@ -545,8 +630,16 @@ object ClientBase extends Logging { triedDefault.toOption } - def populateClasspath(args: ClientArguments, conf: Configuration, sparkConf: SparkConf, - env: HashMap[String, String], extraClassPath: Option[String] = None) { + /** + * Populate the classpath entry in the given environment map. + * This includes the user jar, Spark jar, and any extra application jars. + */ + def populateClasspath( + args: ClientArguments, + conf: Configuration, + sparkConf: SparkConf, + env: HashMap[String, String], + extraClassPath: Option[String] = None): Unit = { extraClassPath.foreach(addClasspathEntry(_, env)) addClasspathEntry(Environment.PWD.$(), env) @@ -554,36 +647,40 @@ object ClientBase extends Logging { if (sparkConf.get("spark.yarn.user.classpath.first", "false").toBoolean) { addUserClasspath(args, sparkConf, env) addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - ClientBase.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) } else { addFileToClasspath(sparkJar(sparkConf), SPARK_JAR, env) - ClientBase.populateHadoopClasspath(conf, env) + populateHadoopClasspath(conf, env) addUserClasspath(args, sparkConf, env) } // Append all jar files under the working directory to the classpath. - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env); + addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + "*", env) } /** * Adds the user jars which have local: URIs (or alternate names, such as APP_JAR) explicitly * to the classpath. */ - private def addUserClasspath(args: ClientArguments, conf: SparkConf, - env: HashMap[String, String]) = { - if (args != null) { - addFileToClasspath(args.userJar, APP_JAR, env) - if (args.addJars != null) { - args.addJars.split(",").foreach { case file: String => - addFileToClasspath(file, null, env) - } + private def addUserClasspath( + args: ClientArguments, + conf: SparkConf, + env: HashMap[String, String]): Unit = { + + // If `args` is not null, we are launching an AM container. + // Otherwise, we are launching executor containers. + val (mainJar, secondaryJars) = + if (args != null) { + (args.userJar, args.addJars) + } else { + (conf.get(CONF_SPARK_USER_JAR, null), conf.get(CONF_SPARK_YARN_SECONDARY_JARS, null)) } - } else { - val userJar = conf.get(CONF_SPARK_USER_JAR, null) - addFileToClasspath(userJar, APP_JAR, env) - val cachedSecondaryJarLinks = conf.get(CONF_SPARK_YARN_SECONDARY_JARS, "").split(",") - cachedSecondaryJarLinks.foreach(jar => addFileToClasspath(jar, null, env)) + addFileToClasspath(mainJar, APP_JAR, env) + if (secondaryJars != null) { + secondaryJars.split(",").filter(_.nonEmpty).foreach { jar => + addFileToClasspath(jar, null, env) + } } } @@ -599,46 +696,44 @@ object ClientBase extends Logging { * @param fileName Alternate name for the file (optional). * @param env Map holding the environment variables. */ - private def addFileToClasspath(path: String, fileName: String, - env: HashMap[String, String]) : Unit = { + private def addFileToClasspath( + path: String, + fileName: String, + env: HashMap[String, String]): Unit = { if (path != null) { scala.util.control.Exception.ignoring(classOf[URISyntaxException]) { - val localPath = getLocalPath(path) - if (localPath != null) { - addClasspathEntry(localPath, env) + val uri = new URI(path) + if (uri.getScheme == LOCAL_SCHEME) { + addClasspathEntry(uri.getPath, env) return } } } if (fileName != null) { - addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env); + addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + fileName, env) } } /** - * Returns the local path if the URI is a "local:" URI, or null otherwise. + * Add the given path to the classpath entry of the given environment map. + * If the classpath is already set, this appends the new path to the existing classpath. */ - private def getLocalPath(resource: String): String = { - val uri = new URI(resource) - if (LOCAL_SCHEME.equals(uri.getScheme())) { - return uri.getPath() - } - null - } - - private def addClasspathEntry(path: String, env: HashMap[String, String]) = - YarnSparkHadoopUtil.addToEnvironment(env, Environment.CLASSPATH.name, path, - File.pathSeparator) + private def addClasspathEntry(path: String, env: HashMap[String, String]): Unit = + YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path) /** * Get the list of namenodes the user may access. */ - private[yarn] def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { - sparkConf.get("spark.yarn.access.namenodes", "").split(",").map(_.trim()).filter(!_.isEmpty) - .map(new Path(_)).toSet + def getNameNodesToAccess(sparkConf: SparkConf): Set[Path] = { + sparkConf.get("spark.yarn.access.namenodes", "") + .split(",") + .map(_.trim()) + .filter(!_.isEmpty) + .map(new Path(_)) + .toSet } - private[yarn] def getTokenRenewer(conf: Configuration): String = { + def getTokenRenewer(conf: Configuration): String = { val delegTokenRenewer = Master.getMasterPrincipal(conf) logDebug("delegation token renewer is: " + delegTokenRenewer) if (delegTokenRenewer == null || delegTokenRenewer.length() == 0) { @@ -652,17 +747,54 @@ object ClientBase extends Logging { /** * Obtains tokens for the namenodes passed in and adds them to the credentials. */ - private[yarn] def obtainTokensForNamenodes(paths: Set[Path], conf: Configuration, - creds: Credentials) { + def obtainTokensForNamenodes( + paths: Set[Path], + conf: Configuration, + creds: Credentials): Unit = { if (UserGroupInformation.isSecurityEnabled()) { val delegTokenRenewer = getTokenRenewer(conf) + paths.foreach { dst => + val dstFs = dst.getFileSystem(conf) + logDebug("getting token for namenode: " + dst) + dstFs.addDelegationTokens(delegTokenRenewer, creds) + } + } + } - paths.foreach { - dst => - val dstFs = dst.getFileSystem(conf) - logDebug("getting token for namenode: " + dst) - dstFs.addDelegationTokens(delegTokenRenewer, creds) + /** + * Return whether the two file systems are the same. + */ + private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = { + val srcUri = srcFs.getUri() + val dstUri = destFs.getUri() + if (srcUri.getScheme() == null) { + return false + } + if (!srcUri.getScheme().equals(dstUri.getScheme())) { + return false + } + var srcHost = srcUri.getHost() + var dstHost = dstUri.getHost() + if ((srcHost != null) && (dstHost != null)) { + try { + srcHost = InetAddress.getByName(srcHost).getCanonicalHostName() + dstHost = InetAddress.getByName(dstHost).getCanonicalHostName() + } catch { + case e: UnknownHostException => + return false } + if (!srcHost.equals(dstHost)) { + return false + } + } else if (srcHost == null && dstHost != null) { + return false + } else if (srcHost != null && dstHost == null) { + return false + } + if (srcUri.getPort() != dstUri.getPort()) { + false + } else { + true } } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala index 9b7f1fca96..c592ecfdfc 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientDistributedCacheManager.scala @@ -19,29 +19,24 @@ package org.apache.spark.deploy.yarn import java.net.URI +import scala.collection.mutable.{HashMap, LinkedHashMap, Map} + import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.FileStatus -import org.apache.hadoop.fs.FileSystem -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.hadoop.fs.permission.FsAction -import org.apache.hadoop.yarn.api.records.LocalResource -import org.apache.hadoop.yarn.api.records.LocalResourceVisibility -import org.apache.hadoop.yarn.api.records.LocalResourceType +import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.util.{Records, ConverterUtils} -import org.apache.spark.Logging - -import scala.collection.mutable.HashMap -import scala.collection.mutable.LinkedHashMap -import scala.collection.mutable.Map - +import org.apache.spark.Logging /** Client side methods to setup the Hadoop distributed cache */ -class ClientDistributedCacheManager() extends Logging { - private val distCacheFiles: Map[String, Tuple3[String, String, String]] = - LinkedHashMap[String, Tuple3[String, String, String]]() - private val distCacheArchives: Map[String, Tuple3[String, String, String]] = - LinkedHashMap[String, Tuple3[String, String, String]]() +private[spark] class ClientDistributedCacheManager() extends Logging { + + // Mappings from remote URI to (file status, modification time, visibility) + private val distCacheFiles: Map[String, (String, String, String)] = + LinkedHashMap[String, (String, String, String)]() + private val distCacheArchives: Map[String, (String, String, String)] = + LinkedHashMap[String, (String, String, String)]() /** @@ -68,9 +63,9 @@ class ClientDistributedCacheManager() extends Logging { resourceType: LocalResourceType, link: String, statCache: Map[URI, FileStatus], - appMasterOnly: Boolean = false) = { + appMasterOnly: Boolean = false): Unit = { val destStatus = fs.getFileStatus(destPath) - val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + val amJarRsrc = Records.newRecord(classOf[LocalResource]) amJarRsrc.setType(resourceType) val visibility = getVisibility(conf, destPath.toUri(), statCache) amJarRsrc.setVisibility(visibility) @@ -80,7 +75,7 @@ class ClientDistributedCacheManager() extends Logging { if (link == null || link.isEmpty()) throw new Exception("You must specify a valid link name") localResources(link) = amJarRsrc - if (appMasterOnly == false) { + if (!appMasterOnly) { val uri = destPath.toUri() val pathURI = new URI(uri.getScheme(), uri.getAuthority(), uri.getPath(), null, link) if (resourceType == LocalResourceType.FILE) { @@ -95,12 +90,10 @@ class ClientDistributedCacheManager() extends Logging { /** * Adds the necessary cache file env variables to the env passed in - * @param env */ - def setDistFilesEnv(env: Map[String, String]) = { + def setDistFilesEnv(env: Map[String, String]): Unit = { val (keys, tupleValues) = distCacheFiles.unzip val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 0) { env("SPARK_YARN_CACHE_FILES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = @@ -114,12 +107,10 @@ class ClientDistributedCacheManager() extends Logging { /** * Adds the necessary cache archive env variables to the env passed in - * @param env */ - def setDistArchivesEnv(env: Map[String, String]) = { + def setDistArchivesEnv(env: Map[String, String]): Unit = { val (keys, tupleValues) = distCacheArchives.unzip val (sizes, timeStamps, visibilities) = tupleValues.unzip3 - if (keys.size > 0) { env("SPARK_YARN_CACHE_ARCHIVES") = keys.reduceLeft[String] { (acc,n) => acc + "," + n } env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = @@ -133,25 +124,21 @@ class ClientDistributedCacheManager() extends Logging { /** * Returns the local resource visibility depending on the cache file permissions - * @param conf - * @param uri - * @param statCache * @return LocalResourceVisibility */ - def getVisibility(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): - LocalResourceVisibility = { + def getVisibility( + conf: Configuration, + uri: URI, + statCache: Map[URI, FileStatus]): LocalResourceVisibility = { if (isPublic(conf, uri, statCache)) { - return LocalResourceVisibility.PUBLIC - } - LocalResourceVisibility.PRIVATE + LocalResourceVisibility.PUBLIC + } else { + LocalResourceVisibility.PRIVATE + } } /** - * Returns a boolean to denote whether a cache file is visible to all(public) - * or not - * @param conf - * @param uri - * @param statCache + * Returns a boolean to denote whether a cache file is visible to all (public) * @return true if the path in the uri is visible to all, false otherwise */ def isPublic(conf: Configuration, uri: URI, statCache: Map[URI, FileStatus]): Boolean = { @@ -167,13 +154,12 @@ class ClientDistributedCacheManager() extends Logging { /** * Returns true if all ancestors of the specified path have the 'execute' * permission set for all users (i.e. that other users can traverse - * the directory heirarchy to the given path) - * @param fs - * @param path - * @param statCache + * the directory hierarchy to the given path) * @return true if all ancestors have the 'execute' permission set for all users */ - def ancestorsHaveExecutePermissions(fs: FileSystem, path: Path, + def ancestorsHaveExecutePermissions( + fs: FileSystem, + path: Path, statCache: Map[URI, FileStatus]): Boolean = { var current = path while (current != null) { @@ -187,32 +173,25 @@ class ClientDistributedCacheManager() extends Logging { } /** - * Checks for a given path whether the Other permissions on it + * Checks for a given path whether the Other permissions on it * imply the permission in the passed FsAction - * @param fs - * @param path - * @param action - * @param statCache * @return true if the path in the uri is visible to all, false otherwise */ - def checkPermissionOfOther(fs: FileSystem, path: Path, - action: FsAction, statCache: Map[URI, FileStatus]): Boolean = { + def checkPermissionOfOther( + fs: FileSystem, + path: Path, + action: FsAction, + statCache: Map[URI, FileStatus]): Boolean = { val status = getFileStatus(fs, path.toUri(), statCache) val perms = status.getPermission() val otherAction = perms.getOtherAction() - if (otherAction.implies(action)) { - return true - } - false + otherAction.implies(action) } /** - * Checks to see if the given uri exists in the cache, if it does it + * Checks to see if the given uri exists in the cache, if it does it * returns the existing FileStatus, otherwise it stats the uri, stores * it in the cache, and returns the FileStatus. - * @param fs - * @param uri - * @param statCache * @return FileStatus */ def getFileStatus(fs: FileSystem, uri: URI, statCache: Map[URI, FileStatus]): FileStatus = { diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala index f56f72cafe..bbbf615510 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala @@ -17,7 +17,6 @@ package org.apache.spark.deploy.yarn -import java.io.File import java.net.URI import scala.collection.JavaConversions._ @@ -128,9 +127,9 @@ trait ExecutorRunnableUtil extends Logging { localResources: HashMap[String, LocalResource], timestamp: String, size: String, - vis: String) = { + vis: String): Unit = { val uri = new URI(file) - val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + val amJarRsrc = Records.newRecord(classOf[LocalResource]) amJarRsrc.setType(rtype) amJarRsrc.setVisibility(LocalResourceVisibility.valueOf(vis)) amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) @@ -175,14 +174,17 @@ trait ExecutorRunnableUtil extends Logging { ClientBase.populateClasspath(null, yarnConf, sparkConf, env, extraCp) sparkConf.getExecutorEnv.foreach { case (key, value) => - YarnSparkHadoopUtil.addToEnvironment(env, key, value, File.pathSeparator) + // This assumes each executor environment variable set here is a path + // This is kept for backward compatibility and consistency with hadoop + YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } // Keep this for backwards compatibility but users should move to the config - YarnSparkHadoopUtil.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"), - File.pathSeparator) + sys.env.get("SPARK_YARN_USER_ENV").foreach { userEnvs => + YarnSparkHadoopUtil.setEnvFromInputString(env, userEnvs) + } - System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k,v) => env(k) = v } + System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v } env } diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4a33e34c3b..0b712c2019 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -18,6 +18,7 @@ package org.apache.spark.deploy.yarn import java.lang.{Boolean => JBoolean} +import java.io.File import java.util.{Collections, Set => JSet} import java.util.regex.Matcher import java.util.regex.Pattern @@ -29,14 +30,12 @@ import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.hadoop.security.Credentials import org.apache.hadoop.security.UserGroupInformation -import org.apache.hadoop.util.StringInterner import org.apache.hadoop.yarn.conf.YarnConfiguration -import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.ApplicationAccessType import org.apache.hadoop.yarn.util.RackResolver import org.apache.hadoop.conf.Configuration -import org.apache.spark.{SecurityManager, SparkConf, SparkContext} +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils @@ -100,30 +99,26 @@ object YarnSparkHadoopUtil { private val hostToRack = new ConcurrentHashMap[String, String]() private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]() - def addToEnvironment( - env: HashMap[String, String], - variable: String, - value: String, - classPathSeparator: String) = { - var envVariable = "" - if (env.get(variable) == None) { - envVariable = value - } else { - envVariable = env.get(variable).get + classPathSeparator + value - } - env put (StringInterner.weakIntern(variable), StringInterner.weakIntern(envVariable)) + /** + * Add a path variable to the given environment map. + * If the map already contains this key, append the value to the existing value instead. + */ + def addPathToEnvironment(env: HashMap[String, String], key: String, value: String): Unit = { + val newValue = if (env.contains(key)) { env(key) + File.pathSeparator + value } else value + env.put(key, newValue) } - def setEnvFromInputString( - env: HashMap[String, String], - envString: String, - classPathSeparator: String) = { - if (envString != null && envString.length() > 0) { - var childEnvs = envString.split(",") - var p = Pattern.compile(getEnvironmentVariableRegex()) + /** + * Set zero or more environment variables specified by the given input string. + * The input string is expected to take the form "KEY1=VAL1,KEY2=VAL2,KEY3=VAL3". + */ + def setEnvFromInputString(env: HashMap[String, String], inputString: String): Unit = { + if (inputString != null && inputString.length() > 0) { + val childEnvs = inputString.split(",") + val p = Pattern.compile(environmentVariableRegex) for (cEnv <- childEnvs) { - var parts = cEnv.split("=") // split on '=' - var m = p.matcher(parts(1)) + val parts = cEnv.split("=") // split on '=' + val m = p.matcher(parts(1)) val sb = new StringBuffer while (m.find()) { val variable = m.group(1) @@ -131,8 +126,7 @@ object YarnSparkHadoopUtil { if (env.get(variable) != None) { replace = env.get(variable).get } else { - // if this key is not configured for the child .. get it - // from the env + // if this key is not configured for the child .. get it from the env replace = System.getenv(variable) if (replace == null) { // the env key is note present anywhere .. simply set it @@ -142,14 +136,15 @@ object YarnSparkHadoopUtil { m.appendReplacement(sb, Matcher.quoteReplacement(replace)) } m.appendTail(sb) - addToEnvironment(env, parts(0), sb.toString(), classPathSeparator) + // This treats the environment variable as path variable delimited by `File.pathSeparator` + // This is kept for backward compatibility and consistency with Hadoop's behavior + addPathToEnvironment(env, parts(0), sb.toString) } } } - private def getEnvironmentVariableRegex() : String = { - val osName = System.getProperty("os.name") - if (osName startsWith "Windows") { + private val environmentVariableRegex: String = { + if (Utils.isWindows) { "%([A-Za-z_][A-Za-z0-9_]*?)%" } else { "\\$([A-Za-z_][A-Za-z0-9_]*)" @@ -181,14 +176,14 @@ object YarnSparkHadoopUtil { } } - private[spark] def lookupRack(conf: Configuration, host: String): String = { + def lookupRack(conf: Configuration, host: String): String = { if (!hostToRack.contains(host)) { populateRackInfo(conf, host) } hostToRack.get(host) } - private[spark] def populateRackInfo(conf: Configuration, hostname: String) { + def populateRackInfo(conf: Configuration, hostname: String) { Utils.checkHost(hostname) if (!hostToRack.containsKey(hostname)) { @@ -212,8 +207,8 @@ object YarnSparkHadoopUtil { } } - private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager): - Map[ApplicationAccessType, String] = { + def getApplicationAclsForYarn(securityMgr: SecurityManager) + : Map[ApplicationAccessType, String] = { Map[ApplicationAccessType, String] ( ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls, ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 6aa6475fe4..200a308992 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler.cluster import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState} import org.apache.spark.{SparkException, Logging, SparkContext} -import org.apache.spark.deploy.yarn.{Client, ClientArguments, YarnSparkHadoopUtil} +import org.apache.spark.deploy.yarn.{Client, ClientArguments} import org.apache.spark.scheduler.TaskSchedulerImpl import scala.collection.mutable.ArrayBuffer @@ -34,115 +34,120 @@ private[spark] class YarnClientSchedulerBackend( minRegisteredRatio = 0.8 } - var client: Client = null - var appId: ApplicationId = null - var checkerThread: Thread = null - var stopping: Boolean = false - var totalExpectedExecutors = 0 - - private[spark] def addArg(optionName: String, envVar: String, sysProp: String, - arrayBuf: ArrayBuffer[String]) { - if (System.getenv(envVar) != null) { - arrayBuf += (optionName, System.getenv(envVar)) - } else if (sc.getConf.contains(sysProp)) { - arrayBuf += (optionName, sc.getConf.get(sysProp)) - } - } + private var client: Client = null + private var appId: ApplicationId = null + private var stopping: Boolean = false + private var totalExpectedExecutors = 0 + /** + * Create a Yarn client to submit an application to the ResourceManager. + * This waits until the application is running. + */ override def start() { super.start() - val driverHost = conf.get("spark.driver.host") val driverPort = conf.get("spark.driver.port") val hostport = driverHost + ":" + driverPort sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.appUIHostPort) } val argsArrayBuf = new ArrayBuffer[String]() - argsArrayBuf += ( - "--args", hostport - ) - - // process any optional arguments, given either as environment variables - // or system properties. use the defaults already defined in ClientArguments - // if things aren't specified. system properties override environment - // variables. - List(("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), - ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), - ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), - ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), - ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), - ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), - ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), - ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), - ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), - ("--name", "SPARK_YARN_APP_NAME", "spark.app.name")) - .foreach { case (optName, envVar, sysProp) => addArg(optName, envVar, sysProp, argsArrayBuf) } - - logDebug("ClientArguments called with: " + argsArrayBuf) + argsArrayBuf += ("--arg", hostport) + argsArrayBuf ++= getExtraClientArguments + + logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" ")) val args = new ClientArguments(argsArrayBuf.toArray, conf) totalExpectedExecutors = args.numExecutors client = new Client(args, conf) - appId = client.runApp() - waitForApp() - checkerThread = yarnApplicationStateCheckerThread() + appId = client.submitApplication() + waitForApplication() + asyncMonitorApplication() } - def waitForApp() { - - // TODO : need a better way to find out whether the executors are ready or not - // maybe by resource usage report? - while(true) { - val report = client.getApplicationReport(appId) - - logInfo("Application report from ASM: \n" + - "\t appMasterRpcPort: " + report.getRpcPort() + "\n" + - "\t appStartTime: " + report.getStartTime() + "\n" + - "\t yarnAppState: " + report.getYarnApplicationState() + "\n" + /** + * Return any extra command line arguments to be passed to Client provided in the form of + * environment variables or Spark properties. + */ + private def getExtraClientArguments: Seq[String] = { + val extraArgs = new ArrayBuffer[String] + val optionTuples = // List of (target Client argument, environment variable, Spark property) + List( + ("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"), + ("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"), + ("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"), + ("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"), + ("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"), + ("--executor-memory", "SPARK_EXECUTOR_MEMORY", "spark.executor.memory"), + ("--executor-cores", "SPARK_WORKER_CORES", "spark.executor.cores"), + ("--executor-cores", "SPARK_EXECUTOR_CORES", "spark.executor.cores"), + ("--queue", "SPARK_YARN_QUEUE", "spark.yarn.queue"), + ("--name", "SPARK_YARN_APP_NAME", "spark.app.name") ) - - // Ready to go, or already gone. - val state = report.getYarnApplicationState() - if (state == YarnApplicationState.RUNNING) { - return - } else if (state == YarnApplicationState.FINISHED || - state == YarnApplicationState.FAILED || - state == YarnApplicationState.KILLED) { - throw new SparkException("Yarn application already ended," + - "might be killed or not able to launch application master.") + optionTuples.foreach { case (optionName, envVar, sparkProp) => + if (System.getenv(envVar) != null) { + extraArgs += (optionName, System.getenv(envVar)) + } else if (sc.getConf.contains(sparkProp)) { + extraArgs += (optionName, sc.getConf.get(sparkProp)) } + } + extraArgs + } - Thread.sleep(1000) + /** + * Report the state of the application until it is running. + * If the application has finished, failed or been killed in the process, throw an exception. + * This assumes both `client` and `appId` have already been set. + */ + private def waitForApplication(): Unit = { + assert(client != null && appId != null, "Application has not been submitted yet!") + val state = client.monitorApplication(appId, returnOnRunning = true) // blocking + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.FAILED || + state == YarnApplicationState.KILLED) { + throw new SparkException("Yarn application has already ended! " + + "It might have been killed or unable to launch application master.") + } + if (state == YarnApplicationState.RUNNING) { + logInfo(s"Application $appId has started running.") } } - private def yarnApplicationStateCheckerThread(): Thread = { + /** + * Monitor the application state in a separate thread. + * If the application has exited for any reason, stop the SparkContext. + * This assumes both `client` and `appId` have already been set. + */ + private def asyncMonitorApplication(): Unit = { + assert(client != null && appId != null, "Application has not been submitted yet!") val t = new Thread { override def run() { while (!stopping) { val report = client.getApplicationReport(appId) val state = report.getYarnApplicationState() - if (state == YarnApplicationState.FINISHED || state == YarnApplicationState.KILLED - || state == YarnApplicationState.FAILED) { - logError(s"Yarn application already ended: $state") + if (state == YarnApplicationState.FINISHED || + state == YarnApplicationState.KILLED || + state == YarnApplicationState.FAILED) { + logError(s"Yarn application has already exited with state $state!") sc.stop() stopping = true } Thread.sleep(1000L) } - checkerThread = null Thread.currentThread().interrupt() } } - t.setName("Yarn Application State Checker") + t.setName("Yarn application state monitor") t.setDaemon(true) t.start() - t } + /** + * Stop the scheduler. This assumes `start()` has already been called. + */ override def stop() { + assert(client != null, "Attempted to stop this scheduler before starting it!") stopping = true super.stop() - client.stop + client.stop() logInfo("Stopped") } diff --git a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala index c3b7a2c8f0..9bd916100d 100644 --- a/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala +++ b/yarn/common/src/test/scala/org/apache/spark/deploy/yarn/ClientBaseSuite.scala @@ -27,7 +27,7 @@ import org.apache.hadoop.mapreduce.MRJobConfig import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse -import org.apache.hadoop.yarn.api.records.ContainerLaunchContext +import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.mockito.Matchers._ import org.mockito.Mockito._ @@ -90,7 +90,7 @@ class ClientBaseSuite extends FunSuite with Matchers { val env = new MutableHashMap[String, String]() val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) - ClientBase.populateClasspath(args, conf, sparkConf, env, None) + ClientBase.populateClasspath(args, conf, sparkConf, env) val cp = env("CLASSPATH").split(File.pathSeparator) s"$SPARK,$USER,$ADDED".split(",").foreach({ entry => @@ -114,10 +114,10 @@ class ClientBaseSuite extends FunSuite with Matchers { val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf) val client = spy(new DummyClient(args, conf, sparkConf, yarnConf)) - doReturn(new Path("/")).when(client).copyRemoteFile(any(classOf[Path]), + doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]), any(classOf[Path]), anyShort(), anyBoolean()) - var tempDir = Files.createTempDir(); + val tempDir = Files.createTempDir() try { client.prepareLocalResources(tempDir.getAbsolutePath()) sparkConf.getOption(ClientBase.CONF_SPARK_USER_JAR) should be (Some(USER)) @@ -247,13 +247,13 @@ class ClientBaseSuite extends FunSuite with Matchers { private class DummyClient( val args: ClientArguments, - val conf: Configuration, + val hadoopConf: Configuration, val sparkConf: SparkConf, val yarnConf: YarnConfiguration) extends ClientBase { - - override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = - throw new UnsupportedOperationException() - + override def setupSecurityToken(amContainer: ContainerLaunchContext): Unit = ??? + override def submitApplication(): ApplicationId = ??? + override def getApplicationReport(appId: ApplicationId): ApplicationReport = ??? + override def getClientToken(report: ApplicationReport): String = ??? } } |