-rw-r--r--  docs/running-on-yarn.md                                                  | 26
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala            | 47
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala  |  4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala       | 19
4 files changed, 84 insertions(+), 12 deletions(-)
diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 96cf612c54..3f8a093bbe 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -258,6 +258,32 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
Principal to be used to login to KDC, while running on secure HDFS.
</td>
</tr>
+<tr>
+ <td><code>spark.yarn.config.gatewayPath</code></td>
+ <td>(none)</td>
+ <td>
+ A path that is valid on the gateway host (the host where a Spark application is started) but may
+ differ from the path for the same resource on other nodes in the cluster. Coupled with
+ <code>spark.yarn.config.replacementPath</code>, this is used to support clusters with
+ heterogeneous configurations, so that Spark can correctly launch remote processes.
+ <p/>
+ The replacement path will normally contain a reference to some environment variable exported by
+ YARN (and, thus, visible to Spark containers).
+ <p/>
+ For example, if the gateway node has Hadoop libraries installed under <code>/disk1/hadoop</code>, and
+ the location of the Hadoop install is exported by YARN as the <code>HADOOP_HOME</code>
+ environment variable, setting this value to <code>/disk1/hadoop</code> and the replacement path to
+ <code>$HADOOP_HOME</code> will make sure that paths used to launch remote processes properly
+ reference the local YARN configuration.
+ </td>
+</tr>
+<tr>
+ <td><code>spark.yarn.config.replacementPath</code></td>
+ <td>(none)</td>
+ <td>
+ See <code>spark.yarn.config.gatewayPath</code>.
+ </td>
+</tr>
</table>
# Launching Spark on YARN
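
A minimal sketch (not part of the patch) of the scenario described in the docs above, expressed as SparkConf settings; the property names come from this change, while the concrete paths are illustrative:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.yarn.config.gatewayPath", "/disk1/hadoop")     // path valid on the gateway host
      .set("spark.yarn.config.replacementPath", "$HADOOP_HOME")  // left literal so YARN expands it on each node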
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index da1ec2a0fe..67a5c95400 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -676,7 +676,7 @@ private[spark] class Client(
val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) {
- prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+ prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
}
if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
logWarning("spark.yarn.am.extraJavaOptions will not take effect in cluster mode")
@@ -698,7 +698,7 @@ private[spark] class Client(
}
sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
- prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths)))
+ prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
}
}
@@ -1106,10 +1106,10 @@ object Client extends Logging {
env: HashMap[String, String],
isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
- extraClassPath.foreach(addClasspathEntry(_, env))
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
- )
+ extraClassPath.foreach { cp =>
+ addClasspathEntry(getClusterPath(sparkConf, cp), env)
+ }
+ addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
if (isAM) {
addClasspathEntry(
@@ -1125,12 +1125,14 @@ object Client extends Logging {
getUserClasspath(sparkConf)
}
userClassPath.foreach { x =>
- addFileToClasspath(x, null, env)
+ addFileToClasspath(sparkConf, x, null, env)
}
}
- addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
- sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
+ sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
+ addClasspathEntry(getClusterPath(sparkConf, cp), env)
+ }
}
/**
@@ -1159,16 +1161,18 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
+ * @param conf Spark configuration.
* @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
+ conf: SparkConf,
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
- addClasspathEntry(uri.getPath, env)
+ addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
addClasspathEntry(buildPath(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
@@ -1183,6 +1187,29 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
+ * Returns the path to be sent to the NM for a path that is valid on the gateway.
+ *
+ * This method uses two configuration values:
+ *
+ * - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may
+ * be valid only on the gateway node.
+ * - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may
+ * contain, for example, env variable references, which will be expanded by the NMs when
+ * starting containers.
+ *
+ * If either config is not available, the input path is returned.
+ */
+ def getClusterPath(conf: SparkConf, path: String): String = {
+ val localPath = conf.get("spark.yarn.config.gatewayPath", null)
+ val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
+ if (localPath != null && clusterPath != null) {
+ path.replace(localPath, clusterPath)
+ } else {
+ path
+ }
+ }
+
+ /**
* Obtains token for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
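
To make the replacement semantics concrete, a hedged sketch (not part of the patch) of how getClusterPath behaves; since it is backed by a plain String.replace, every occurrence of the gateway path in a multi-entry string is rewritten:

    val sparkConf = new SparkConf()
      .set("spark.yarn.config.gatewayPath", "/disk1/hadoop")
      .set("spark.yarn.config.replacementPath", "$HADOOP_HOME")

    // Every occurrence is substituted, so classpath-style strings work too:
    Client.getClusterPath(sparkConf, "/disk1/hadoop/lib:/disk1/hadoop/conf")
    // => "$HADOOP_HOME/lib:$HADOOP_HOME/conf"

    // With either config unset, the input path is returned unchanged:
    Client.getClusterPath(new SparkConf(), "/disk1/hadoop/lib")
    // => "/disk1/hadoop/lib"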
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b0937083bc..78e27fb7f3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -146,7 +146,7 @@ class ExecutorRunnable(
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
- prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+ prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
}
javaOpts += "-Djava.io.tmpdir=" +
@@ -195,7 +195,7 @@ class ExecutorRunnable(
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
val absPath =
if (new File(uri.getPath()).isAbsolute()) {
- uri.getPath()
+ Client.getClusterPath(sparkConf, uri.getPath())
} else {
Client.buildPath(Environment.PWD.$(), uri.getPath())
}
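
A hedged sketch of what the executor-side change accomplishes. Utils.libraryPathEnvPrefix is the helper already used in the hunk above; the concrete paths are illustrative:

    // On Linux, Utils.libraryPathEnvPrefix yields a command prefix such as
    //   LD_LIBRARY_PATH="/disk1/hadoop/lib/native:$LD_LIBRARY_PATH"
    val prefix = Utils.libraryPathEnvPrefix(Seq("/disk1/hadoop/lib/native"))

    // After this change, the gateway portion is rewritten before the container
    // command is built, so each NodeManager host resolves its own install:
    val remotePrefix = Client.getClusterPath(sparkConf, prefix)
    // e.g. LD_LIBRARY_PATH="$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH"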
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 4ec976aa31..837f8d3fa5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,25 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
}
}
+ test("Cluster path translation") {
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
+ .set("spark.yarn.config.gatewayPath", "/localPath")
+ .set("spark.yarn.config.replacementPath", "/remotePath")
+
+ Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+ Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+ "/remotePath/1:/remotePath/2")
+
+ val env = new MutableHashMap[String, String]()
+ Client.populateClasspath(null, conf, sparkConf, env, false,
+ extraClassPath = Some("/localPath/my1.jar"))
+ val cp = classpath(env)
+ cp should contain ("/remotePath/spark.jar")
+ cp should contain ("/remotePath/my1.jar")
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =