aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala')
-rw-r--r--core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala9
1 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce009..b0bf4e052b 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1))
var daemonPort: Int = 0
+ val pythonPath = PythonUtils.mergePythonPaths(
+ PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", ""))
+
def create(): Socket = {
if (useDaemon) {
createThroughDaemon()
@@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
@@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
+ workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours