Diffstat (limited to 'core')
-rw-r--r--  core/pom.xml                                                              | 42
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala | 10
2 files changed, 44 insertions(+), 8 deletions(-)
diff --git a/core/pom.xml b/core/pom.xml
index 73f573a414..822b5b1dd7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -294,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
+ <!-- Unzip py4j so we can include its files in the jar -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>exec-maven-plugin</artifactId>
+ <version>1.2.1</version>
+ <executions>
+ <execution>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>exec</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <executable>unzip</executable>
+ <workingDirectory>../python</workingDirectory>
+ <arguments>
+ <argument>-o</argument>
+ <argument>lib/py4j*.zip</argument>
+ <argument>-d</argument>
+ <argument>build</argument>
+ </arguments>
+ </configuration>
+ </plugin>
</plugins>
+
+ <resources>
+ <resource>
+ <directory>src/main/resources</directory>
+ </resource>
+ <resource>
+ <directory>../python</directory>
+ <includes>
+ <include>pyspark/*.py</include>
+ </includes>
+ </resource>
+ <resource>
+ <directory>../python/build</directory>
+ <includes>
+ <include>py4j/*.py</include>
+ </includes>
+ </resource>
+ </resources>
</build>
</project>
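
The pom.xml change above unzips the bundled py4j distribution into ../python/build and registers ../python and ../python/build as resource directories, so pyspark/*.py and py4j/*.py are packaged into the core jar. Because a jar is an ordinary zip archive, Python's zipimport can load these modules when the jar itself is placed on PYTHONPATH. A minimal sketch of how one might verify the packaged jar, assuming the jar path is passed as an argument (the artifact location is not part of this change):

    import java.util.zip.ZipFile
    import scala.collection.JavaConverters._

    // Hedged sketch: list the Python files bundled into a built jar.
    // The jar path comes from args(0), e.g. a spark-core artifact under
    // core/target/ (the exact name is an assumption).
    object CheckBundledPython {
      def main(args: Array[String]): Unit = {
        val jar = new ZipFile(args(0))
        val names = jar.entries().asScala.map(_.getName).toList
        jar.close()
        names.filter(n => n.startsWith("pyspark/") || n.startsWith("py4j/"))
             .foreach(println)
      }
    }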
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index a5f0f3d5e7..02799ce009 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
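
Both hunks replace an absolute, SPARK_HOME-derived script path with a "python -m" invocation, which resolves pyspark.worker and pyspark.daemon through the interpreter's normal module search path rather than the filesystem layout; the PYTHONPATH bookkeeping is dropped because the caller is now expected to supply an environment (via envVars) in which pyspark is importable. A minimal standalone sketch of the same launch pattern, where the module name and PYTHONPATH entry are hypothetical placeholders, not what Spark sets:

    import scala.collection.JavaConverters._

    // Hedged sketch of launching a Python module with "-m" from Scala.
    // "some.module" and the PYTHONPATH value are placeholders.
    object LaunchPythonModule {
      def main(args: Array[String]): Unit = {
        val pb = new ProcessBuilder(Seq("python", "-m", "some.module").asJava)
        // -m makes Python locate the module on its search path, so no
        // hard-coded script path (e.g. under SPARK_HOME) is needed.
        pb.environment().put("PYTHONPATH", "/opt/app/python")  // assumption
        pb.redirectErrorStream(true)  // merge the child's stderr into stdout
        val proc = pb.start()
        proc.waitFor()
      }
    }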