aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/java_gateway.py
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-02-18 13:26:12 -0800
commit6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1 (patch)
tree3848e9e09a2c8b7537f4a0635ea0a32daee1f9a8 /python/pyspark/java_gateway.py
parent56b9bd197c522f33e354c2e9ad7e76440cf817e9 (diff)
parent8ad561dc7d6475d7b217ec3f57bac3b584fed31a (diff)
downloadspark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.gz
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.tar.bz2
spark-6a6e6bda5713ccc6da9ca977321a1fcc6d38a1c1.zip
Merge branch 'streaming' into ScrapCode-streaming
Conflicts: streaming/src/main/scala/spark/streaming/dstream/KafkaInputDStream.scala streaming/src/main/scala/spark/streaming/dstream/NetworkInputDStream.scala
Diffstat (limited to 'python/pyspark/java_gateway.py')
-rw-r--r--python/pyspark/java_gateway.py38
1 files changed, 38 insertions, 0 deletions
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
new file mode 100644
index 0000000000..2329e536cc
--- /dev/null
+++ b/python/pyspark/java_gateway.py
@@ -0,0 +1,38 @@
+import os
+import sys
+from subprocess import Popen, PIPE
+from threading import Thread
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
+
+
+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
+def launch_gateway():
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and SPARK_MEM settings from spark-env.sh
+ command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ sys.stderr.write(line)
+ EchoOutputThread(proc.stdout).start()
+ # Connect to the gateway
+ gateway = JavaGateway(GatewayClient(port=port), auto_convert=False)
+ # Import the classes used by PySpark
+ java_import(gateway.jvm, "spark.api.java.*")
+ java_import(gateway.jvm, "spark.api.python.*")
+ java_import(gateway.jvm, "scala.Tuple2")
+ return gateway