aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/streaming/context.py
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2015-10-20 10:52:49 -0700
committerJosh Rosen <joshrosen@databricks.com>2015-10-20 10:52:49 -0700
commite18b571c3374ecbfc0b20a5064cb58d57a2a7d21 (patch)
treee80aeebc7f8a247dff692f46a2bf7860a0e592dc /python/pyspark/streaming/context.py
parent94139557c56cea318d4a4f82a4deaf72198f349a (diff)
downloadspark-e18b571c3374ecbfc0b20a5064cb58d57a2a7d21.tar.gz
spark-e18b571c3374ecbfc0b20a5064cb58d57a2a7d21.tar.bz2
spark-e18b571c3374ecbfc0b20a5064cb58d57a2a7d21.zip
[SPARK-10447][SPARK-3842][PYSPARK] upgrade pyspark to py4j0.9
Upgrade to Py4j0.9 Author: Holden Karau <holden@pigscanfly.ca> Author: Holden Karau <holden@us.ibm.com> Closes #8615 from holdenk/SPARK-10447-upgrade-pyspark-to-py4j0.9.
Diffstat (limited to 'python/pyspark/streaming/context.py')
-rw-r--r--python/pyspark/streaming/context.py54
1 files changed, 8 insertions, 46 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index a8c9ffc235..975c754732 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -32,48 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
__all__ = ["StreamingContext"]
-def _daemonize_callback_server():
- """
- Hack Py4J to daemonize callback server
-
- The thread of callback server has daemon=False, it will block the driver
- from exiting if it's not shutdown. The following code replace `start()`
- of CallbackServer with a new version, which set daemon=True for this
- thread.
-
- Also, it will update the port number (0) with real port
- """
- # TODO: create a patch for Py4J
- import socket
- import py4j.java_gateway
- logger = py4j.java_gateway.logger
- from py4j.java_gateway import Py4JNetworkError
- from threading import Thread
-
- def start(self):
- """Starts the CallbackServer. This method should be called by the
- client instead of run()."""
- self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR,
- 1)
- try:
- self.server_socket.bind((self.address, self.port))
- if not self.port:
- # update port with real port
- self.port = self.server_socket.getsockname()[1]
- except Exception as e:
- msg = 'An error occurred while trying to start the callback server: %s' % e
- logger.exception(msg)
- raise Py4JNetworkError(msg)
-
- # Maybe thread needs to be cleanup up?
- self.thread = Thread(target=self.run)
- self.thread.daemon = True
- self.thread.start()
-
- py4j.java_gateway.CallbackServer.start = start
-
-
class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -123,10 +81,14 @@ class StreamingContext(object):
# start callback server
# getattr will fallback to JVM, so we cannot test by hasattr()
- if "_callback_server" not in gw.__dict__:
- _daemonize_callback_server()
- # use random port
- gw._start_callback_server(0)
+ if "_callback_server" not in gw.__dict__ or gw._callback_server is None:
+ gw.callback_server_parameters.eager_load = True
+ gw.callback_server_parameters.daemonize = True
+ gw.callback_server_parameters.daemonize_connections = True
+ gw.callback_server_parameters.port = 0
+ gw.start_callback_server(gw.callback_server_parameters)
+ cbport = gw._callback_server.server_socket.getsockname()[1]
+ gw._callback_server.port = cbport
# gateway with real port
gw._python_proxy_port = gw._callback_server.port
# get the GatewayServer object in JVM by ID