diff options
author | Shixiong Zhu <shixiong@databricks.com> | 2016-01-12 14:27:05 -0800 |
---|---|---|
committer | Shixiong Zhu <shixiong@databricks.com> | 2016-01-12 14:27:05 -0800 |
commit | 4f60651cbec1b4c9cc2e6d832ace77e89a233f3a (patch) | |
tree | 44fd285926f1db63488710d610bc9be38fd274b0 /python/pyspark/streaming | |
parent | 8ed5f12d2bb408bd37e4156b5f1bad9a6b8c3cb5 (diff) | |
download | spark-4f60651cbec1b4c9cc2e6d832ace77e89a233f3a.tar.gz spark-4f60651cbec1b4c9cc2e6d832ace77e89a233f3a.tar.bz2 spark-4f60651cbec1b4c9cc2e6d832ace77e89a233f3a.zip |
[SPARK-12652][PYSPARK] Upgrade Py4J to 0.9.1
- [x] Upgrade Py4J to 0.9.1
- [x] SPARK-12657: Revert SPARK-12617
- [x] SPARK-12658: Revert SPARK-12511
- Still keep the change that only reading checkpoint once. This is a manual change and worth to take a look carefully. https://github.com/zsxwing/spark/commit/bfd4b5c040eb29394c3132af3c670b1a7272457c
- [x] Verify no leak any more after reverting our workarounds
Author: Shixiong Zhu <shixiong@databricks.com>
Closes #10692 from zsxwing/py4j-0.9.1.
Diffstat (limited to 'python/pyspark/streaming')
-rw-r--r-- | python/pyspark/streaming/context.py | 89 | ||||
-rw-r--r-- | python/pyspark/streaming/util.py | 3 |
2 files changed, 5 insertions, 87 deletions
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 0f1f005ce3..ec3ad9933c 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -19,7 +19,6 @@ from __future__ import print_function import os import sys -from threading import RLock, Timer from py4j.java_gateway import java_import, JavaObject @@ -33,63 +32,6 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize __all__ = ["StreamingContext"] -class Py4jCallbackConnectionCleaner(object): - - """ - A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617. - It will scan all callback connections every 30 seconds and close the dead connections. - """ - - def __init__(self, gateway): - self._gateway = gateway - self._stopped = False - self._timer = None - self._lock = RLock() - - def start(self): - if self._stopped: - return - - def clean_closed_connections(): - from py4j.java_gateway import quiet_close, quiet_shutdown - - callback_server = self._gateway._callback_server - if callback_server: - with callback_server.lock: - try: - closed_connections = [] - for connection in callback_server.connections: - if not connection.isAlive(): - quiet_close(connection.input) - quiet_shutdown(connection.socket) - quiet_close(connection.socket) - closed_connections.append(connection) - - for closed_connection in closed_connections: - callback_server.connections.remove(closed_connection) - except Exception: - import traceback - traceback.print_exc() - - self._start_timer(clean_closed_connections) - - self._start_timer(clean_closed_connections) - - def _start_timer(self, f): - with self._lock: - if not self._stopped: - self._timer = Timer(30.0, f) - self._timer.daemon = True - self._timer.start() - - def stop(self): - with self._lock: - self._stopped = True - if self._timer: - self._timer.cancel() - self._timer = None - - class StreamingContext(object): """ Main entry point for Spark Streaming functionality. A StreamingContext @@ -105,9 +47,6 @@ class StreamingContext(object): # Reference to a currently active StreamingContext _activeContext = None - # A cleaner to clean leak sockets of callback server every 30 seconds - _py4j_cleaner = None - def __init__(self, sparkContext, batchDuration=None, jssc=None): """ Create a new StreamingContext. @@ -155,34 +94,12 @@ class StreamingContext(object): # get the GatewayServer object in JVM by ID jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client) # update the port of CallbackClient with real port - gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port) - _py4j_cleaner = Py4jCallbackConnectionCleaner(gw) - _py4j_cleaner.start() + jgws.resetCallbackClient(jgws.getCallbackClient().getAddress(), gw._python_proxy_port) # register serializer for TransformFunction # it happens before creating SparkContext when loading from checkpointing - if cls._transformerSerializer is None: - transformer_serializer = TransformFunctionSerializer() - transformer_serializer.init( - SparkContext._active_spark_context, CloudPickleSerializer(), gw) - # SPARK-12511 streaming driver with checkpointing unable to finalize leading to OOM - # There is an issue that Py4J's PythonProxyHandler.finalize blocks forever. - # (https://github.com/bartdag/py4j/pull/184) - # - # Py4j will create a PythonProxyHandler in Java for "transformer_serializer" when - # calling "registerSerializer". If we call "registerSerializer" twice, the second - # PythonProxyHandler will override the first one, then the first one will be GCed and - # trigger "PythonProxyHandler.finalize". To avoid that, we should not call - # "registerSerializer" more than once, so that "PythonProxyHandler" in Java side won't - # be GCed. - # - # TODO Once Py4J fixes this issue, we should upgrade Py4j to the latest version. - transformer_serializer.gateway.jvm.PythonDStream.registerSerializer( - transformer_serializer) - cls._transformerSerializer = transformer_serializer - else: - cls._transformerSerializer.init( - SparkContext._active_spark_context, CloudPickleSerializer(), gw) + cls._transformerSerializer = TransformFunctionSerializer( + SparkContext._active_spark_context, CloudPickleSerializer(), gw) @classmethod def getOrCreate(cls, checkpointPath, setupFunc): diff --git a/python/pyspark/streaming/util.py b/python/pyspark/streaming/util.py index e617fc9ce9..abbbf6eb93 100644 --- a/python/pyspark/streaming/util.py +++ b/python/pyspark/streaming/util.py @@ -89,10 +89,11 @@ class TransformFunctionSerializer(object): it uses this class to invoke Python, which returns the serialized function as a byte array. """ - def init(self, ctx, serializer, gateway=None): + def __init__(self, ctx, serializer, gateway=None): self.ctx = ctx self.serializer = serializer self.gateway = gateway or self.ctx._gateway + self.gateway.jvm.PythonDStream.registerSerializer(self) self.failure = None def dumps(self, id): |