diff options
author | Tao Li <litao@sogou-inc.com> | 2015-07-08 19:02:24 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-07-08 19:23:29 +0100 |
commit | e4313db38e81f6288f1704c22e17d0c6e81b4d75 (patch) | |
tree | b7b9399071df44bace39110295fef6b723363390 | |
parent | e91d87e6631989527c88d3524152d77e92267aea (diff) | |
download | spark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.tar.gz spark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.tar.bz2 spark-e4313db38e81f6288f1704c22e17d0c6e81b4d75.zip |
[SPARK-8657] [YARN] [HOTFIX] Fail to upload resource to viewfs
Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657
Author: Tao Li <litao@sogou-inc.com>
Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:
65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs
(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <sowen@cloudera.com>
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 57 |
1 file changed, 4 insertions(+), 53 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 38e5926701..cc0aa453ca 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -304,57 +304,6 @@ private[spark] class Client( } /** - * Distribute a file to the cluster. - * - * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied - * to HDFS (if not already there) and added to the application's distributed cache. - * - * @param path URI of the file to distribute. - * @param resType Type of resource being distributed. - * @param destName Name of the file in the distributed cache. - * @param targetDir Subdirectory where to place the file. - * @param appMasterOnly Whether to distribute only to the AM. - * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the - * localized path for non-local paths, or the input `path` for local paths. - * The localized path will be null if the URI has already been added to the cache. 
- */ - def distribute( - path: String, - resType: LocalResourceType = LocalResourceType.FILE, - destName: Option[String] = None, - targetDir: Option[String] = None, - appMasterOnly: Boolean = false): (Boolean, String) = { - val localURI = new URI(path.trim()) - if (localURI.getScheme != LOCAL_SCHEME) { - if (addDistributedUri(localURI)) { - val localPath = getQualifiedLocalPath(localURI, hadoopConf) - val linkname = targetDir.map(_ + "/").getOrElse("") + - destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName()) - val destPath = copyFileToRemote(dst, localPath, replication) - distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache, - appMasterOnly = appMasterOnly) - (false, linkname) - } else { - (false, null) - } - } else { - (true, path.trim()) - } - } - - // If we passed in a keytab, make sure we copy the keytab to the staging directory on - // HDFS, and setup the relevant environment vars, so the AM can login again. - if (loginFromKeytab) { - logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" + - " via the YARN Secure Distributed Cache.") - val (_, localizedPath) = distribute(args.keytab, - destName = Some(sparkConf.get("spark.yarn.keytab")), - appMasterOnly = true) - require(localizedPath != null, "Keytab file already distributed.") - } - - /** * Copy the given main resource to the distributed cache if the scheme is not "local". * Otherwise, set the corresponding key in our SparkConf to handle it downstream. 
* Each resource is represented by a 3-tuple of: @@ -389,7 +338,8 @@ private[spark] class Client( createConfArchive().foreach { file => require(addDistributedUri(file.toURI())) val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication) - distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE, + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) + distCacheMgr.addResource(destFs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE, LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true) } @@ -414,8 +364,9 @@ private[spark] class Client( val localPath = new Path(localURI) val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName()) val destPath = copyFileToRemote(dst, localPath, replication) + val destFs = FileSystem.get(destPath.toUri(), hadoopConf) distCacheMgr.addResource( - fs, hadoopConf, destPath, localResources, resType, linkname, statCache) + destFs, hadoopConf, destPath, localResources, resType, linkname, statCache) if (addToClasspath) { cachedSecondaryJarLinks += linkname } |