aboutsummaryrefslogtreecommitdiff
path: root/pyspark/pyspark/java_gateway.py
diff options
context:
space:
mode:
author Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-12-27 22:47:37 -0800
committer Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-12-27 22:47:37 -0800
commit 665466dfff4f89196627a0777eabd3d3894cd296 (patch)
tree 7fa580209756c5fdbb0a52930f30959bbbbc2ba3 /pyspark/pyspark/java_gateway.py
parent ac32447cd38beac8f6bc7a90be9fd24666bb46ad (diff)
downloadspark-665466dfff4f89196627a0777eabd3d3894cd296.tar.gz
spark-665466dfff4f89196627a0777eabd3d3894cd296.tar.bz2
spark-665466dfff4f89196627a0777eabd3d3894cd296.zip
Simplify PySpark installation.
- Bundle Py4J binaries, since Py4J is hard to install.
- Use Spark's `run` script to launch the Py4J gateway, inheriting the settings in spark-env.sh.

With these changes, (hopefully) nothing more than running `sbt/sbt package` will be necessary to run PySpark.
Diffstat (limited to 'pyspark/pyspark/java_gateway.py')
-rw-r--r-- pyspark/pyspark/java_gateway.py 35
1 files changed, 26 insertions, 9 deletions
diff --git a/pyspark/pyspark/java_gateway.py b/pyspark/pyspark/java_gateway.py
index 3726bcbf17..d4a4434c05 100644
--- a/pyspark/pyspark/java_gateway.py
+++ b/pyspark/pyspark/java_gateway.py
@@ -1,19 +1,36 @@
-import glob
import os
-from py4j.java_gateway import java_import, JavaGateway
+from subprocess import Popen, PIPE
+from threading import Thread
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
SPARK_HOME = os.environ["SPARK_HOME"]
-assembly_jar = glob.glob(os.path.join(SPARK_HOME, "core/target") + \
- "/spark-core-assembly-*.jar")[0]
- # TODO: what if multiple assembly jars are found?
-
-
def launch_gateway():
- gateway = JavaGateway.launch_gateway(classpath=assembly_jar,
- javaopts=["-Xmx256m"], die_on_exit=True)
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and SPARK_MEM settings from spark-env.sh
+ command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ print line,
+ EchoOutputThread(proc.stdout).start()
+ # Connect to the gateway
+ gateway = JavaGateway(GatewayClient(port=port))
+ # Import the classes used by PySpark
java_import(gateway.jvm, "spark.api.java.*")
java_import(gateway.jvm, "spark.api.python.*")
java_import(gateway.jvm, "scala.Tuple2")