From bc36df127d3b9f56b4edaeb5eca7697d4aef761a Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Tue, 5 Apr 2016 14:12:00 -0500 Subject: [SPARK-13063][YARN] Make the SPARK YARN STAGING DIR configurable MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What changes were proposed in this pull request? Made the SPARK YARN STAGING DIR configurable via the configuration 'spark.yarn.stagingDir'. ## How was this patch tested? I have verified it manually by running applications on YARN. If 'spark.yarn.stagingDir' is configured then its value is used as the staging directory; otherwise the default value, i.e. the file system's home directory for the user, is used. Author: Devaraj K Closes #12082 from devaraj-kavali/SPARK-13063. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 18 +++++++++++++++--- .../scala/org/apache/spark/deploy/yarn/config.scala | 5 +++++ 2 files changed, 20 insertions(+), 3 deletions(-) (limited to 'yarn') diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 336e29fc6b..5e7e3be08d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -182,8 +182,8 @@ private[spark] class Client( val appStagingDir = getAppStagingDir(appId) try { val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - val stagingDirPath = new Path(appStagingDir) val fs = FileSystem.get(hadoopConf) + val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir) if (!preserveFiles && fs.exists(stagingDirPath)) { logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) @@ -357,7 +357,7 @@ private[spark] class Client( // Upload Spark and the application JAR to the remote file system if necessary, // and add them as local resources to the application master. 
val fs = FileSystem.get(hadoopConf) - val dst = new Path(fs.getHomeDirectory(), appStagingDir) + val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir) val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) // Used to keep track of URIs added to the distributed cache. If the same URI is added @@ -668,7 +668,7 @@ private[spark] class Client( env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() if (loginFromKeytab) { val remoteFs = FileSystem.get(hadoopConf) - val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) + val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir) val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) logInfo(s"Credentials file set to: $credentialsFile") @@ -1438,4 +1438,16 @@ private object Client extends Logging { uri.startsWith(s"$LOCAL_SCHEME:") } + /** + * Returns the app staging dir based on the STAGING_DIR configuration if configured + * otherwise based on the users home directory. 
+ */ + private def getAppStagingDirPath( + conf: SparkConf, + fs: FileSystem, + appStagingDir: String): Path = { + val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory()) + new Path(baseDir, appStagingDir) + } + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index a3b9134b58..5188a3e229 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -108,6 +108,11 @@ package object config { .intConf .optional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .optional + /* Cluster-mode launcher configuration. */ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") -- cgit v1.2.3