author     Lianhui Wang <lianhuiwang09@gmail.com>    2016-04-19 19:48:03 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>      2016-04-19 19:48:03 -0700
commit     4514aebd1e807a665c270bfdc3f1127b3a1da898 (patch)
tree       a12c92c3b577cf671694f50f47093dfab95401c1 /yarn
parent     3ae25f244bd471ef77002c703f2cc7ed6b524f11 (diff)
[SPARK-14705][YARN] support Multiple FileSystem for YARN STAGING DIR
## What changes were proposed in this pull request?
SPARK-13063 made the Spark YARN staging directory configurable, but it only supports the default FileSystem. When there are multiple clusters, different clusters may need to use different FileSystems for the staging directory.

## How was this patch tested?
Tested successfully with the following commands:

MASTER=yarn-client ./bin/spark-shell --conf spark.yarn.stagingDir=hdfs:namenode2/temp

$SPARK_HOME/bin/spark-submit --conf spark.yarn.stagingDir=hdfs:namenode2/temp

cc tgravescs vanzin andrewor14

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #12473 from lianhuiwang/SPARK-14705.
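The core of the change, visible in the diff below, is to resolve the staging base directory from spark.yarn.stagingDir (falling back to the user's home directory) and then derive the FileSystem from that Path rather than from the default FileSystem. A minimal standalone sketch of that resolution logic follows; the object and method names here are illustrative only and do not appear in the patch.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object StagingDirResolution {
      // Sketch: pick the configured staging dir if set, otherwise the user's home
      // directory, so the staging dir may live on a non-default FileSystem.
      def resolveStagingBaseDir(stagingDirConf: Option[String],
                                hadoopConf: Configuration): Path =
        stagingDirConf.map(new Path(_))
          .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
    }

    // The FileSystem is then obtained from the resolved path itself, as the patch does,
    // instead of calling FileSystem.get(hadoopConf):
    //   val fs = stagingDirPath.getFileSystem(hadoopConf)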
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala      | 49
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 12
2 files changed, 25 insertions, 36 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 7c168ed279..ae5fb6bbd4 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -117,6 +117,11 @@ private[spark] class Client(
private var appId: ApplicationId = null
+ // The app staging dir based on the STAGING_DIR configuration if configured,
+ // otherwise based on the user's home directory.
+ private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+ .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+
def reportLauncherState(state: SparkAppHandle.State): Unit = {
launcherBackend.setState(state)
}
@@ -179,18 +184,17 @@ private[spark] class Client(
* Cleanup application staging directory.
*/
private def cleanupStagingDir(appId: ApplicationId): Unit = {
- val appStagingDir = getAppStagingDir(appId)
+ val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
try {
val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
- val fs = FileSystem.get(hadoopConf)
- val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
+ val fs = stagingDirPath.getFileSystem(hadoopConf)
if (!preserveFiles && fs.exists(stagingDirPath)) {
logInfo("Deleting staging directory " + stagingDirPath)
fs.delete(stagingDirPath, true)
}
} catch {
case ioe: IOException =>
- logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+ logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
}
}
@@ -351,14 +355,13 @@ private[spark] class Client(
* Exposed for testing.
*/
def prepareLocalResources(
- appStagingDir: String,
+ destDir: Path,
pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
logInfo("Preparing resources for our AM container")
// Upload Spark and the application JAR to the remote file system if necessary,
// and add them as local resources to the application master.
- val fs = FileSystem.get(hadoopConf)
- val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
- val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+ val fs = destDir.getFileSystem(hadoopConf)
+ val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
// Used to keep track of URIs added to the distributed cache. If the same URI is added
// multiple times, YARN will fail to launch containers for the app with an internal
@@ -372,9 +375,9 @@ private[spark] class Client(
YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
- .getOrElse(fs.getDefaultReplication(dst))
+ .getOrElse(fs.getDefaultReplication(destDir))
val localResources = HashMap[String, LocalResource]()
- FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+ FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
@@ -422,7 +425,7 @@ private[spark] class Client(
val localPath = getQualifiedLocalPath(localURI, hadoopConf)
val linkname = targetDir.map(_ + "/").getOrElse("") +
destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
+ val destPath = copyFileToRemote(destDir, localPath, replication)
val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(
destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -666,17 +669,15 @@ private[spark] class Client(
* Set up the environment for launching our ApplicationMaster container.
*/
private def setupLaunchEnv(
- stagingDir: String,
+ stagingDirPath: Path,
pySparkArchives: Seq[String]): HashMap[String, String] = {
logInfo("Setting up the launch environment for our AM container")
val env = new HashMap[String, String]()
populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
env("SPARK_YARN_MODE") = "true"
- env("SPARK_YARN_STAGING_DIR") = stagingDir
+ env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
if (loginFromKeytab) {
- val remoteFs = FileSystem.get(hadoopConf)
- val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
val credentialsFile = "credentials-" + UUID.randomUUID().toString
sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
logInfo(s"Credentials file set to: $credentialsFile")
@@ -776,15 +777,15 @@ private[spark] class Client(
: ContainerLaunchContext = {
logInfo("Setting up container launch context for our AM")
val appId = newAppResponse.getApplicationId
- val appStagingDir = getAppStagingDir(appId)
+ val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
val pySparkArchives =
if (sparkConf.get(IS_PYTHON_APP)) {
findPySparkArchives()
} else {
Nil
}
- val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
- val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+ val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
+ val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
// Set the environment variables to be passed on to the executors.
distCacheMgr.setDistFilesEnv(launchEnv)
@@ -1446,16 +1447,4 @@ private object Client extends Logging {
uri.startsWith(s"$LOCAL_SCHEME:")
}
- /**
- * Returns the app staging dir based on the STAGING_DIR configuration if configured
- * otherwise based on the users home directory.
- */
- private def getAppStagingDirPath(
- conf: SparkConf,
- fs: FileSystem,
- appStagingDir: String): Path = {
- val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
- new Path(baseDir, appStagingDir)
- }
-
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 23050e8c1d..06efd44b5d 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -147,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val tempDir = Utils.createTempDir()
try {
- client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
sparkConf.get(APP_JAR) should be (Some(USER))
// The non-local path should be propagated by name only, since it will end up in the app's
@@ -238,7 +238,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
val tempDir = Utils.createTempDir()
- client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
assert(sparkConf.get(SPARK_JARS) ===
Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
@@ -260,14 +260,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
val client = createClient(sparkConf)
- client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
intercept[IllegalArgumentException] {
- client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
}
}
@@ -280,7 +280,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
val client = createClient(sparkConf)
- client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
}
@@ -308,7 +308,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
val client = createClient(sparkConf)
val tempDir = Utils.createTempDir()
- client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+ client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
// Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
// ignored.