author     Sandy Ryza <sandy@cloudera.com>       2014-04-29 23:24:34 -0700
committer  Patrick Wendell <pwendell@gmail.com>  2014-04-29 23:24:34 -0700
commit     ff5be9a41e52454e0f9cae83dd1fd50fbeaa684a (patch)
tree       5bd17eaa50b3120317370821104c9c6d9e238b56
parent     7025dda8fa84b57d6f12bc770df2fa10eef21d88 (diff)
SPARK-1004. PySpark on YARN
This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo.

Author: Sandy Ryza <sandy@cloudera.com>

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN
-rwxr-xr-x  bin/pyspark                                                                 1
-rw-r--r--  bin/pyspark2.cmd                                                            1
-rw-r--r--  core/pom.xml                                                               42
-rw-r--r--  core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala  10
-rw-r--r--  docs/python-programming-guide.md                                            3
-rw-r--r--  python/.gitignore                                                           3
-rw-r--r--  python/lib/PY4J_VERSION.txt                                                 1
-rw-r--r--  python/pyspark/__init__.py                                                  7
-rw-r--r--  python/pyspark/java_gateway.py                                             29
-rw-r--r--  python/pyspark/tests.py                                                     4
-rwxr-xr-x  sbin/spark-config.sh                                                        3
11 files changed, 85 insertions, 19 deletions
diff --git a/bin/pyspark b/bin/pyspark
index cad982bc33..f5558853e8 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON
# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 95791095ec..d7cfd5eec5 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python
set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%
set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
diff --git a/core/pom.xml b/core/pom.xml
index 73f573a414..822b5b1dd7 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -294,6 +294,48 @@
</environmentVariables>
</configuration>
</plugin>
+      <!-- Unzip py4j so we can include its files in the jar -->
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
</plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
</build>
</project>
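Taken together, the pom.xml additions unzip the bundled Py4J sources into python/build during the generate-resources phase and then package both pyspark/*.py and py4j/*.py into the core jar, so a cluster node only needs that jar on its PYTHONPATH. A quick sanity check after a build might look like the following; the exact jar path is an assumption and depends on the Scala and Spark versions being built:

    # list the bundled Python sources inside the built jar (jar path is illustrative)
    jar tf core/target/spark-core_*.jar | grep -E '^(pyspark|py4j)/.*\.py$'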
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index a5f0f3d5e7..02799ce009 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()
// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
- val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+ val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
- val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
- workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()
// Redirect the stderr to ours
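With the hard-coded $SPARK_HOME/python paths gone, the worker and daemon are launched as modules, so the only requirement is that pyspark and py4j are importable from the PYTHONPATH the factory inherits through envVars (set by the scripts above locally, or via the jar on YARN). A minimal local sketch of that requirement, with illustrative paths:

    # both imports should succeed once the two entries below are on the path
    PYTHONPATH="$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.8.1-src.zip" \
      python -c 'import pyspark.worker, pyspark.daemon'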
diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98233bf556..98c456228a 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.
+# Running PySpark on YARN
+
+To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".
# Interactive Use
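For readers following along, the documented YARN usage amounts to the snippet below; it assumes the YARN-enabled assembly has been built and that HADOOP_CONF_DIR (or YARN_CONF_DIR) already points at the cluster configuration:

    # launch the interactive PySpark shell against a YARN cluster in client mode
    MASTER=yarn-client ./bin/pyspark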
diff --git a/python/.gitignore b/python/.gitignore
index 5c56e638f9..80b361ffbd 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -1,2 +1,5 @@
*.pyc
docs/
+pyspark.egg-info
+build/
+dist/
diff --git a/python/lib/PY4J_VERSION.txt b/python/lib/PY4J_VERSION.txt
deleted file mode 100644
index 04a0cd52a8..0000000000
--- a/python/lib/PY4J_VERSION.txt
+++ /dev/null
@@ -1 +0,0 @@
-b7924aabe9c5e63f0a4d8bbd17019534c7ec014e
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 73fe7378ff..07df8697bd 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -49,13 +49,6 @@ Hive:
Main entry point for accessing data stored in Apache Hive.
"""
-
-
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
-
-
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6bb6c877c9..032d960e40 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@ from threading import Thread
from py4j.java_gateway import java_import, JavaGateway, GatewayClient
-SPARK_HOME = os.environ["SPARK_HOME"]
+def launch_gateway():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    set_env_vars_for_yarn()
-def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def launch_gateway():
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway
+
+def set_env_vars_for_yarn():
+    # Add the spark jar, which includes the pyspark files, to the python path
+    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
+    if "PYTHONPATH" in env_map:
+        env_map["PYTHONPATH"] += ":spark.jar"
+    else:
+        env_map["PYTHONPATH"] = "spark.jar"
+
+    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
+
+def parse_env(env_str):
+    # Turns a comma-separated list of env settings into a dict that maps env vars to
+    # their values.
+    env = {}
+    for var_str in env_str.split(","):
+        parts = var_str.split("=")
+        if len(parts) == 2:
+            env[parts[0]] = parts[1]
+        elif len(var_str) > 0:
+            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
+            sys.exit(1)
+
+    return env
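set_env_vars_for_yarn only appends to whatever the user already placed in SPARK_YARN_USER_ENV, which parse_env expects as comma-separated KEY=VALUE pairs. For example (both entries below are illustrative):

    # user-supplied settings in the format parse_env accepts
    export SPARK_YARN_USER_ENV="PYTHONPATH=/opt/extra-libs,FOO=bar"
    # after launch_gateway() runs, the PYTHONPATH entry becomes /opt/extra-libs:spark.jar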
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 527104587f..8cf9d9cf1b 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -30,10 +30,12 @@ import unittest
from pyspark.context import SparkContext
from pyspark.files import SparkFiles
-from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int
+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
class PySparkTestCase(unittest.TestCase):
def setUp(self):
diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index cd2c7b7b0d..147b506dd5 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
export SPARK_PREFIX=`dirname "$this"`/..
export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
+# Add the PySpark classes to the PYTHONPATH:
+export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
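Because both exports prepend, anything that sources sbin/spark-config.sh (the standalone daemon scripts, for instance) ends up with the Py4J zip first and the python directory second on its PYTHONPATH. Roughly, with an illustrative install path:

    . sbin/spark-config.sh
    echo "$PYTHONPATH"
    # -> /path/to/spark/python/lib/py4j-0.8.1-src.zip:/path/to/spark/python:<previous value>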