aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark
diff options
context:
space:
mode:
authorShixiong Zhu <shixiong@databricks.com>2016-01-05 13:10:46 -0800
committerDavies Liu <davies.liu@gmail.com>2016-01-05 13:10:46 -0800
commit047a31bb1042867b20132b347b1e08feab4562eb (patch)
tree2bbc3b1a8797f08c8b66f97cf18db010c5ff6c55 /python/pyspark
parentd202ad2fc24b54de38ad7bfb646bf7703069e9f7 (diff)
downloadspark-047a31bb1042867b20132b347b1e08feab4562eb.tar.gz
spark-047a31bb1042867b20132b347b1e08feab4562eb.tar.bz2
spark-047a31bb1042867b20132b347b1e08feab4562eb.zip
[SPARK-12617] [PYSPARK] Clean up the leak sockets of Py4J
This patch added Py4jCallbackConnectionCleaner to clean the leak sockets of Py4J every 30 seconds. This is a workaround before Py4J fixes the leak issue https://github.com/bartdag/py4j/issues/187 Author: Shixiong Zhu <shixiong@databricks.com> Closes #10579 from zsxwing/SPARK-12617.
Diffstat (limited to 'python/pyspark')
-rw-r--r--python/pyspark/context.py61
1 file changed, 61 insertions, 0 deletions
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 529d16b480..5e4aeac330 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -54,6 +54,64 @@ DEFAULT_CONFIGS = {
}
class Py4jCallbackConnectionCleaner(object):

    """
    A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
    It will scan all callback connections every 30 seconds and close the dead connections.

    This is a workaround until Py4J fixes the connection leak itself
    (https://github.com/bartdag/py4j/issues/187).
    """

    def __init__(self, gateway):
        # Py4J JavaGateway whose callback server's connections are scanned.
        self._gateway = gateway
        # Once stopped, the cleaner can never be (re)started.
        self._stopped = False
        # Currently scheduled threading.Timer, or None when idle/stopped.
        self._timer = None
        # Protects _stopped/_timer against concurrent start/stop/reschedule.
        self._lock = RLock()

    def start(self):
        """Start the periodic (every 30 seconds) cleanup task. No-op if stopped."""
        if self._stopped:
            return

        def clean_closed_connections():
            from py4j.java_gateway import quiet_close, quiet_shutdown

            callback_server = self._gateway._callback_server
            # The callback server may not be started yet, or may already be
            # shut down, in which case _callback_server is None. Without this
            # guard, `callback_server.lock` raises AttributeError outside the
            # try block, killing the timer thread and silently disabling all
            # future cleanups. Skip this round and reschedule instead.
            if callback_server is not None:
                with callback_server.lock:
                    try:
                        closed_connections = []
                        for connection in callback_server.connections:
                            if not connection.isAlive():
                                # Close both ends quietly; errors are ignored
                                # since the connection is already dead.
                                quiet_close(connection.input)
                                quiet_shutdown(connection.socket)
                                quiet_close(connection.socket)
                                closed_connections.append(connection)

                        # Remove after the scan so we don't mutate the list
                        # while iterating it.
                        for closed_connection in closed_connections:
                            callback_server.connections.remove(closed_connection)
                    except Exception:
                        # Best-effort cleanup: log and keep the cycle alive.
                        import traceback
                        traceback.print_exc()

            # Re-arm the timer so the scan repeats every 30 seconds.
            self._start_timer(clean_closed_connections)

        self._start_timer(clean_closed_connections)

    def _start_timer(self, f):
        """Schedule `f` to run once after 30 seconds, unless stopped."""
        from threading import Timer

        with self._lock:
            if not self._stopped:
                self._timer = Timer(30.0, f)
                # Daemonize so a pending timer never blocks interpreter exit.
                self._timer.daemon = True
                self._timer.start()

    def stop(self):
        """Permanently stop the cleanup task and cancel any pending timer."""
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None
+
+
class SparkContext(object):
"""
@@ -68,6 +126,7 @@ class SparkContext(object):
_active_spark_context = None
_lock = RLock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
+ _py4j_cleaner = None
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
@@ -244,6 +303,8 @@ class SparkContext(object):
if not SparkContext._gateway:
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
+ _py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
+ _py4j_cleaner.start()
if instance:
if (SparkContext._active_spark_context and