author	Tao Li <litao@sogou-inc.com>	2015-07-08 19:02:24 +0100
committer	Sean Owen <sowen@cloudera.com>	2015-07-08 19:06:39 +0100
commit	e91d87e6631989527c88d3524152d77e92267aea (patch)
tree	95a7af37f6ff1e42816c242a196e34bcb35dd07c
parent	de49916ab61a88566a352a6af02319e44d92930f (diff)
[SPARK-8657] [YARN] Fail to upload resource to viewfs
Fail to upload resource to viewfs in spark-1.4

JIRA Link: https://issues.apache.org/jira/browse/SPARK-8657

Author: Tao Li <litao@sogou-inc.com>

Closes #7125 from litao-buptsse/SPARK-8657-for-master and squashes the following commits:

65b13f4 [Tao Li] [SPARK-8657] [YARN] Fail to upload resource to viewfs

(cherry picked from commit 26d9b6b8cae9ac6593f78ab98dd45a25d03cf71c)
Signed-off-by: Sean Owen <sowen@cloudera.com>

# Conflicts:
#	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
-rw-r--r--	yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala	51
1 file changed, 51 insertions, 0 deletions
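
The failure mode in SPARK-8657 involves clusters whose namespace is federated behind viewfs://. As a hedged illustration (not part of this patch), the viewfs-safe way to stage a file is to resolve the FileSystem from the destination Path, which consults the viewfs mount table, rather than relying on a single process-wide FileSystem.get(hadoopConf). The helper name stageToCluster below is hypothetical; only standard Hadoop APIs are used. A usage sketch of the distribute() helper that the diff introduces follows the diff.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

object ViewFsStaging {
  // Hypothetical helper, for illustration only.
  def stageToCluster(src: Path, destDir: Path, conf: Configuration): Path = {
    // Resolve the filesystem from the *destination* path. For a viewfs:// URI
    // this returns the view filesystem, which maps the path through the mount
    // table onto the correct underlying NameNode.
    val dstFs: FileSystem = destDir.getFileSystem(conf)
    val srcFs: FileSystem = src.getFileSystem(conf)
    val dest = new Path(destDir, src.getName)
    // Copy without deleting the source, overwriting any stale staged copy.
    FileUtil.copy(srcFs, src, dstFs, dest, false, true, conf)
    dest
  }
}
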
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 9296e79ccf..38e5926701 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -304,6 +304,57 @@ private[spark] class Client(
     }
 
     /**
+     * Distribute a file to the cluster.
+     *
+     * If the file's path is a "local:" URI, it's actually not distributed. Other files are copied
+     * to HDFS (if not already there) and added to the application's distributed cache.
+     *
+     * @param path URI of the file to distribute.
+     * @param resType Type of resource being distributed.
+     * @param destName Name of the file in the distributed cache.
+     * @param targetDir Subdirectory where to place the file.
+     * @param appMasterOnly Whether to distribute only to the AM.
+     * @return A 2-tuple. First item is whether the file is a "local:" URI. Second item is the
+     *         localized path for non-local paths, or the input `path` for local paths.
+     *         The localized path will be null if the URI has already been added to the cache.
+     */
+    def distribute(
+        path: String,
+        resType: LocalResourceType = LocalResourceType.FILE,
+        destName: Option[String] = None,
+        targetDir: Option[String] = None,
+        appMasterOnly: Boolean = false): (Boolean, String) = {
+      val localURI = new URI(path.trim())
+      if (localURI.getScheme != LOCAL_SCHEME) {
+        if (addDistributedUri(localURI)) {
+          val localPath = getQualifiedLocalPath(localURI, hadoopConf)
+          val linkname = targetDir.map(_ + "/").getOrElse("") +
+            destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
+          val destPath = copyFileToRemote(dst, localPath, replication)
+          distCacheMgr.addResource(
+            fs, hadoopConf, destPath, localResources, resType, linkname, statCache,
+            appMasterOnly = appMasterOnly)
+          (false, linkname)
+        } else {
+          (false, null)
+        }
+      } else {
+        (true, path.trim())
+      }
+    }
+
+    // If we passed in a keytab, make sure we copy the keytab to the staging directory on
+    // HDFS, and set up the relevant environment vars, so the AM can log in again.
+    if (loginFromKeytab) {
+      logInfo("To enable the AM to login from keytab, credentials are being copied over to the AM" +
+        " via the YARN Secure Distributed Cache.")
+      val (_, localizedPath) = distribute(args.keytab,
+        destName = Some(sparkConf.get("spark.yarn.keytab")),
+        appMasterOnly = true)
+      require(localizedPath != null, "Keytab file already distributed.")
+    }
+
+    /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
      * Each resource is represented by a 3-tuple of:
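
For orientation, a hedged sketch of how the nested distribute() helper added above is invoked (hypothetical call sites and link names; the helper is local to Client.prepareLocalResources, so this is illustrative rather than standalone-runnable):

// Hypothetical call sites, assuming the scope of Client.prepareLocalResources.

// Non-"local:" URI: uploaded if needed and registered in the distributed
// cache; returns (false, <link name in the cache>).
val (_, appJarLink) = distribute("hdfs:///user/foo/app.jar",
  destName = Some("__app__.jar"))

// "local:" URI: never uploaded; returns (true, <trimmed input path>).
val (isLocal, samePath) = distribute("local:/opt/libs/dep.jar")

// Re-distributing a URI already in the cache yields (false, null), which is
// what the keytab block above guards against with require(localizedPath != null, ...).
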