diff options
-rw-r--r-- | yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 7 | ||||
-rw-r--r-- | yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala | 26 |
2 files changed, 27 insertions, 6 deletions
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 244d1a4e33..348f9bf94a 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -831,8 +831,11 @@ private[spark] class Client( env("SPARK_JAVA_OPTS") = value } // propagate PYSPARK_DRIVER_PYTHON and PYSPARK_PYTHON to driver in cluster mode - sys.env.get("PYSPARK_DRIVER_PYTHON").foreach(env("PYSPARK_DRIVER_PYTHON") = _) - sys.env.get("PYSPARK_PYTHON").foreach(env("PYSPARK_PYTHON") = _) + Seq("PYSPARK_DRIVER_PYTHON", "PYSPARK_PYTHON").foreach { envname => + if (!env.contains(envname)) { + sys.env.get(envname).foreach(env(envname) = _) + } + } } sys.env.get(ENV_DIST_CLASSPATH).foreach { dcp => diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala index 1ccd7e5993..8ab7b21c22 100644 --- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala +++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala @@ -138,6 +138,20 @@ class YarnClusterSuite extends BaseYarnClusterSuite { testPySpark(false) } + test("run Python application in yarn-cluster mode using " + + " spark.yarn.appMasterEnv to override local envvar") { + testPySpark( + clientMode = false, + extraConf = Map( + "spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON" + -> sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", "python"), + "spark.yarn.appMasterEnv.PYSPARK_PYTHON" + -> sys.env.getOrElse("PYSPARK_PYTHON", "python")), + extraEnv = Map( + "PYSPARK_DRIVER_PYTHON" -> "not python", + "PYSPARK_PYTHON" -> "not python")) + } + test("user class path first in client mode") { testUseClassPathFirst(true) } @@ -207,7 +221,10 @@ class YarnClusterSuite extends BaseYarnClusterSuite { checkResult(finalState, executorResult, "ORIGINAL") } - private def testPySpark(clientMode: Boolean): Unit = { + private def testPySpark( + clientMode: Boolean, + extraConf: Map[String, String] = Map(), + extraEnv: Map[String, String] = Map()): Unit = { val primaryPyFile = new File(tempDir, "test.py") Files.write(TEST_PYFILE, primaryPyFile, StandardCharsets.UTF_8) @@ -218,9 +235,9 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val pythonPath = Seq( s"$sparkHome/python/lib/py4j-0.10.1-src.zip", s"$sparkHome/python") - val extraEnv = Map( + val extraEnvVars = Map( "PYSPARK_ARCHIVES_PATH" -> pythonPath.map("local:" + _).mkString(File.pathSeparator), - "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) + "PYTHONPATH" -> pythonPath.mkString(File.pathSeparator)) ++ extraEnv val moduleDir = if (clientMode) { @@ -242,7 +259,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite { val finalState = runSpark(clientMode, primaryPyFile.getAbsolutePath(), sparkArgs = Seq("--py-files" -> pyFiles), appArgs = Seq(result.getAbsolutePath()), - extraEnv = extraEnv) + extraEnv = extraEnvVars, + extraConf = extraConf) checkResult(finalState, result) } |