about summary refs log tree commit diff
path: root/pyspark/pyspark/java_gateway.py
diff options
context:
space:
mode:
Diffstat (limited to 'pyspark/pyspark/java_gateway.py')
-rw-r--r--  pyspark/pyspark/java_gateway.py  35
1 files changed, 26 insertions, 9 deletions
diff --git a/pyspark/pyspark/java_gateway.py b/pyspark/pyspark/java_gateway.py
index 3726bcbf17..d4a4434c05 100644
--- a/pyspark/pyspark/java_gateway.py
+++ b/pyspark/pyspark/java_gateway.py
@@ -1,19 +1,36 @@
-import glob
import os
-from py4j.java_gateway import java_import, JavaGateway
+from subprocess import Popen, PIPE
+from threading import Thread
+from py4j.java_gateway import java_import, JavaGateway, GatewayClient
SPARK_HOME = os.environ["SPARK_HOME"]
-assembly_jar = glob.glob(os.path.join(SPARK_HOME, "core/target") + \
- "/spark-core-assembly-*.jar")[0]
- # TODO: what if multiple assembly jars are found?
-
-
def launch_gateway():
- gateway = JavaGateway.launch_gateway(classpath=assembly_jar,
- javaopts=["-Xmx256m"], die_on_exit=True)
+ # Launch the Py4j gateway using Spark's run command so that we pick up the
+ # proper classpath and SPARK_MEM settings from spark-env.sh
+ command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer",
+ "--die-on-broken-pipe", "0"]
+ proc = Popen(command, stdout=PIPE, stdin=PIPE)
+ # Determine which ephemeral port the server started on:
+ port = int(proc.stdout.readline())
+ # Create a thread to echo output from the GatewayServer, which is required
+ # for Java log output to show up:
+ class EchoOutputThread(Thread):
+ def __init__(self, stream):
+ Thread.__init__(self)
+ self.daemon = True
+ self.stream = stream
+
+ def run(self):
+ while True:
+ line = self.stream.readline()
+ print line,
+ EchoOutputThread(proc.stdout).start()
+ # Connect to the gateway
+ gateway = JavaGateway(GatewayClient(port=port))
+ # Import the classes used by PySpark
java_import(gateway.jvm, "spark.api.java.*")
java_import(gateway.jvm, "spark.api.python.*")
java_import(gateway.jvm, "scala.Tuple2")