diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-27 22:47:37 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-12-27 22:47:37 -0800 |
commit | 665466dfff4f89196627a0777eabd3d3894cd296 (patch) | |
tree | 7fa580209756c5fdbb0a52930f30959bbbbc2ba3 /pyspark/pyspark/java_gateway.py | |
parent | ac32447cd38beac8f6bc7a90be9fd24666bb46ad (diff) | |
download | spark-665466dfff4f89196627a0777eabd3d3894cd296.tar.gz spark-665466dfff4f89196627a0777eabd3d3894cd296.tar.bz2 spark-665466dfff4f89196627a0777eabd3d3894cd296.zip |
Simplify PySpark installation.
- Bundle Py4J binaries, since it's hard to install
- Use Spark's `run` script to launch the Py4J
gateway, inheriting the settings in spark-env.sh
With these changes, (hopefully) nothing more than
running `sbt/sbt package` will be necessary to run
PySpark.
Diffstat (limited to 'pyspark/pyspark/java_gateway.py')
-rw-r--r-- | pyspark/pyspark/java_gateway.py | 35 |
1 file changed, 26 insertions, 9 deletions
diff --git a/pyspark/pyspark/java_gateway.py b/pyspark/pyspark/java_gateway.py index 3726bcbf17..d4a4434c05 100644 --- a/pyspark/pyspark/java_gateway.py +++ b/pyspark/pyspark/java_gateway.py @@ -1,19 +1,36 @@ -import glob import os -from py4j.java_gateway import java_import, JavaGateway +from subprocess import Popen, PIPE +from threading import Thread +from py4j.java_gateway import java_import, JavaGateway, GatewayClient SPARK_HOME = os.environ["SPARK_HOME"] -assembly_jar = glob.glob(os.path.join(SPARK_HOME, "core/target") + \ - "/spark-core-assembly-*.jar")[0] - # TODO: what if multiple assembly jars are found? - - def launch_gateway(): - gateway = JavaGateway.launch_gateway(classpath=assembly_jar, - javaopts=["-Xmx256m"], die_on_exit=True) + # Launch the Py4j gateway using Spark's run command so that we pick up the + # proper classpath and SPARK_MEM settings from spark-env.sh + command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer", + "--die-on-broken-pipe", "0"] + proc = Popen(command, stdout=PIPE, stdin=PIPE) + # Determine which ephemeral port the server started on: + port = int(proc.stdout.readline()) + # Create a thread to echo output from the GatewayServer, which is required + # for Java log output to show up: + class EchoOutputThread(Thread): + def __init__(self, stream): + Thread.__init__(self) + self.daemon = True + self.stream = stream + + def run(self): + while True: + line = self.stream.readline() + print line, + EchoOutputThread(proc.stdout).start() + # Connect to the gateway + gateway = JavaGateway(GatewayClient(port=port)) + # Import the classes used by PySpark java_import(gateway.jvm, "spark.api.java.*") java_import(gateway.jvm, "spark.api.python.*") java_import(gateway.jvm, "scala.Tuple2") |