diff options
author | Tao Li <litao@sogou-inc.com> | 2015-07-08 19:02:24 +0100 |
---|---|---|
committer | Sean Owen <sowen@cloudera.com> | 2015-07-08 19:06:39 +0100 |
commit | e91d87e6631989527c88d3524152d77e92267aea (patch) | |
tree | 95a7af37f6ff1e42816c242a196e34bcb35dd07c | |
parent | de49916ab61a88566a352a6af02319e44d92930f (diff) | |
download | spark-e91d87e6631989527c88d3524152d77e92267aea.tar.gz spark-e91d87e6631989527c88d3524152d77e92267aea.tar.bz2 spark-e91d87e6631989527c88d3524152d77e92267aea.zip |
[SPARK-8657] [YARN] Fail to upload resource to viewfs
Fail to upload resource to viewfs in spark-1.4
JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657
Author: Tao Li <litao@sogou-inc.com>
Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:
65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs
(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <sowen@cloudera.com>
# Conflicts:
# yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 51 |
1 file changed, 51 insertions, 0 deletions
// --- patch context (cgit diff header, reconstructed from the page dump) ---
// diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
//            b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
// index 9296e79ccf..38e5926701 100644
// @@ -304,6 +304,57 @@ private[spark] class Client(
// NOTE(review): the added code below lives inside `private[spark] class Client`;
// names such as `dst`, `replication`, `fs`, `hadoopConf`, `distCacheMgr`,
// `localResources`, `statCache`, `loginFromKeytab`, `args`, and `sparkConf`
// are members of that class and are not visible in this excerpt.

  /**
   * Distribute a file to the cluster.
   *
   * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
   * to HDFS (if not already there) and added to the application's distributed cache.
   *
   * @param path URI of the file to distribute.
   * @param resType Type of resource being distributed.
   * @param destName Name of the file in the distributed cache.
   * @param targetDir Subdirectory where to place the file.
   * @param appMasterOnly Whether to distribute only to the AM.
   * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
   *         localized path for non-local paths, or the input `path` for local paths.
   *         The localized path will be null if the URI has already been added to the cache.
   */
  def distribute(
      path: String,
      resType: LocalResourceType = LocalResourceType.FILE,
      destName: Option[String] = None,
      targetDir: Option[String] = None,
      appMasterOnly: Boolean = false): (Boolean, String) = {
    val localURI = new URI(path.trim())
    if (localURI.getScheme != LOCAL_SCHEME) {
      // Only upload each distinct URI once per application.
      if (addDistributedUri(localURI)) {
        // Qualify the local path against hadoopConf — this is the SPARK-8657 fix:
        // resolving through the configured filesystem makes viewfs:// paths work.
        val localPath = getQualifiedLocalPath(localURI, hadoopConf)
        // Link name in the cache: optional subdirectory prefix, then the explicit
        // destName, then the URI fragment, then the file's own name — in that order.
        val linkname = targetDir.map(_ + "/").getOrElse("") +
          destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
        val destPath = copyFileToRemote(dst, localPath, replication)
        distCacheMgr.addResource(
          fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
          appMasterOnly = appMasterOnly)
        (false, linkname)
      } else {
        // URI was already added to the cache earlier; signal that with a null path.
        (false, null)
      }
    } else {
      // "local:" URIs refer to files already present on every node; nothing to upload.
      (true, path.trim())
    }
  }

  // If we passed in a keytab, make sure we copy the keytab to the staging directory on
  // HDFS, and setup the relevant environment vars, so the AM can login again.
  if (loginFromKeytab) {
    logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
      " via the YARN Secure Distributed Cache.")
    val (_, localizedPath) = distribute(args.keytab,
      destName = Some(sparkConf.get("spark.yarn.keytab")),
      appMasterOnly = true)
    // distribute() returns a null path only when the URI was already cached; that should
    // never happen for the keytab, so a null here indicates a programmer error.
    require(localizedPath != null, "Keytab file already distributed.")
  }

  /**
   * Copy the given main resource to the distributed cache if the scheme is not "local".
   * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
   * Each resource is represented by a 3-tuple of:
   * (scaladoc truncated in this view — the rest of the method is outside this excerpt)
   */