aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorJeff Zhang <zjffdu@apache.org>2016-08-11 20:08:25 -0700
committerMarcelo Vanzin <vanzin@cloudera.com>2016-08-11 20:08:39 -0700
commit7a9e25c38380e6c62080d62ad38a4830e44fe753 (patch)
treec7ee1437a3dec8abc0fef57b10406a5bd0b72987 /core
parentea0bf91b4a2ca3ef472906e50e31fd6268b6f53e (diff)
downloadspark-7a9e25c38380e6c62080d62ad38a4830e44fe753.tar.gz
spark-7a9e25c38380e6c62080d62ad38a4830e44fe753.tar.bz2
spark-7a9e25c38380e6c62080d62ad38a4830e44fe753.zip
[SPARK-13081][PYSPARK][SPARK_SUBMIT] Allow set pythonExec of driver and executor through conf…
Before this PR, user have to export environment variable to specify the python of driver & executor which is not so convenient for users. This PR is trying to allow user to specify python through configuration "--pyspark-driver-python" & "--pyspark-executor-python" Manually test in local & yarn mode for pyspark-shell and pyspark batch mode. Author: Jeff Zhang <zjffdu@apache.org> Closes #13146 from zjffdu/SPARK-13081.
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala14
-rw-r--r--core/src/main/scala/org/apache/spark/internal/config/package.scala8
-rw-r--r--core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java8
-rw-r--r--core/src/test/scala/org/apache/spark/SparkConfSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala5
5 files changed, 34 insertions, 3 deletions
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
index 6227a30dc9..0b1cec2df8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -24,8 +24,9 @@ import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConverters._
import scala.util.Try
-import org.apache.spark.SparkUserAppException
+import org.apache.spark.{SparkConf, SparkUserAppException}
import org.apache.spark.api.python.PythonUtils
+import org.apache.spark.internal.config._
import org.apache.spark.util.{RedirectThread, Utils}
/**
@@ -37,8 +38,12 @@ object PythonRunner {
val pythonFile = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)
- val pythonExec =
- sys.env.getOrElse("PYSPARK_DRIVER_PYTHON", sys.env.getOrElse("PYSPARK_PYTHON", "python"))
+ val sparkConf = new SparkConf()
+ val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
+ .orElse(sparkConf.get(PYSPARK_PYTHON))
+ .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
+ .orElse(sys.env.get("PYSPARK_PYTHON"))
+ .getOrElse("python")
// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
@@ -77,6 +82,9 @@ object PythonRunner {
// This is equivalent to setting the -u flag; we use it because ipython doesn't support -u:
env.put("PYTHONUNBUFFERED", "YES") // value is needed to be set to a non-empty string
env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort)
+ // pass conf spark.pyspark.python to python process, the only way to pass info to
+ // python process is through environment variable.
+ sparkConf.get(PYSPARK_PYTHON).foreach(env.put("PYSPARK_PYTHON", _))
builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
try {
val process = builder.start()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index e646d9964a..be3dac4d24 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -106,4 +106,12 @@ package object config {
private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
.stringConf
.createOptional
+
+ private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
+ .stringConf
+ .createOptional
+
+ private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
+ .stringConf
+ .createOptional
}
diff --git a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
index e393db06a0..682d98867b 100644
--- a/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
+++ b/core/src/test/java/org/apache/spark/launcher/SparkLauncherSuite.java
@@ -28,6 +28,8 @@ import org.slf4j.LoggerFactory;
import org.slf4j.bridge.SLF4JBridgeHandler;
import static org.junit.Assert.*;
+import org.apache.spark.internal.config.package$;
+
/**
* These tests require the Spark assembly to be built before they can be run.
*/
@@ -89,6 +91,12 @@ public class SparkLauncherSuite {
launcher.setConf("spark.foo", "foo");
launcher.addSparkArg(opts.CONF, "spark.foo=bar");
assertEquals("bar", launcher.builder.conf.get("spark.foo"));
+
+ launcher.setConf(SparkLauncher.PYSPARK_DRIVER_PYTHON, "python3.4");
+ launcher.setConf(SparkLauncher.PYSPARK_PYTHON, "python3.5");
+ assertEquals("python3.4", launcher.builder.conf.get(
+ package$.MODULE$.PYSPARK_DRIVER_PYTHON().key()));
+ assertEquals("python3.5", launcher.builder.conf.get(package$.MODULE$.PYSPARK_PYTHON().key()));
}
@Test(expected=IllegalStateException.class)
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index a883d1b57e..1f0f655a15 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -51,8 +51,10 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
test("loading from system properties") {
System.setProperty("spark.test.testProperty", "2")
+ System.setProperty("nonspark.test.testProperty", "0")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
+ assert(!conf.contains("nonspark.test.testProperty"))
}
test("initializing without loading defaults") {
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index b2bc886108..961ece3e00 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -31,6 +31,7 @@ import org.apache.spark._
import org.apache.spark.api.r.RUtils
import org.apache.spark.deploy.SparkSubmit._
import org.apache.spark.deploy.SparkSubmitUtils.MavenCoordinate
+import org.apache.spark.internal.config._
import org.apache.spark.internal.Logging
import org.apache.spark.TestUtils.JavaSourceFromString
import org.apache.spark.util.{ResetSystemProperties, Utils}
@@ -512,6 +513,8 @@ class SparkSubmitSuite
val clArgs3 = Seq(
"--master", "local",
"--py-files", pyFiles,
+ "--conf", "spark.pyspark.driver.python=python3.4",
+ "--conf", "spark.pyspark.python=python3.5",
"mister.py"
)
val appArgs3 = new SparkSubmitArguments(clArgs3)
@@ -519,6 +522,8 @@ class SparkSubmitSuite
appArgs3.pyFiles should be (Utils.resolveURIs(pyFiles))
sysProps3("spark.submit.pyFiles") should be (
PythonRunner.formatPaths(Utils.resolveURIs(pyFiles)).mkString(","))
+ sysProps3(PYSPARK_DRIVER_PYTHON.key) should be ("python3.4")
+ sysProps3(PYSPARK_PYTHON.key) should be ("python3.5")
}
test("resolves config paths correctly") {