diff options
author | Devaraj K <devaraj@apache.org> | 2016-04-05 14:12:00 -0500 |
---|---|---|
committer | Tom Graves <tgraves@yahoo-inc.com> | 2016-04-05 14:12:00 -0500 |
commit | bc36df127d3b9f56b4edaeb5eca7697d4aef761a (patch) | |
tree | 4382521ace7488fe00e7f8b1eb49a6c4f056a9e3 /yarn/src | |
parent | 463bac001171622538fc93d2e31d1a617ab562e6 (diff) | |
download | spark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.tar.gz spark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.tar.bz2 spark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.zip |
[SPARK-13063][YARN] Make the Spark YARN staging dir configurable
## What changes were proposed in this pull request?
Made the Spark YARN staging directory configurable via the configuration property 'spark.yarn.stagingDir'.
## How was this patch tested?
I have verified it manually by running applications on YARN. If 'spark.yarn.stagingDir' is configured, its value is used as the staging directory; otherwise the default is used, i.e. the file system's home directory for the user.
Author: Devaraj K <devaraj@apache.org>
Closes #12082 from devaraj-kavali/SPARK-13063.
Diffstat (limited to 'yarn/src')
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 18 | ||||
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala | 5 |
2 files changed, 20 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 336e29fc6b..5e7e3be08d 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -182,8 +182,8 @@ private[spark] class Client( val appStagingDir = getAppStagingDir(appId) try { val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES) - val stagingDirPath = new Path(appStagingDir) val fs = FileSystem.get(hadoopConf) + val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir) if (!preserveFiles && fs.exists(stagingDirPath)) { logInfo("Deleting staging directory " + stagingDirPath) fs.delete(stagingDirPath, true) @@ -357,7 +357,7 @@ private[spark] class Client( // Upload Spark and the application JAR to the remote file system if necessary, // and add them as local resources to the application master. val fs = FileSystem.get(hadoopConf) - val dst = new Path(fs.getHomeDirectory(), appStagingDir) + val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir) val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials) // Used to keep track of URIs added to the distributed cache. 
If the same URI is added @@ -668,7 +668,7 @@ private[spark] class Client( env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName() if (loginFromKeytab) { val remoteFs = FileSystem.get(hadoopConf) - val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir) + val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir) val credentialsFile = "credentials-" + UUID.randomUUID().toString sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString) logInfo(s"Credentials file set to: $credentialsFile") @@ -1438,4 +1438,16 @@ private object Client extends Logging { uri.startsWith(s"$LOCAL_SCHEME:") } + /** + * Returns the app staging dir based on the STAGING_DIR configuration if configured + * otherwise based on the users home directory. + */ + private def getAppStagingDirPath( + conf: SparkConf, + fs: FileSystem, + appStagingDir: String): Path = { + val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory()) + new Path(baseDir, appStagingDir) + } + } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index a3b9134b58..5188a3e229 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -108,6 +108,11 @@ package object config { .intConf .optional + private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir") + .doc("Staging directory used while submitting applications.") + .stringConf + .optional + /* Cluster-mode launcher configuration. */ private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion") |