author | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-09-23 09:09:59 -0500 |
committer | Y.CORP.YAHOO.COM\tgraves <tgraves@thatenemy-lm.champ.corp.yahoo.com> | 2013-09-23 09:09:59 -0500 |
commit | 9d4246863a25f7c91f324e004fe000b9848f6057 (patch) | |
tree | bc5c669de3cb38bcbc4b323c0bd12253848c0d6e /yarn | |
parent | 119de80294bd0cb82855bd1982c5371b661b6fd5 (diff) | |
Support distributed cache files and archives in Spark on YARN, and attempt to clean up the staging directory on exit
Diffstat (limited to 'yarn')
4 files changed, 226 insertions, 29 deletions
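As an illustrative sketch before the diffs (not part of the commit): the two new client flags flow through the ClientArguments parser changed below. The --jar and --class options are assumed from the surrounding Spark 0.8-era ClientArguments code, and the paths are hypothetical; the '#fragment' syntax is the renaming mechanism handled by copyLocalFile in Client.scala.

// Hypothetical submission arguments exercising the new --files/--archives flags.
// ClientArguments parses its constructor argument in its class body.
val clientArgs = new ClientArguments(Array(
  "--jar", "myapp.jar",                // assumed pre-existing option
  "--class", "com.example.MyApp",      // assumed pre-existing option
  "--files", "/home/me/lookup.txt#lookup,/home/me/app.conf",
  "--archives", "/home/me/dicts.zip#dicts"))

assert(clientArgs.files == "/home/me/lookup.txt#lookup,/home/me/app.conf")
assert(clientArgs.archives == "/home/me/dicts.zip#dicts")

Each comma-separated entry may carry a URI fragment: per copyLocalFile below, a file with a fragment is localized in the container under that name, otherwise under its own file name.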
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 858b58d338..c1a87d3373 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -17,22 +17,25 @@ package org.apache.spark.deploy.yarn
 
+import java.io.IOException;
 import java.net.Socket
+import java.security.PrivilegedExceptionAction
 import java.util.concurrent.CopyOnWriteArrayList
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.net.NetUtils
+import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
-import scala.collection.JavaConversions._
 
 import org.apache.spark.{SparkContext, Logging}
 import org.apache.spark.util.Utils
 import org.apache.hadoop.security.UserGroupInformation
-import java.security.PrivilegedExceptionAction
+import scala.collection.JavaConversions._
 
 class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) extends Logging {
@@ -43,18 +46,26 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
   private var appAttemptId: ApplicationAttemptId = null
   private var userThread: Thread = null
   private val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
+  private val fs = FileSystem.get(yarnConf)
   private var yarnAllocator: YarnAllocationHandler = null
   private var isFinished:Boolean = false
   private var uiAddress: String = ""
+  private val maxAppAttempts: Int = conf.getInt(YarnConfiguration.RM_AM_MAX_RETRIES,
+    YarnConfiguration.DEFAULT_RM_AM_MAX_RETRIES)
+  private var isLastAMRetry: Boolean = true
 
   def run() {
     // setup the directories so things go to yarn approved directories rather
     // then user specified and /tmp
     System.setProperty("spark.local.dir", getLocalDirs())
+
+    // use priority 30 as its higher then HDFS. Its same priority as MapReduce is using
+    ShutdownHookManager.get().addShutdownHook(new AppMasterShutdownHook(this), 30)
 
     appAttemptId = getApplicationAttemptId()
+    isLastAMRetry = appAttemptId.getAttemptId() >= maxAppAttempts;
     resourceManager = registerWithResourceManager()
 
     // Workaround until hadoop moves to something which has
@@ -183,6 +194,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
         // It need shutdown hook to set SUCCEEDED
         successed = true
       } finally {
+        logDebug("finishing main")
+        isLastAMRetry = true;
         if (successed) {
           ApplicationMaster.this.finishApplicationMaster(FinalApplicationStatus.SUCCEEDED)
         } else {
@@ -229,8 +242,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     }
   }
 
-
-
   private def allocateWorkers() {
     try {
       logInfo("Allocating " + args.numWorkers + " workers.")
@@ -329,6 +340,40 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration) e
     resourceManager.finishApplicationMaster(finishReq)
   }
+
+  /**
+   * clean up the staging directory.
+   */
+  private def cleanupStagingDir() {
+    var stagingDirPath: Path = null
+    try {
+      val preserveFiles = System.getProperty("spark.yarn.preserve.staging.files", "false").toBoolean
+      if (!preserveFiles) {
+        stagingDirPath = new Path(System.getenv("SPARK_YARN_JAR_PATH")).getParent()
+        if (stagingDirPath == null) {
+          logError("Staging directory is null")
+          return
+        }
+        logInfo("Deleting staging directory " + stagingDirPath)
+        fs.delete(stagingDirPath, true)
+      }
+    } catch {
+      case e: IOException =>
+        logError("Failed to cleanup staging dir " + stagingDirPath, e)
+    }
+  }
+
+  // The shutdown hook that runs when a signal is received AND during normal
+  // close of the JVM.
+  class AppMasterShutdownHook(appMaster: ApplicationMaster) extends Runnable {
+
+    def run() {
+      logInfo("AppMaster received a signal.")
+      // we need to clean up staging dir before HDFS is shut down
+      // make sure we don't delete it until this is the last AM
+      if (appMaster.isLastAMRetry) appMaster.cleanupStagingDir()
+    }
+  }
 }
@@ -368,6 +413,8 @@ object ApplicationMaster {
       // Add a shutdown hook - as a best case effort in case users do not call sc.stop or do System.exit
       // Should not really have to do this, but it helps yarn to evict resources earlier.
      // not to mention, prevent Client declaring failure even though we exit'ed properly.
+      // Note that this will unfortunately not properly clean up the staging files because it gets called to
+      // late and the filesystem is already shutdown.
      if (modified) {
        Runtime.getRuntime().addShutdownHook(new Thread with Logging {
          // This is not just to log, but also to ensure that log system is initialized for this instance when we actually are 'run'
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 844c707834..b4d243ed7a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -45,7 +45,13 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
   var rpc: YarnRPC = YarnRPC.create(conf)
   val yarnConf: YarnConfiguration = new YarnConfiguration(conf)
-  val credentials = UserGroupInformation.getCurrentUser().getCredentials();
+  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
+  private var distFiles = None: Option[String]
+  private var distFilesTimeStamps = None: Option[String]
+  private var distFilesFileSizes = None: Option[String]
+  private var distArchives = None: Option[String]
+  private var distArchivesTimeStamps = None: Option[String]
+  private var distArchivesFileSizes = None: Option[String]
 
   def run() {
     init(yarnConf)
@@ -57,7 +63,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     verifyClusterResources(newApp)
 
     val appContext = createApplicationSubmissionContext(appId)
-    val localResources = prepareLocalResources(appId, "spark")
+    val localResources = prepareLocalResources(appId, ".sparkStaging")
     val env = setupLaunchEnv(localResources)
     val amContainer = createContainerLaunchContext(newApp, localResources, env)
 
@@ -109,10 +115,71 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     appContext.setApplicationName("Spark")
     return appContext
   }
-
-  def prepareLocalResources(appId: ApplicationId, appName: String): HashMap[String, LocalResource] = {
+
+  /**
+   * Copy the local file into HDFS and configure to be distributed with the
+   * job via the distributed cache.
+   * If a fragment is specified the file will be referenced as that fragment.
+   */
+  private def copyLocalFile(
+      dstDir: Path,
+      resourceType: LocalResourceType,
+      originalPath: Path,
+      replication: Short,
+      localResources: HashMap[String,LocalResource],
+      fragment: String) = {
+    val fs = FileSystem.get(conf)
+    val newPath = new Path(dstDir, originalPath.getName())
+    logInfo("Uploading " + originalPath + " to " + newPath)
+    fs.copyFromLocalFile(false, true, originalPath, newPath)
+    fs.setReplication(newPath, replication);
+    val destStatus = fs.getFileStatus(newPath)
+
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(resourceType)
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
+    amJarRsrc.setTimestamp(destStatus.getModificationTime())
+    amJarRsrc.setSize(destStatus.getLen())
+    var pathURI: URI = new URI(newPath.toString() + "#" + originalPath.getName());
+    if ((fragment == null) || (fragment.isEmpty())){
+      localResources(originalPath.getName()) = amJarRsrc
+    } else {
+      localResources(fragment) = amJarRsrc
+      pathURI = new URI(newPath.toString() + "#" + fragment);
+    }
+    val distPath = pathURI.toString()
+    if (resourceType == LocalResourceType.FILE) {
+      distFiles match {
+        case Some(path) =>
+          distFilesFileSizes = Some(distFilesFileSizes.get + "," +
+            destStatus.getLen().toString())
+          distFilesTimeStamps = Some(distFilesTimeStamps.get + "," +
+            destStatus.getModificationTime().toString())
+          distFiles = Some(path + "," + distPath)
+        case _ =>
+          distFilesFileSizes = Some(destStatus.getLen().toString())
+          distFilesTimeStamps = Some(destStatus.getModificationTime().toString())
+          distFiles = Some(distPath)
+      }
+    } else {
+      distArchives match {
+        case Some(path) =>
+          distArchivesTimeStamps = Some(distArchivesTimeStamps.get + "," +
+            destStatus.getModificationTime().toString())
+          distArchivesFileSizes = Some(distArchivesFileSizes.get + "," +
+            destStatus.getLen().toString())
+          distArchives = Some(path + "," + distPath)
+        case _ =>
+          distArchivesTimeStamps = Some(destStatus.getModificationTime().toString())
+          distArchivesFileSizes = Some(destStatus.getLen().toString())
+          distArchives = Some(distPath)
+      }
+    }
+  }
+
+  def prepareLocalResources(appId: ApplicationId, sparkStagingDir: String): HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val locaResources = HashMap[String, LocalResource]()
     // Upload Spark and the application JAR to the remote file system
     // Add them as local resources to the AM
     val fs = FileSystem.get(conf)
@@ -125,33 +192,59 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       }
     }
 
+    val pathSuffix = sparkStagingDir + "/" + appId.toString() + "/"
+    val dst = new Path(fs.getHomeDirectory(), pathSuffix)
+    val replication = System.getProperty("spark.yarn.submit.file.replication", "3").toShort
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      val dstFs = dst.getFileSystem(conf)
+      dstFs.addDelegationTokens(delegTokenRenewer, credentials);
+    }
+    val localResources = HashMap[String, LocalResource]()
+
     Map("spark.jar" -> System.getenv("SPARK_JAR"), "app.jar" -> args.userJar, "log4j.properties" -> System.getenv("SPARK_LOG4J_CONF"))
     .foreach { case(destName, _localPath) =>
       val localPath: String = if (_localPath != null) _localPath.trim() else ""
       if (! localPath.isEmpty()) {
         val src = new Path(localPath)
-        val pathSuffix = appName + "/" + appId.getId() + destName
-        val dst = new Path(fs.getHomeDirectory(), pathSuffix)
-        logInfo("Uploading " + src + " to " + dst)
-        fs.copyFromLocalFile(false, true, src, dst)
-        val destStatus = fs.getFileStatus(dst)
-
-        // get tokens for anything we upload to hdfs
-        if (UserGroupInformation.isSecurityEnabled()) {
-          fs.addDelegationTokens(delegTokenRenewer, credentials);
-        }
+        val newPath = new Path(dst, destName)
+        logInfo("Uploading " + src + " to " + newPath)
+        fs.copyFromLocalFile(false, true, src, newPath)
+        fs.setReplication(newPath, replication);
+        val destStatus = fs.getFileStatus(newPath)
         val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
         amJarRsrc.setType(LocalResourceType.FILE)
         amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
-        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(dst))
+        amJarRsrc.setResource(ConverterUtils.getYarnUrlFromPath(newPath))
         amJarRsrc.setTimestamp(destStatus.getModificationTime())
         amJarRsrc.setSize(destStatus.getLen())
-        locaResources(destName) = amJarRsrc
+        localResources(destName) = amJarRsrc
+      }
+    }
+
+    // handle any distributed cache files
+    if ((args.files != null) && (!args.files.isEmpty())){
+      args.files.split(',').foreach { case file: String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.FILE, tmp, replication, localResources,
+          tmpURI.getFragment())
+      }
+    }
+
+    // handle any distributed cache archives
+    if ((args.archives != null) && (!args.archives.isEmpty())) {
+      args.archives.split(',').foreach { case file:String =>
+        val tmpURI = new URI(file)
+        val tmp = new Path(tmpURI)
+        copyLocalFile(dst, LocalResourceType.ARCHIVE, tmp, replication,
+          localResources, tmpURI.getFragment())
+      }
+    }
+
     UserGroupInformation.getCurrentUser().addCredentials(credentials);
-    return locaResources
+    return localResources
   }
 
   def setupLaunchEnv(localResources: HashMap[String, LocalResource]): HashMap[String, String] = {
@@ -186,6 +279,17 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
       env("SPARK_YARN_LOG4J_SIZE") = log4jConfLocalRes.getSize().toString()
     }
 
+    if (distFiles != None) {
+      env("SPARK_YARN_CACHE_FILES") = distFiles.get
+      env("SPARK_YARN_CACHE_FILES_TIME_STAMPS") = distFilesTimeStamps.get
+      env("SPARK_YARN_CACHE_FILES_FILE_SIZES") = distFilesFileSizes.get
+    }
+    if (distArchives != None) {
+      env("SPARK_YARN_CACHE_ARCHIVES") = distArchives.get
+      env("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS") = distArchivesTimeStamps.get
+      env("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES") = distArchivesFileSizes.get
+    }
+
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
index cd651904d2..30d9b6e60f 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala
@@ -24,6 +24,8 @@ import org.apache.spark.scheduler.{InputFormatInfo, SplitInfo}
 // TODO: Add code and support for ensuring that yarn resource 'asks' are location aware !
 class ClientArguments(val args: Array[String]) {
+  var files: String = null
+  var archives: String = null
   var userJar: String = null
   var userClass: String = null
   var userArgs: Seq[String] = Seq[String]()
@@ -78,6 +80,14 @@ class ClientArguments(val args: Array[String]) {
           amQueue = value
           args = tail
 
+        case ("--files") :: value :: tail =>
+          files = value
+          args = tail
+
+        case ("--archives") :: value :: tail =>
+          archives = value
+          args = tail
+
         case Nil =>
           if (userJar == null || userClass == null) {
             printUsageAndExit(1)
@@ -108,7 +118,9 @@ class ClientArguments(val args: Array[String]) {
       "  --worker-cores NUM    Number of cores for the workers (Default: 1). This is unsused right now.\n" +
       "  --master-memory MEM   Memory for Master (e.g. 1000M, 2G) (Default: 512 Mb)\n" +
       "  --worker-memory MEM   Memory per Worker (e.g. 1000M, 2G) (Default: 1G)\n" +
-      "  --queue QUEUE         The hadoop queue to use for allocation requests (Default: 'default')"
+      "  --queue QUEUE         The hadoop queue to use for allocation requests (Default: 'default')\n" +
+      "  --files file          Comma separated list of files to be distributed with the job.\n" +
+      "  --archives archive    Comma separated list of archives to be distributed with the job."
       )
     System.exit(exitCode)
   }
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
index 6229167cb4..d340b114df 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/WorkerRunnable.scala
@@ -136,11 +136,26 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     startReq.setContainerLaunchContext(ctx)
     cm.startContainer(startReq)
   }
+
+  private def setupDistributedCache(file: String,
+      rtype: LocalResourceType,
+      localResources: HashMap[String, LocalResource],
+      timestamp: String,
+      size: String) = {
+    val uri = new URI(file)
+    val amJarRsrc = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
+    amJarRsrc.setType(rtype)
+    amJarRsrc.setVisibility(LocalResourceVisibility.APPLICATION)
+    amJarRsrc.setResource(ConverterUtils.getYarnUrlFromURI(uri))
+    amJarRsrc.setTimestamp(timestamp.toLong)
+    amJarRsrc.setSize(size.toLong)
+    localResources(uri.getFragment()) = amJarRsrc
+  }
 
   def prepareLocalResources: HashMap[String, LocalResource] = {
     logInfo("Preparing Local resources")
-    val locaResources = HashMap[String, LocalResource]()
+    val localResources = HashMap[String, LocalResource]()
 
     // Spark JAR
     val sparkJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
@@ -150,7 +165,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       new URI(System.getenv("SPARK_YARN_JAR_PATH"))))
     sparkJarResource.setTimestamp(System.getenv("SPARK_YARN_JAR_TIMESTAMP").toLong)
     sparkJarResource.setSize(System.getenv("SPARK_YARN_JAR_SIZE").toLong)
-    locaResources("spark.jar") = sparkJarResource
+    localResources("spark.jar") = sparkJarResource
 
     // User JAR
     val userJarResource = Records.newRecord(classOf[LocalResource]).asInstanceOf[LocalResource]
     userJarResource.setType(LocalResourceType.FILE)
@@ -159,7 +174,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
       new URI(System.getenv("SPARK_YARN_USERJAR_PATH"))))
     userJarResource.setTimestamp(System.getenv("SPARK_YARN_USERJAR_TIMESTAMP").toLong)
     userJarResource.setSize(System.getenv("SPARK_YARN_USERJAR_SIZE").toLong)
-    locaResources("app.jar") = userJarResource
+    localResources("app.jar") = userJarResource
 
     // Log4j conf - if available
     if (System.getenv("SPARK_YARN_LOG4J_PATH") != null) {
@@ -170,12 +185,31 @@
       new URI(System.getenv("SPARK_YARN_LOG4J_PATH"))))
       log4jConfResource.setTimestamp(System.getenv("SPARK_YARN_LOG4J_TIMESTAMP").toLong)
       log4jConfResource.setSize(System.getenv("SPARK_YARN_LOG4J_SIZE").toLong)
-      locaResources("log4j.properties") = log4jConfResource
+      localResources("log4j.properties") = log4jConfResource
     }
 
+    if (System.getenv("SPARK_YARN_CACHE_FILES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_FILES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_FILES_FILE_SIZES").split(',')
+      val distFiles = System.getenv("SPARK_YARN_CACHE_FILES").split(',')
+      for( i <- 0 to distFiles.length - 1) {
+        setupDistributedCache(distFiles(i), LocalResourceType.FILE, localResources, timeStamps(i),
+          fileSizes(i))
+      }
+    }
+
+    if (System.getenv("SPARK_YARN_CACHE_ARCHIVES") != null) {
+      val timeStamps = System.getenv("SPARK_YARN_CACHE_ARCHIVES_TIME_STAMPS").split(',')
+      val fileSizes = System.getenv("SPARK_YARN_CACHE_ARCHIVES_FILE_SIZES").split(',')
+      val distArchives = System.getenv("SPARK_YARN_CACHE_ARCHIVES").split(',')
+      for( i <- 0 to distArchives.length - 1) {
+        setupDistributedCache(distArchives(i), LocalResourceType.ARCHIVE, localResources,
+          timeStamps(i), fileSizes(i))
+      }
+    }
 
-    logInfo("Prepared Local resources " + locaResources)
-    return locaResources
+    logInfo("Prepared Local resources " + localResources)
+    return localResources
   }
 
   def prepareEnvironment: HashMap[String, String] = {