diff options
Diffstat (limited to 'core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 9 |
1 files changed, 7 insertions, 2 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 02799ce009..b0bf4e052b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1)) var daemonPort: Int = 0 + val pythonPath = PythonUtils.mergePythonPaths( + PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", "")) + def create(): Socket = { if (useDaemon) { createThroughDaemon() @@ -78,9 +81,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker")) + val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker")) val workerEnv = pb.environment() workerEnv.putAll(envVars) + workerEnv.put("PYTHONPATH", pythonPath) val worker = pb.start() // Redirect the worker's stderr to ours @@ -151,9 +155,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon")) + val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon")) val workerEnv = pb.environment() workerEnv.putAll(envVars) + workerEnv.put("PYTHONPATH", pythonPath) daemon = pb.start() // Redirect the stderr to ours |