diff options
author | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
---|---|---|
committer | Prashant Sharma <prashant.s@imaginea.com> | 2013-09-06 17:53:01 +0530 |
commit | 4106ae9fbf8a582697deba2198b3b966dec00bfe (patch) | |
tree | 7c3046faee5f62f9ec4c4176125988d7cb5d70e2 /python/pyspark/java_gateway.py | |
parent | e0dd24dc858777904335218f3001a24bffe73b27 (diff) | |
parent | 5c7494d7c1b7301138fb3dc155a1b0c961126ec6 (diff) | |
download | spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.gz spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.tar.bz2 spark-4106ae9fbf8a582697deba2198b3b966dec00bfe.zip |
Merged with master
Diffstat (limited to 'python/pyspark/java_gateway.py')
-rw-r--r-- | python/pyspark/java_gateway.py | 36 |
1 files changed, 32 insertions, 4 deletions
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 2329e536cc..e615c1e9b6 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -1,5 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + import os import sys +import signal +import platform from subprocess import Popen, PIPE from threading import Thread from py4j.java_gateway import java_import, JavaGateway, GatewayClient @@ -11,9 +30,18 @@ SPARK_HOME = os.environ["SPARK_HOME"] def launch_gateway(): # Launch the Py4j gateway using Spark's run command so that we pick up the # proper classpath and SPARK_MEM settings from spark-env.sh - command = [os.path.join(SPARK_HOME, "run"), "py4j.GatewayServer", + on_windows = platform.system() == "Windows" + script = "spark-class.cmd" if on_windows else "spark-class" + command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", "--die-on-broken-pipe", "0"] - proc = Popen(command, stdout=PIPE, stdin=PIPE) + if not on_windows: + # Don't send ctrl-c / SIGINT to the Java gateway: + def preexec_func(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + else: + # preexec_fn not supported on Windows + proc = Popen(command, stdout=PIPE, stdin=PIPE) # Determine which ephemeral port the server started on: port = int(proc.stdout.readline()) # Create a thread to echo output from the GatewayServer, which is required @@ -32,7 +60,7 @@ def launch_gateway(): # Connect to the gateway gateway = JavaGateway(GatewayClient(port=port), auto_convert=False) # Import the classes used by PySpark - java_import(gateway.jvm, "spark.api.java.*") - java_import(gateway.jvm, "spark.api.python.*") + java_import(gateway.jvm, "org.apache.spark.api.java.*") + java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "scala.Tuple2") return gateway |