aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTao Li <litao@sogou-inc.com>2015-07-08 19:02:24 +0100
committerSean Owen <sowen@cloudera.com>2015-07-08 19:23:29 +0100
commite4313db38e81f6288f1704c22e17d0c6e81b4d75 (patch)
treeb7b9399071df44bace39110295fef6b723363390
parente91d87e6631989527c88d3524152d77e92267aea (diff)
downloadspark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.tar.gz
spark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.tar.bz2
spark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.zip
[SPARK-8657] [YARN] [HOTFIX] Fail to upload resource to viewfs
Fail to upload resource to viewfs in spark-1.4.

JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657

Author: Tao Li <litao@sogou-inc.com>

Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:

65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs

(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <sowen@cloudera.com>
-rw-r--r--yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala57
1 file changed, 4 insertions, 53 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 38e5926701..cc0aa453ca 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -304,57 +304,6 @@ private[spark] class Client(
}
/**
- * Distribute a file to the cluster.
- *
- * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
- * to HDFS (if not already there) and added to the application's distributed cache.
- *
- * @param path URI of the file to distribute.
- * @param resType Type of resource being distributed.
- * @param destName Name of the file in the distributed cache.
- * @param targetDir Subdirectory where to place the file.
- * @param appMasterOnly Whether to distribute only to the AM.
- * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
- * localized path for non-local paths, or the input `path` for local paths.
- * The localized path will be null if the URI has already been added to the cache.
- */
- def distribute(
- path: String,
- resType: LocalResourceType = LocalResourceType.FILE,
- destName: Option[String] = None,
- targetDir: Option[String] = None,
- appMasterOnly: Boolean = false): (Boolean, String) = {
- val localURI = new URI(path.trim())
- if (localURI.getScheme != LOCAL_SCHEME) {
- if (addDistributedUri(localURI)) {
- val localPath = getQualifiedLocalPath(localURI, hadoopConf)
- val linkname = targetDir.map(_ + "/").getOrElse("") +
- destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
- val destPath = copyFileToRemote(dst, localPath, replication)
- distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
- appMasterOnly = appMasterOnly)
- (false, linkname)
- } else {
- (false, null)
- }
- } else {
- (true, path.trim())
- }
- }
-
- // If we passed in a keytab, make sure we copy the keytab to the staging directory on
- // HDFS, and setup the relevant environment vars, so the AM can login again.
- if (loginFromKeytab) {
- logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
- " via the YARN Secure Distributed Cache.")
- val (_, localizedPath) = distribute(args.keytab,
- destName = Some(sparkConf.get("spark.yarn.keytab")),
- appMasterOnly = true)
- require(localizedPath != null, "Keytab file already distributed.")
- }
-
- /**
* Copy the given main resource to the distributed cache if the scheme is not "local".
* Otherwise, set the corresponding key in our SparkConf to handle it downstream.
* Each resource is represented by a 3-tuple of:
@@ -389,7 +338,8 @@ private[spark] class Client(
createConfArchive().foreach { file =>
require(addDistributedUri(file.toURI()))
val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
- distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+ distCacheMgr.addResource(destFs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
}
@@ -414,8 +364,9 @@ private[spark] class Client(
val localPath = new Path(localURI)
val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
val destPath = copyFileToRemote(dst, localPath, replication)
+ val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
distCacheMgr.addResource(
- fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+ destFs, hadoopConf, destPath, localResources, resType, linkname, statCache)
if (addToClasspath) {
cachedSecondaryJarLinks += linkname
}