aboutsummaryrefslogtreecommitdiff
path: root/yarn
diff options
context:
space:
mode:
authorDevaraj K <devaraj@apache.org>2016-04-05 14:12:00 -0500
committerTom Graves <tgraves@yahoo-inc.com>2016-04-05 14:12:00 -0500
commitbc36df127d3b9f56b4edaeb5eca7697d4aef761a (patch)
tree4382521ace7488fe00e7f8b1eb49a6c4f056a9e3 /yarn
parent463bac001171622538fc93d2e31d1a617ab562e6 (diff)
downloadspark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.tar.gz
spark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.tar.bz2
spark-bc36df127d3b9f56b4edaeb5eca7697d4aef761a.zip
[SPARK-13063][YARN] Make the SPARK YARN STAGING DIR as configurable
## What changes were proposed in this pull request? Made the SPARK YARN STAGING DIR configurable with the configuration 'spark.yarn.stagingDir'. ## How was this patch tested? I have verified it manually by running applications on yarn. If 'spark.yarn.stagingDir' is configured, then that value is used as the staging directory; otherwise the default value is used, i.e. the file system's home directory for the user. Author: Devaraj K <devaraj@apache.org> Closes #12082 from devaraj-kavali/SPARK-13063.
Diffstat (limited to 'yarn')
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala18
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala5
2 files changed, 20 insertions, 3 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 336e29fc6b..5e7e3be08d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -182,8 +182,8 @@ private[spark] class Client(
val appStagingDir = getAppStagingDir(appId)
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
- val stagingDirPath = new Path(appStagingDir)
val fs = FileSystem.get(hadoopConf)
+ val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
@@ -357,7 +357,7 @@ private[spark] class Client(
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
val fs = FileSystem.get(hadoopConf)
- val dst = new Path(fs.getHomeDirectory(), appStagingDir)
+ val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
@@ -668,7 +668,7 @@ private[spark] class Client(
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
if (loginFromKeytab) {
val remoteFs = FileSystem.get(hadoopConf)
- val stagingDirPath = new Path(remoteFs.getHomeDirectory, stagingDir)
+ val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
val credentialsFile = "credentials-" + UUID.randomUUID().toString
sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
@@ -1438,4 +1438,16 @@ private object Client extends Logging {
uri.startsWith(s"$LOCAL_SCHEME:")
}
+ /**
+ * Returns the app staging dir based on the STAGING_DIR configuration if configured,
+ * otherwise based on the user's home directory.
+ */
+ private def getAppStagingDirPath(
+ conf: SparkConf,
+ fs: FileSystem,
+ appStagingDir: String): Path = {
+ val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
+ new Path(baseDir, appStagingDir)
+ }
+
}
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
index a3b9134b58..5188a3e229 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala
@@ -108,6 +108,11 @@ package object config {
.intConf
.optional
+ private[spark] val STAGING_DIR = ConfigBuilder("spark.yarn.stagingDir")
+ .doc("Staging directory used while submitting applications.")
+ .stringConf
+ .optional
+
/* Cluster-mode launcher configuration. */
private[spark] val WAIT_FOR_APP_COMPLETION = ConfigBuilder("spark.yarn.submit.waitAppCompletion")