author     KevinGrealish <KevinGre@microsoft.com>  2016-07-27 13:50:31 -0700
committer  Marcelo Vanzin <vanzin@cloudera.com>    2016-07-27 13:50:31 -0700
commit     b14d7b5cf4f173a1e45a4b1ae2a5e4e7ac5e9bb1 (patch)
tree       c5bf2444ca8bee5fab106b9a10cd26bcd456d2fd
parent     bc4851adeb386edc5bef47027a12ca44eda82b09 (diff)
[SPARK-16110][YARN][PYSPARK] Fix allowing python version to be specified per submit for cluster mode.
## What changes were proposed in this pull request?

This fix allows a PySpark job submission to specify Python 2 or 3. It changes the ordering in the setup of the application master environment so that the env vars PYSPARK_PYTHON and PYSPARK_DRIVER_PYTHON can be overridden by spark.yarn.appMasterEnv.* conf settings. This applies to YARN in cluster mode. The interpreter can now be chosen per submission without needing to unset the env vars, which is not always possible (e.g. batch submission with LIVY only exposes the arguments to spark-submit).

## How was this patch tested?

Manual and existing unit tests.

Author: KevinGrealish <KevinGre@microsoft.com>

Closes #13824 from KevinGrealish/SPARK-16110.
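For reference, a minimal Scala sketch of the per-submission usage this enables, via the SparkLauncher API (the application path and the "python3" executable name are placeholders, not from this patch):

    import org.apache.spark.launcher.SparkLauncher

    object SubmitWithPython3 {
      def main(args: Array[String]): Unit = {
        // Submit a PySpark app to YARN in cluster mode. With this fix, the
        // appMasterEnv conf entries below take effect even if PYSPARK_PYTHON /
        // PYSPARK_DRIVER_PYTHON are also set in the submitting environment.
        val proc = new SparkLauncher()
          .setMaster("yarn")
          .setDeployMode("cluster")
          .setAppResource("/path/to/app.py")  // placeholder path
          .setConf("spark.yarn.appMasterEnv.PYSPARK_PYTHON", "python3")
          .setConf("spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON", "python3")
          .launch()
        proc.waitFor()
      }
    }

The same conf keys can be passed to spark-submit with --conf, which is what matters for LIVY-style batch submission.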
-rw-r--r--  yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala          |  7
-rw-r--r--  yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 26
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 244d1a4e33..348f9bf94a 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -831,8 +831,11 @@ private[spark] class Client(
         env("SPARK_JAVA_OPTS") = value
       }
       // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode
-      sys.env.get("PYSPARK_DRIVER_PYTHON").foreach(env("PYSPARK_DRIVER_PYTHON") = _)
-      sys.env.get("PYSPARK_PYTHON").foreach(env("PYSPARK_PYTHON") = _)
+      Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
+        if (!env.contains(envname)) {
+          sys.env.get(envname).foreach(env(envname) = _)
+        }
+      }
     }
 
     sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp =>
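The behavioral change above: spark.yarn.appMasterEnv.* entries are placed into env before this block runs, so the new contains check lets submission-time conf win over env vars inherited from the client. A self-contained sketch of that precedence rule, with hypothetical stand-in values (not the actual Client code):

    import scala.collection.mutable

    // `env` has already absorbed spark.yarn.appMasterEnv.* entries;
    // `clientEnv` stands in for sys.env on the submitting machine.
    val env = mutable.HashMap("PYSPARK_PYTHON" -> "python3")
    val clientEnv = Map(
      "PYSPARK_PYTHON" -> "python2",
      "PYSPARK_DRIVER_PYTHON" -> "python2")

    Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname =>
      if (!env.contains(envname)) {                       // conf already set it? keep it
        clientEnv.get(envname).foreach(env(envname) = _)  // else inherit from the client
      }
    }
    // env now maps PYSPARK_PYTHON -> python3 (conf wins) and
    // PYSPARK_DRIVER_PYTHON -> python2 (inherited; no conf override was given)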
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 1ccd7e5993..8ab7b21c22 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -138,6 +138,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     testPySpark(false)
   }
 
+  test("run Python application in yarn-cluster mode using " +
+    " spark.yarn.appMasterEnv to override local envvar") {
+    testPySpark(
+      clientMode = false,
+      extraConf = Map(
+        "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON"
+          -> sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python"),
+        "spark.yarn.appMasterEnv.PYSPARK_PYTHON"
+          -> sys.env.getOrElse("PYSPARK_PYTHON", "python")),
+      extraEnv = Map(
+        "PYSPARK_DRIVER_PYTHON" -> "not python",
+        "PYSPARK_PYTHON" -> "not python"))
+  }
+
   test("user class path first in client mode") {
     testUseClassPathFirst(true)
   }
@@ -207,7 +221,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     checkResult(finalState, executorResult, "ORIGINAL")
   }
 
-  private def testPySpark(clientMode: Boolean): Unit = {
+  private def testPySpark(
+      clientMode: Boolean,
+      extraConf: Map[String, String] = Map(),
+      extraEnv: Map[String, String] = Map()): Unit = {
     val primaryPyFile = new File(tempDir, "test.py")
     Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8)
@@ -218,9 +235,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val pythonPath = Seq(
       s"$sparkHome/python/lib/py4j-0.10.1-src.zip",
       s"$sparkHome/python")
-    val extraEnv = Map(
+    val extraEnvVars = Map(
       "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator),
-      "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator))
+      "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv
 
     val moduleDir =
       if (clientMode) {
@@ -242,7 +259,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
     val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(),
       sparkArgs = Seq("--py-files" -> pyFiles),
       appArgs = Seq(result.getAbsolutePath()),
-      extraEnv = extraEnv)
+      extraEnv = extraEnvVars,
+      extraConf = extraConf)
 
     checkResult(finalState, result)
   }