From 9d4246863a25f7c91f324e004fe000b9848f6057 Mon Sep 17 00:00:00 2001 From: "Y.CORP.YAHOO.COM\\tgraves" Date: Mon, 23 Sep 2013 09:09:59 -0500 Subject: Support distributed cache files and archives on spark on yarn and attempt to cleanup the staging directory on exit --- .../spark/deploy/yarn/ApplicationMaster.scala | 55 +++++++- .../org/apache/spark/deploy/yarn/Client.scala | 140 ++++++++++++++++++--- .../apache/spark/deploy/yarn/ClientArguments.scala | 14 ++- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 46 ++++++- 4 files changed, 226 insertions(+), 29 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 858b58d338..c1a87d3373 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,22 +17,25 @@ package org.apache.spark.deploy.yarn +import java.io.IOException; import java.net.Socket +import java.security.PrivilegedExceptionAction import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import scala.collection.JavaConversions._ import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils import org.apache.hadoop.security.UserGroupInformation -import java.security.PrivilegedExceptionAction +import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var appAttemptId: ApplicationAttemptId = null private var userThread: Thread = null private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + private val fs = FileSystem.get(yarnConf) private var yarnAllocator: YarnAllocationHandler = null private var isFinished:Boolean = false private var uiAddress: String = "" + private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, + YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) + private var isLastAMRetry: Boolean = true def run() { // setup the directories so things go to yarn approved directories rather // then user specified and /tmp System.setProperty("spark.local.dir", getLocalDirs()) + + // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using + ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has @@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // It need shutdown hook to set SUCCEEDED successed = true } finally { + logDebug("finishing main") + isLastAMRetry = true; if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { @@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } - - private def allocateWorkers() { try { logInfo("Allocating " + args.numWorkers + " workers.") @@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.finishApplicationMaster(finishReq) } + + /** + * clean up the staging directory. + */ + private def cleanupStagingDir() { + var stagingDirPath: Path = null + try { + val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent() + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case e: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, e) + } + } + + // The shutdown hook that runs when a signal is received AND during normal + // close of the JVM. + class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { + + def run() { + logInfo("AppMaster received a signal.") + // we need to clean up staging dir before HDFS is shut down + // make sure we don't delete it until this is the last AM + if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() + } + } } @@ -368,6 +413,8 @@ object ApplicationMaster { // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit // Should not really have to do this, but it helps yarn to evict resources earlier. // not to mention, prevent Client declaring failure even though we exit'ed properly. + // Note that this will unfortunately not properly clean up the staging files because it gets called to + // late and the filesystem is already shutdown. if (modified) { Runtime.getRuntime().addShutdownHook(new Thread with Logging { // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run' diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 844c707834..b4d243ed7a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -45,7 +45,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - val credentials = UserGroupInformation.getCurrentUser().getCredentials(); + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + private var distFiles = None: Option[String] + private var distFilesTimeStamps = None: Option[String] + private var distFilesFileSizes = None: Option[String] + private var distArchives = None: Option[String] + private var distArchivesTimeStamps = None: Option[String] + private var distArchivesFileSizes = None: Option[String] def run() { init(yarnConf) @@ -57,7 +63,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl verifyClusterResources(newApp) val appContext = createApplicationSubmissionContext(appId) - val localResources = prepareLocalResources(appId, "spark") + val localResources = prepareLocalResources(appId, ".sparkStaging") val env = setupLaunchEnv(localResources) val amContainer = createContainerLaunchContext(newApp, localResources, env) @@ -109,10 +115,71 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setApplicationName("Spark") return appContext } - - def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = { + + /** + * Copy the local file into HDFS and configure to be distributed with the + * job via the distributed cache. + * If a fragment is specified the file will be referenced as that fragment. + */ + private def copyLocalFile( + dstDir: Path, + resourceType: LocalResourceType, + originalPath: Path, + replication: Short, + localResources: HashMap[String,LocalResource], + fragment: String) = { + val fs = FileSystem.get(conf) + val newPath = new Path(dstDir, originalPath.getName()) + logInfo("Uploading " + originalPath + " to " + newPath) + fs.copyFromLocalFile(false, true, originalPath, newPath) + fs.setReplication(newPath, replication); + val destStatus = fs.getFileStatus(newPath) + + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(resourceType) + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath)) + amJarRsrc.setTimestamp(destStatus.getModificationTime()) + amJarRsrc.setSize(destStatus.getLen()) + var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName()); + if ((fragment == null) || (fragment.isEmpty())){ + localResources(originalPath.getName()) = amJarRsrc + } else { + localResources(fragment) = amJarRsrc + pathURI = new URI(newPath.toString() + "#" + fragment); + } + val distPath = pathURI.toString() + if (resourceType == LocalResourceType.FILE) { + distFiles match { + case Some(path) => + distFilesFileSizes = Some(distFilesFileSizes.get + "," + + destStatus.getLen().toString()) + distFilesTimeStamps = Some(distFilesTimeStamps.get + "," + + destStatus.getModificationTime().toString()) + distFiles = Some(path + "," + distPath) + case _ => + distFilesFileSizes = Some(destStatus.getLen().toString()) + distFilesTimeStamps = Some(destStatus.getModificationTime().toString()) + distFiles = Some(distPath) + } + } else { + distArchives match { + case Some(path) => + distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," + + destStatus.getModificationTime().toString()) + distArchivesFileSizes = Some(distArchivesFileSizes.get + "," + + destStatus.getLen().toString()) + distArchives = Some(path + "," + distPath) + case _ => + distArchivesTimeStamps = Some(destStatus.getModificationTime().toString()) + distArchivesFileSizes = Some(destStatus.getLen().toString()) + distArchives = Some(distPath) + } + } + } + + def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() // Upload Spark and the application JAR to the remote file system // Add them as local resources to the AM val fs = FileSystem.get(conf) @@ -125,33 +192,59 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } + val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/" + val dst = new Path(fs.getHomeDirectory(), pathSuffix) + val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort + + if (UserGroupInformation.isSecurityEnabled()) { + val dstFs = dst.getFileSystem(conf) + dstFs.addDelegationTokens(delegTokenRenewer, credentials); + } + val localResources = HashMap[String, LocalResource]() + Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (! localPath.isEmpty()) { val src = new Path(localPath) - val pathSuffix = appName + "/" + appId.getId() + destName - val dst = new Path(fs.getHomeDirectory(), pathSuffix) - logInfo("Uploading " + src + " to " + dst) - fs.copyFromLocalFile(false, true, src, dst) - val destStatus = fs.getFileStatus(dst) - - // get tokens for anything we upload to hdfs - if (UserGroupInformation.isSecurityEnabled()) { - fs.addDelegationTokens(delegTokenRenewer, credentials); - } + val newPath = new Path(dst, destName) + logInfo("Uploading " + src + " to " + newPath) + fs.copyFromLocalFile(false, true, src, newPath) + fs.setReplication(newPath, replication); + val destStatus = fs.getFileStatus(newPath) val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] amJarRsrc.setType(LocalResourceType.FILE) amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath)) amJarRsrc.setTimestamp(destStatus.getModificationTime()) amJarRsrc.setSize(destStatus.getLen()) - locaResources(destName) = amJarRsrc + localResources(destName) = amJarRsrc + } + } + + // handle any distributed cache files + if ((args.files != null) && (!args.files.isEmpty())){ + args.files.split(',').foreach { case file: String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources, + tmpURI.getFragment()) + } + } + + // handle any distributed cache archives + if ((args.archives != null) && (!args.archives.isEmpty())) { + args.archives.split(',').foreach { case file:String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication, + localResources, tmpURI.getFragment()) } } + UserGroupInformation.getCurrentUser().addCredentials(credentials); - return locaResources + return localResources } def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = { @@ -186,6 +279,17 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + if (distFiles != None) { + env("SPARK_YARN_CACHE_FILES") = distFiles.get + env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get + env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get + } + if (distArchives != None) { + env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get + env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get + env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get + } + // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index cd651904d2..30d9b6e60f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,6 +24,8 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! class ClientArguments(val args: Array[String]) { + var files: String = null + var archives: String = null var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() @@ -78,6 +80,14 @@ class ClientArguments(val args: Array[String]) { amQueue = value args = tail + case ("--files") :: value :: tail => + files = value + args = tail + + case ("--archives") :: value :: tail => + archives = value + args = tail + case Nil => if (userJar == null || userClass == null) { printUsageAndExit(1) @@ -108,7 +118,9 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + + " --files file Comma separated list of files to be distributed with the job.\n" + + " --archives archive Comma separated list of archives to be distributed with the job." ) System.exit(exitCode) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 6229167cb4..d340b114df 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -136,11 +136,26 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S startReq.setContainerLaunchContext(ctx) cm.startContainer(startReq) } + + private def setupDistributedCache(file: String, + rtype: LocalResourceType, + localResources: HashMap[String, LocalResource], + timestamp: String, + size: String) = { + val uri = new URI(file) + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(rtype) + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) + amJarRsrc.setTimestamp(timestamp.toLong) + amJarRsrc.setSize(size.toLong) + localResources(uri.getFragment()) = amJarRsrc + } def prepareLocalResources: HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() + val localResources = HashMap[String, LocalResource]() // Spark JAR val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] @@ -150,7 +165,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_JAR_PATH")))) sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong) sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong) - locaResources("spark.jar") = sparkJarResource + localResources("spark.jar") = sparkJarResource // User JAR val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] userJarResource.setType(LocalResourceType.FILE) @@ -159,7 +174,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_USERJAR_PATH")))) userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong) userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong) - locaResources("app.jar") = userJarResource + localResources("app.jar") = userJarResource // Log4j conf - if available if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { @@ -170,12 +185,31 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_LOG4J_PATH")))) log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong) log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong) - locaResources("log4j.properties") = log4jConfResource + localResources("log4j.properties") = log4jConfResource } + if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') + for( i <- 0 to distFiles.length - 1) { + setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), + fileSizes(i)) + } + } + + if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') + val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') + for( i <- 0 to distArchives.length - 1) { + setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, + timeStamps(i), fileSizes(i)) + } + } - logInfo("Prepared Local resources " + locaResources) - return locaResources + logInfo("Prepared Local resources " + localResources) + return localResources } def prepareEnvironment: HashMap[String, String] = { -- cgit v1.2.3 From 0fff4ee8523ff4137eedfc314b51135427137c63 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Thu, 3 Oct 2013 11:52:16 -0500 Subject: Adding in the --addJars option to make SparkContext.addJar work on yarn and cleanup the classpaths --- .../main/scala/org/apache/spark/SparkContext.scala | 17 +++++++++++--- docs/running-on-yarn.md | 2 ++ .../org/apache/spark/deploy/yarn/Client.scala | 27 ++++++++++++++++------ .../apache/spark/deploy/yarn/ClientArguments.scala | 10 ++++++-- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 17 +++++--------- 5 files changed, 50 insertions(+), 23 deletions(-) (limited to 'yarn') diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 72540c712a..8ed5dcbf5d 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -643,10 +643,21 @@ class SparkContext( key = uri.getScheme match { case null | "file" => if (env.hadoop.isYarnMode()) { - logWarning("local jar specified as parameter to addJar under Yarn mode") - return + // In order for this to work on yarn the user must specify the --addjars option to + // the client to upload the file into the distributed cache to make it show up in the + // current working directory. + val fileName = new Path(uri.getPath).getName() + try { + env.httpFileServer.addJar(new File(fileName)) + } catch { + case e: Exception => { + logError("Error adding jar (" + e + "), was the --addJars option used?") + throw e + } + } + } else { + env.httpFileServer.addJar(new File(uri.getPath)) } - env.httpFileServer.addJar(new File(uri.getPath)) case _ => path } diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md index beaae69aa2..a807ec603d 100644 --- a/docs/running-on-yarn.md +++ b/docs/running-on-yarn.md @@ -53,6 +53,7 @@ The command to launch the YARN Client is as follows: --worker-memory \ --worker-cores \ --queue \ + --addJars \ --files \ --archives @@ -88,3 +89,4 @@ The above starts a YARN Client programs which periodically polls the Application - We do not requesting container resources based on the number of cores. Thus the numbers of cores given via command line arguments cannot be guaranteed. - The local directories used for spark will be the local directories configured for YARN (Hadoop Yarn config yarn.nodemanager.local-dirs). If the user specifies spark.local.dir, it will be ignored. - The --files and --archives options support specifying file names with the # similar to Hadoop. For example you can specify: --files localtest.txt#appSees.txt and this will upload the file you have locally named localtest.txt into HDFS but this will be linked to by the name appSees.txt and your application should use the name as appSees.txt to reference it when running on YARN. +- The --addJars option allows the SparkContext.addJar function to work if you are using it with local files. It does not need to be used if you are using it with HDFS, HTTP, HTTPS, or FTP files. diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index b4d243ed7a..fb1b339f27 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -127,7 +127,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl originalPath: Path, replication: Short, localResources: HashMap[String,LocalResource], - fragment: String) = { + fragment: String, + appMasterOnly: Boolean = false): Unit = { val fs = FileSystem.get(conf) val newPath = new Path(dstDir, originalPath.getName()) logInfo("Uploading " + originalPath + " to " + newPath) @@ -149,6 +150,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl pathURI = new URI(newPath.toString() + "#" + fragment); } val distPath = pathURI.toString() + if (appMasterOnly == true) return if (resourceType == LocalResourceType.FILE) { distFiles match { case Some(path) => @@ -223,6 +225,16 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } + // handle any add jars + if ((args.addJars != null) && (!args.addJars.isEmpty())){ + args.addJars.split(',').foreach { case file: String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources, + tmpURI.getFragment(), true) + } + } + // handle any distributed cache files if ((args.files != null) && (!args.files.isEmpty())){ args.files.split(',').foreach { case file: String => @@ -253,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = @@ -279,6 +290,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + // set the environment variables to be passed on to the Workers if (distFiles != None) { env("SPARK_YARN_CACHE_FILES") = distFiles.get env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get @@ -328,8 +340,8 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl // Add Xmx for am memory JAVA_OPTS += "-Xmx" + amMemory + "m " - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. @@ -345,6 +357,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl JAVA_OPTS += " -XX:CMSIncrementalDutyCycleMin=0 " JAVA_OPTS += " -XX:CMSIncrementalDutyCycle=10 " } + if (env.isDefinedAt("SPARK_JAVA_OPTS")) { JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index 30d9b6e60f..0833153541 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,6 +24,7 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! class ClientArguments(val args: Array[String]) { + var addJars: String = null var files: String = null var archives: String = null var userJar: String = null @@ -80,6 +81,10 @@ class ClientArguments(val args: Array[String]) { amQueue = value args = tail + case ("--addJars") :: value :: tail => + addJars = value + args = tail + case ("--files") :: value :: tail => files = value args = tail @@ -119,8 +124,9 @@ class ClientArguments(val args: Array[String]) { " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + - " --files file Comma separated list of files to be distributed with the job.\n" + - " --archives archive Comma separated list of archives to be distributed with the job." + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + + " --files files Comma separated list of files to be distributed with the job.\n" + + " --archives archives Comma separated list of archives to be distributed with the job." ) System.exit(exitCode) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index d340b114df..8dac9e02ac 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -77,8 +77,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S JAVA_OPTS += env("SPARK_JAVA_OPTS") + " " } - JAVA_OPTS += " -Djava.io.tmpdir=" + new Path(Environment.PWD.$(), - YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + JAVA_OPTS += " -Djava.io.tmpdir=" + + new Path(Environment.PWD.$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) + " " + // Commenting it out for now - so that people can refer to the properties if required. Remove it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence if there are multiple containers in same @@ -215,15 +216,9 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { - // Which is correct ? - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") - } - - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") Client.populateHadoopClasspath(yarnConf, env) // allow users to specify some environment variables -- cgit v1.2.3 From cc7df2b3ccdee602a6a90964628676e7dc4e0954 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Wed, 16 Oct 2013 10:09:16 -0500 Subject: Fix yarn build --- .../scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala index 6d6ef149cc..25da9aa917 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala @@ -22,7 +22,7 @@ import org.apache.spark.util.Utils import org.apache.spark.scheduler.SplitInfo import scala.collection import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId, ContainerId, Priority, Resource, ResourceRequest, ContainerStatus, Container} -import org.apache.spark.scheduler.cluster.{ClusterScheduler, StandaloneSchedulerBackend} +import org.apache.spark.scheduler.cluster.{ClusterScheduler, CoarseGrainedSchedulerBackend} import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse} import org.apache.hadoop.yarn.util.{RackResolver, Records} import java.util.concurrent.{CopyOnWriteArrayList, ConcurrentHashMap} @@ -211,7 +211,7 @@ private[yarn] class YarnAllocationHandler(val conf: Configuration, val resourceM val workerId = workerIdCounter.incrementAndGet().toString val driverUrl = "akka://spark@%s:%s/user/%s".format( System.getProperty("spark.driver.host"), System.getProperty("spark.driver.port"), - StandaloneSchedulerBackend.ACTOR_NAME) + CoarseGrainedSchedulerBackend.ACTOR_NAME) logInfo("launching container on " + containerId + " host " + workerHostname) // just to be safe, simply remove it from pendingReleaseContainers. Should not be there, but .. -- cgit v1.2.3 From b6571541a6043ed12cb8af51e198e207968394a7 Mon Sep 17 00:00:00 2001 From: tgravescs Date: Mon, 21 Oct 2013 14:05:15 -0500 Subject: Fix the Worker to use CoarseGrainedExecutorBackend and modify classpath to be explicit about inclusion of spark.jar and app.jar --- .../org/apache/spark/deploy/yarn/Client.scala | 32 ++++++++++++++++++---- .../apache/spark/deploy/yarn/WorkerRunnable.scala | 7 ++--- 2 files changed, 29 insertions(+), 10 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8afb3e39cb..1a380ae714 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) - Apps.addToEnvironment(env, Environment.CLASSPATH.name, - Environment.PWD.$() + Path.SEPARATOR + "*") - - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = localResources("spark.jar").getResource().getScheme.toString() + "://" + @@ -451,4 +447,30 @@ object Client { Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim) } } + + def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + // If log4j present, ensure ours overrides all others + if (addLog4j) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "log4j.properties") + } + // normally the users app.jar is last in case conflicts with spark jars + val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false") + .toBoolean + if (userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "app.jar") + } + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "spark.jar") + Client.populateHadoopClasspath(conf, env) + + if (!userClasspathFirst) { + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "app.jar") + } + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() + + Path.SEPARATOR + "*") + } } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index 8dac9e02ac..ba352daac4 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ? " -XX:OnOutOfMemoryError='kill %p' " + JAVA_OPTS + - " org.apache.spark.executor.StandaloneExecutorBackend " + + " org.apache.spark.executor.CoarseGrainedExecutorBackend " + masterAddress + " " + slaveId + " " + hostname + " " + @@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) - Apps.addToEnvironment(env, Environment.CLASSPATH.name, - Environment.PWD.$() + Path.SEPARATOR + "*") - Client.populateHadoopClasspath(yarnConf, env) + Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env) // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) -- cgit v1.2.3