aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/java_gateway.py
diff options
context:
space:
mode:
authorJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-01 14:48:45 -0800
committerJosh Rosen <joshrosen@eecs.berkeley.edu>2013-01-01 15:05:00 -0800
commitb58340dbd9a741331fc4c3829b08c093560056c2 (patch)
tree52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/pyspark/java_gateway.py
parent170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff)
downloadspark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.gz
spark-b58340dbd9a741331fc4c3829b08c093560056c2.tar.bz2
spark-b58340dbd9a741331fc4c3829b08c093560056c2.zip
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/pyspark/java_gateway.py')
-rw-r--r--python/pyspark/java_gateway.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
new file mode 100644
index 0000000000..2329e536cc
--- /dev/null
+++ b/python/pyspark/java_gateway.py
@@ -0,0 +1,38 @@
+import os
+import sys
+from subprocess import Popen, PIPE
+from threading import Thread
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+
+
+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
+def launch_gateway():
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and SPARK_MEM settings from spark-env.sh
+ command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ sys.stderr.write(line)
+ EchoOutputThread(proc.stdout).start()
+ # Connect to the gateway
+ gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+ # Import the classes used by PySpark
+ java_import(gateway.jvm, "spark.api.java.*")
+ java_import(gateway.jvm, "spark.api.python.*")
+ java_import(gateway.jvm, "scala.Tuple2")
+ return gateway