author    Marcelo Vanzin <vanzin@cloudera.com>  2015-06-26 08:45:22 -0500
committer Imran Rashid <irashid@cloudera.com>  2015-06-26 08:45:22 -0500
commit    37bf76a2de2143ec6348a3d43b782227849520cc (patch)
tree      5c4b07354e7bb3dbf0e896ffe448fa7e6451c324 /yarn
parent    c9e05a315a96fbf3026a2b3c6934dd2dec420099 (diff)
[SPARK-8302] Support heterogeneous cluster install paths on YARN.
Some users have Hadoop installations on different paths across their cluster. Currently, that makes it hard to set up some configuration in Spark since that requires hardcoding paths to jar files or native libraries, which wouldn't work on such a cluster.

This change introduces a couple of YARN-specific configurations that instruct the backend to replace certain paths when launching remote processes. That way, if the configuration says the Spark jar is in "/spark/spark.jar", and also says that "/spark" should be replaced with "{{SPARK_INSTALL_DIR}}", YARN will start containers in the NMs with "{{SPARK_INSTALL_DIR}}/spark.jar" as the location of the jar.

Coupled with YARN's environment whitelist (which allows certain env variables to be exposed to containers), this allows users to support such heterogeneous environments, as long as a single replacement is enough. (Otherwise, this feature would need to be extended to support multiple path replacements.)

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #6752 from vanzin/SPARK-8302 and squashes the following commits:

4bff8d4 [Marcelo Vanzin] Add docs, rename configs.
0aa2a02 [Marcelo Vanzin] Only do replacement for paths that need it.
2e9cc9d [Marcelo Vanzin] Style.
a5e1f68 [Marcelo Vanzin] [SPARK-8302] Support heterogeneous cluster install paths on YARN.
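[Editor's note] For readers skimming the diff, here is a minimal, self-contained sketch of the translation this commit implements. It mirrors the getClusterPath helper added to Client.scala below; the object name PathTranslationSketch and the translate method are illustrative only and are not part of the patch.

    import org.apache.spark.SparkConf

    object PathTranslationSketch {
      // Mirrors the new getClusterPath helper: a plain substring replacement
      // driven by the two new YARN configs. If either config is unset, the
      // input path is returned unchanged.
      def translate(conf: SparkConf, path: String): String = {
        val gatewayPath = conf.get("spark.yarn.config.gatewayPath", null)
        val replacementPath = conf.get("spark.yarn.config.replacementPath", null)
        if (gatewayPath != null && replacementPath != null) {
          path.replace(gatewayPath, replacementPath)
        } else {
          path
        }
      }

      def main(args: Array[String]): Unit = {
        val conf = new SparkConf(loadDefaults = false)
          .set("spark.yarn.config.gatewayPath", "/spark")
          .set("spark.yarn.config.replacementPath", "{{SPARK_INSTALL_DIR}}")
        // Prints: {{SPARK_INSTALL_DIR}}/spark.jar
        println(translate(conf, "/spark/spark.jar"))
      }
    }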
Diffstat (limited to 'yarn')
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala           | 47
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala |  4
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala      | 19
3 files changed, 58 insertions, 12 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index da1ec2a0fe..67a5c95400 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -676,7 +676,7 @@ private[spark] class Client(
val libraryPaths = Seq(sys.props.get("spark.driver.extraLibraryPath"),
sys.props.get("spark.driver.libraryPath")).flatten
if (libraryPaths.nonEmpty) {
- prefixEnv = Some(Utils.libraryPathEnvPrefix(libraryPaths))
+ prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(libraryPaths)))
}
if (sparkConf.getOption("spark.yarn.am.extraJavaOptions").isDefined) {
logWarning("spark.yarn.am.extraJavaOptions will not take effect in cluster mode")
@@ -698,7 +698,7 @@ private[spark] class Client(
}
sparkConf.getOption("spark.yarn.am.extraLibraryPath").foreach { paths =>
- prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(paths)))
+ prefixEnv = Some(getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(paths))))
}
}
@@ -1106,10 +1106,10 @@ object Client extends Logging {
env: HashMap[String, String],
isAM: Boolean,
extraClassPath: Option[String] = None): Unit = {
- extraClassPath.foreach(addClasspathEntry(_, env))
- addClasspathEntry(
- YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
- )
+ extraClassPath.foreach { cp =>
+ addClasspathEntry(getClusterPath(sparkConf, cp), env)
+ }
+ addClasspathEntry(YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env)
if (isAM) {
addClasspathEntry(
@@ -1125,12 +1125,14 @@ object Client extends Logging {
getUserClasspath(sparkConf)
}
userClassPath.foreach { x =>
- addFileToClasspath(x, null, env)
+ addFileToClasspath(sparkConf, x, null, env)
}
}
- addFileToClasspath(new URI(sparkJar(sparkConf)), SPARK_JAR, env)
+ addFileToClasspath(sparkConf, new URI(sparkJar(sparkConf)), SPARK_JAR, env)
populateHadoopClasspath(conf, env)
- sys.env.get(ENV_DIST_CLASSPATH).foreach(addClasspathEntry(_, env))
+ sys.env.get(ENV_DIST_CLASSPATH).foreach { cp =>
+ addClasspathEntry(getClusterPath(sparkConf, cp), env)
+ }
}
/**
@@ -1159,16 +1161,18 @@ object Client extends Logging {
*
* If not a "local:" file and no alternate name, the environment is not modified.
*
+ * @param conf Spark configuration.
* @param uri URI to add to classpath (optional).
* @param fileName Alternate name for the file (optional).
* @param env Map holding the environment variables.
*/
private def addFileToClasspath(
+ conf: SparkConf,
uri: URI,
fileName: String,
env: HashMap[String, String]): Unit = {
if (uri != null && uri.getScheme == LOCAL_SCHEME) {
- addClasspathEntry(uri.getPath, env)
+ addClasspathEntry(getClusterPath(conf, uri.getPath), env)
} else if (fileName != null) {
addClasspathEntry(buildPath(
YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), fileName), env)
@@ -1183,6 +1187,29 @@ object Client extends Logging {
YarnSparkHadoopUtil.addPathToEnvironment(env, Environment.CLASSPATH.name, path)
/**
+ * Returns the path to be sent to the NM for a path that is valid on the gateway.
+ *
+ * This method uses two configuration values:
+ *
+ * - spark.yarn.config.gatewayPath: a string that identifies a portion of the input path that may
+ * only be valid in the gateway node.
+ * - spark.yarn.config.replacementPath: a string with which to replace the gateway path. This may
+ * contain, for example, env variable references, which will be expanded by the NMs when
+ * starting containers.
+ *
+ * If either config is not available, the input path is returned.
+ */
+ def getClusterPath(conf: SparkConf, path: String): String = {
+ val localPath = conf.get("spark.yarn.config.gatewayPath", null)
+ val clusterPath = conf.get("spark.yarn.config.replacementPath", null)
+ if (localPath != null && clusterPath != null) {
+ path.replace(localPath, clusterPath)
+ } else {
+ path
+ }
+ }
+
+ /**
* Obtains tokens for the Hive metastore and adds them to the credentials.
*/
private def obtainTokenForHiveMetastore(conf: Configuration, credentials: Credentials) {
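[Editor's note] Because the translation is a plain substring substitution, Client.getClusterPath can be applied to whole command fragments, not just bare paths; that is why the prefixEnv hunks above wrap the entire library-path env prefix. A small illustration, assuming a sparkConf configured with the two new configs as in the sketch above (the prefix string is a hypothetical stand-in for what Utils.libraryPathEnvPrefix produces):

    // Hypothetical library-path prefix; the exact shape emitted by
    // Utils.libraryPathEnvPrefix is not important for the substitution.
    val prefix = "LD_LIBRARY_PATH=\"/spark/lib:$LD_LIBRARY_PATH\""
    // With gatewayPath=/spark and replacementPath={{SPARK_INSTALL_DIR}}, this yields:
    //   LD_LIBRARY_PATH="{{SPARK_INSTALL_DIR}}/lib:$LD_LIBRARY_PATH"
    Client.getClusterPath(sparkConf, prefix)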
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index b0937083bc..78e27fb7f3 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -146,7 +146,7 @@ class ExecutorRunnable(
javaOpts ++= Utils.splitCommandString(opts).map(YarnSparkHadoopUtil.escapeForShell)
}
sys.props.get("spark.executor.extraLibraryPath").foreach { p =>
- prefixEnv = Some(Utils.libraryPathEnvPrefix(Seq(p)))
+ prefixEnv = Some(Client.getClusterPath(sparkConf, Utils.libraryPathEnvPrefix(Seq(p))))
}
javaOpts += "-Djava.io.tmpdir=" +
@@ -195,7 +195,7 @@ class ExecutorRunnable(
val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri =>
val absPath =
if (new File(uri.getPath()).isAbsolute()) {
- uri.getPath()
+ Client.getClusterPath(sparkConf, uri.getPath())
} else {
Client.buildPath(Environment.PWD.$(), uri.getPath())
}
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 4ec976aa31..837f8d3fa5 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -151,6 +151,25 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
}
}
+ test("Cluster path translation") {
+ val conf = new Configuration()
+ val sparkConf = new SparkConf()
+ .set(Client.CONF_SPARK_JAR, "local:/localPath/spark.jar")
+ .set("spark.yarn.config.gatewayPath", "/localPath")
+ .set("spark.yarn.config.replacementPath", "/remotePath")
+
+ Client.getClusterPath(sparkConf, "/localPath") should be ("/remotePath")
+ Client.getClusterPath(sparkConf, "/localPath/1:/localPath/2") should be (
+ "/remotePath/1:/remotePath/2")
+
+ val env = new MutableHashMap[String, String]()
+ Client.populateClasspath(null, conf, sparkConf, env, false,
+ extraClassPath = Some("/localPath/my1.jar"))
+ val cp = classpath(env)
+ cp should contain ("/remotePath/spark.jar")
+ cp should contain ("/remotePath/my1.jar")
+ }
+
object Fixtures {
val knownDefYarnAppCP: Seq[String] =