diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-10 19:34:33 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-10-10 19:34:33 -0700 |
commit | 8f11c36fe17c2718c895b771b59a9138521e0079 (patch) | |
tree | ae2bdd4eec278538fd5c6e971b3a1e42cfa62b60 /yarn/src | |
parent | c71499b7795564e1d16495c59273ecc027070fc5 (diff) | |
parent | 0fff4ee8523ff4137eedfc314b51135427137c63 (diff) | |
download | spark-8f11c36fe17c2718c895b771b59a9138521e0079.tar.gz spark-8f11c36fe17c2718c895b771b59a9138521e0079.tar.bz2 spark-8f11c36fe17c2718c895b771b59a9138521e0079.zip |
Merge remote-tracking branch 'tgravescs/sparkYarnDistCache'
Closes #11
Conflicts:
docs/running-on-yarn.md
yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
Diffstat (limited to 'yarn/src')
4 files changed, 253 insertions, 45 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 858b58d338..c1a87d3373 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -17,22 +17,25 @@ package org.apache.spark.deploy.yarn +import java.io.IOException; import java.net.Socket +import java.security.PrivilegedExceptionAction import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.atomic.{AtomicInteger, AtomicReference} import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.hadoop.net.NetUtils +import org.apache.hadoop.util.ShutdownHookManager import org.apache.hadoop.yarn.api._ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.api.protocolrecords._ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} -import scala.collection.JavaConversions._ import org.apache.spark.{SparkContext, Logging} import org.apache.spark.util.Utils import org.apache.hadoop.security.UserGroupInformation -import java.security.PrivilegedExceptionAction +import scala.collection.JavaConversions._ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging { @@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e private var appAttemptId: ApplicationAttemptId = null private var userThread: Thread = null private val yarnConf: YarnConfiguration = new YarnConfiguration(conf) + private val fs = FileSystem.get(yarnConf) private var yarnAllocator: YarnAllocationHandler = null private var isFinished:Boolean = false private var uiAddress: String = "" + private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES, + YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES) + private var isLastAMRetry: Boolean = true def run() { // setup the directories so things go to yarn approved directories rather // then user specified and /tmp System.setProperty("spark.local.dir", getLocalDirs()) + + // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using + ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30) appAttemptId = getApplicationAttemptId() + isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts; resourceManager = registerWithResourceManager() // Workaround until hadoop moves to something which has @@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e // It need shutdown hook to set SUCCEEDED successed = true } finally { + logDebug("finishing main") + isLastAMRetry = true; if (successed) { ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED) } else { @@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e } } - - private def allocateWorkers() { try { logInfo("Allocating " + args.numWorkers + " workers.") @@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e resourceManager.finishApplicationMaster(finishReq) } + + /** + * clean up the staging directory. + */ + private def cleanupStagingDir() { + var stagingDirPath: Path = null + try { + val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean + if (!preserveFiles) { + stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent() + if (stagingDirPath == null) { + logError("Staging directory is null") + return + } + logInfo("Deleting staging directory " + stagingDirPath) + fs.delete(stagingDirPath, true) + } + } catch { + case e: IOException => + logError("Failed to cleanup staging dir " + stagingDirPath, e) + } + } + + // The shutdown hook that runs when a signal is received AND during normal + // close of the JVM. + class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable { + + def run() { + logInfo("AppMaster received a signal.") + // we need to clean up staging dir before HDFS is shut down + // make sure we don't delete it until this is the last AM + if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir() + } + } } @@ -368,6 +413,8 @@ object ApplicationMaster { // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit // Should not really have to do this, but it helps yarn to evict resources earlier. // not to mention, prevent Client declaring failure even though we exit'ed properly. + // Note that this will unfortunately not properly clean up the staging files because it gets called to + // late and the filesystem is already shutdown. if (modified) { Runtime.getRuntime().addShutdownHook(new Thread with Logging { // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run' diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 076dd3c9b0..8afb3e39cb 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -45,7 +45,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl var rpc: YarnRPC = YarnRPC.create(conf) val yarnConf: YarnConfiguration = new YarnConfiguration(conf) - val credentials = UserGroupInformation.getCurrentUser().getCredentials(); + val credentials = UserGroupInformation.getCurrentUser().getCredentials() + private var distFiles = None: Option[String] + private var distFilesTimeStamps = None: Option[String] + private var distFilesFileSizes = None: Option[String] + private var distArchives = None: Option[String] + private var distArchivesTimeStamps = None: Option[String] + private var distArchivesFileSizes = None: Option[String] def run() { init(yarnConf) @@ -57,7 +63,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl verifyClusterResources(newApp) val appContext = createApplicationSubmissionContext(appId) - val localResources = prepareLocalResources(appId, "spark") + val localResources = prepareLocalResources(appId, ".sparkStaging") val env = setupLaunchEnv(localResources) val amContainer = createContainerLaunchContext(newApp, localResources, env) @@ -109,10 +115,73 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl appContext.setApplicationName(args.appName) return appContext } - - def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = { + + /** + * Copy the local file into HDFS and configure to be distributed with the + * job via the distributed cache. + * If a fragment is specified the file will be referenced as that fragment. + */ + private def copyLocalFile( + dstDir: Path, + resourceType: LocalResourceType, + originalPath: Path, + replication: Short, + localResources: HashMap[String,LocalResource], + fragment: String, + appMasterOnly: Boolean = false): Unit = { + val fs = FileSystem.get(conf) + val newPath = new Path(dstDir, originalPath.getName()) + logInfo("Uploading " + originalPath + " to " + newPath) + fs.copyFromLocalFile(false, true, originalPath, newPath) + fs.setReplication(newPath, replication); + val destStatus = fs.getFileStatus(newPath) + + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(resourceType) + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath)) + amJarRsrc.setTimestamp(destStatus.getModificationTime()) + amJarRsrc.setSize(destStatus.getLen()) + var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName()); + if ((fragment == null) || (fragment.isEmpty())){ + localResources(originalPath.getName()) = amJarRsrc + } else { + localResources(fragment) = amJarRsrc + pathURI = new URI(newPath.toString() + "#" + fragment); + } + val distPath = pathURI.toString() + if (appMasterOnly == true) return + if (resourceType == LocalResourceType.FILE) { + distFiles match { + case Some(path) => + distFilesFileSizes = Some(distFilesFileSizes.get + "," + + destStatus.getLen().toString()) + distFilesTimeStamps = Some(distFilesTimeStamps.get + "," + + destStatus.getModificationTime().toString()) + distFiles = Some(path + "," + distPath) + case _ => + distFilesFileSizes = Some(destStatus.getLen().toString()) + distFilesTimeStamps = Some(destStatus.getModificationTime().toString()) + distFiles = Some(distPath) + } + } else { + distArchives match { + case Some(path) => + distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," + + destStatus.getModificationTime().toString()) + distArchivesFileSizes = Some(distArchivesFileSizes.get + "," + + destStatus.getLen().toString()) + distArchives = Some(path + "," + distPath) + case _ => + distArchivesTimeStamps = Some(destStatus.getModificationTime().toString()) + distArchivesFileSizes = Some(destStatus.getLen().toString()) + distArchives = Some(distPath) + } + } + } + + def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() // Upload Spark and the application JAR to the remote file system // Add them as local resources to the AM val fs = FileSystem.get(conf) @@ -125,33 +194,69 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl } } + val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/" + val dst = new Path(fs.getHomeDirectory(), pathSuffix) + val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort + + if (UserGroupInformation.isSecurityEnabled()) { + val dstFs = dst.getFileSystem(conf) + dstFs.addDelegationTokens(delegTokenRenewer, credentials); + } + val localResources = HashMap[String, LocalResource]() + Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF")) .foreach { case(destName, _localPath) => val localPath: String = if (_localPath != null) _localPath.trim() else "" if (! localPath.isEmpty()) { val src = new Path(localPath) - val pathSuffix = appName + "/" + appId.getId() + destName - val dst = new Path(fs.getHomeDirectory(), pathSuffix) - logInfo("Uploading " + src + " to " + dst) - fs.copyFromLocalFile(false, true, src, dst) - val destStatus = fs.getFileStatus(dst) - - // get tokens for anything we upload to hdfs - if (UserGroupInformation.isSecurityEnabled()) { - fs.addDelegationTokens(delegTokenRenewer, credentials); - } + val newPath = new Path(dst, destName) + logInfo("Uploading " + src + " to " + newPath) + fs.copyFromLocalFile(false, true, src, newPath) + fs.setReplication(newPath, replication); + val destStatus = fs.getFileStatus(newPath) val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] amJarRsrc.setType(LocalResourceType.FILE) amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) - amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst)) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath)) amJarRsrc.setTimestamp(destStatus.getModificationTime()) amJarRsrc.setSize(destStatus.getLen()) - locaResources(destName) = amJarRsrc + localResources(destName) = amJarRsrc } } + + // handle any add jars + if ((args.addJars != null) && (!args.addJars.isEmpty())){ + args.addJars.split(',').foreach { case file: String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources, + tmpURI.getFragment(), true) + } + } + + // handle any distributed cache files + if ((args.files != null) && (!args.files.isEmpty())){ + args.files.split(',').foreach { case file: String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources, + tmpURI.getFragment()) + } + } + + // handle any distributed cache archives + if ((args.archives != null) && (!args.archives.isEmpty())) { + args.archives.split(',').foreach { case file:String => + val tmpURI = new URI(file) + val tmp = new Path(tmpURI) + copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication, + localResources, tmpURI.getFragment()) + } + } + UserGroupInformation.getCurrentUser().addCredentials(credentials); - return locaResources + return localResources } def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = { @@ -160,11 +265,10 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (log4jConfLocalRes != null) Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") Client.populateHadoopClasspath(yarnConf, env) env("SPARK_YARN_MODE") = "true" env("SPARK_YARN_JAR_PATH") = @@ -186,6 +290,18 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString() } + // set the environment variables to be passed on to the Workers + if (distFiles != None) { + env("SPARK_YARN_CACHE_FILES") = distFiles.get + env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get + env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get + } + if (distArchives != None) { + env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get + env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get + env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get + } + // allow users to specify some environment variables Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV")) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index c56dbd99ba..852dbd7dab 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -24,6 +24,9 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo} // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware ! class ClientArguments(val args: Array[String]) { + var addJars: String = null + var files: String = null + var archives: String = null var userJar: String = null var userClass: String = null var userArgs: Seq[String] = Seq[String]() @@ -81,6 +84,17 @@ class ClientArguments(val args: Array[String]) { case ("--name") :: value :: tail => appName = value + + case ("--addJars") :: value :: tail => + addJars = value + args = tail + + case ("--files") :: value :: tail => + files = value + args = tail + + case ("--archives") :: value :: tail => + archives = value args = tail case Nil => @@ -97,7 +111,7 @@ class ClientArguments(val args: Array[String]) { inputFormatInfo = inputFormatMap.values.toList } - + def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { if (unknownParam != null) { System.err.println("Unknown/unsupported param " + unknownParam) @@ -113,10 +127,13 @@ class ClientArguments(val args: Array[String]) { " --worker-cores NUM Number of cores for the workers (Default: 1). This is unsused right now.\n" + " --master-memory MEM Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" + " --worker-memory MEM Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" + - " --name NAME The name of your application (Default: Spark)\n" + - " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')" + " --name NAME The name of your application (Default: Spark)\n" + + " --queue QUEUE The hadoop queue to use for allocation requests (Default: 'default')\n" + + " --addJars jars Comma separated list of local jars that want SparkContext.addJar to work with.\n" + + " --files files Comma separated list of files to be distributed with the job.\n" + + " --archives archives Comma separated list of archives to be distributed with the job." ) System.exit(exitCode) } - + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala index a60e8a3007..8dac9e02ac 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala @@ -137,11 +137,26 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S startReq.setContainerLaunchContext(ctx) cm.startContainer(startReq) } + + private def setupDistributedCache(file: String, + rtype: LocalResourceType, + localResources: HashMap[String, LocalResource], + timestamp: String, + size: String) = { + val uri = new URI(file) + val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] + amJarRsrc.setType(rtype) + amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION) + amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri)) + amJarRsrc.setTimestamp(timestamp.toLong) + amJarRsrc.setSize(size.toLong) + localResources(uri.getFragment()) = amJarRsrc + } def prepareLocalResources: HashMap[String, LocalResource] = { logInfo("Preparing Local resources") - val locaResources = HashMap[String, LocalResource]() + val localResources = HashMap[String, LocalResource]() // Spark JAR val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] @@ -151,7 +166,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_JAR_PATH")))) sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong) sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong) - locaResources("spark.jar") = sparkJarResource + localResources("spark.jar") = sparkJarResource // User JAR val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource] userJarResource.setType(LocalResourceType.FILE) @@ -160,7 +175,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_USERJAR_PATH")))) userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong) userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong) - locaResources("app.jar") = userJarResource + localResources("app.jar") = userJarResource // Log4j conf - if available if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { @@ -171,26 +186,39 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S new URI(System.getenv("SPARK_YARN_LOG4J_PATH")))) log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong) log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong) - locaResources("log4j.properties") = log4jConfResource + localResources("log4j.properties") = log4jConfResource + } + + if (System.getenv("SPARK_YARN_CACHE_FILES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',') + val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',') + for( i <- 0 to distFiles.length - 1) { + setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i), + fileSizes(i)) + } } + if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) { + val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',') + val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',') + val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',') + for( i <- 0 to distArchives.length - 1) { + setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources, + timeStamps(i), fileSizes(i)) + } + } - logInfo("Prepared Local resources " + locaResources) - return locaResources + logInfo("Prepared Local resources " + localResources) + return localResources } def prepareEnvironment: HashMap[String, String] = { val env = new HashMap[String, String]() - // If log4j present, ensure ours overrides all others - if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) { - // Which is correct ? - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./log4j.properties") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./") - } - - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "./*") - Apps.addToEnvironment(env, Environment.CLASSPATH.name, "$CLASSPATH") + Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$()) + Apps.addToEnvironment(env, Environment.CLASSPATH.name, + Environment.PWD.$() + Path.SEPARATOR + "*") Client.populateHadoopClasspath(yarnConf, env) // allow users to specify some environment variables |