author    Shixiong Zhu <shixiong@databricks.com>  2016-01-06 12:03:01 -0800
committer Shixiong Zhu <shixiong@databricks.com>  2016-01-06 12:03:01 -0800
commit    1e6648d62fb82b708ea54c51cd23bfe4f542856e (patch)
tree      4bd2325f33f643b4eac67887e094217fbf1a42a5 /python
parent    f82ebb15224ec5375f25f67d598ec3ef1cb65210 (diff)
[SPARK-12617][PYSPARK] Move Py4jCallbackConnectionCleaner to Streaming
Move Py4jCallbackConnectionCleaner to Streaming because the callback server starts only in StreamingContext.

Author: Shixiong Zhu <shixiong@databricks.com>

Closes #10621 from zsxwing/SPARK-12617-2.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py            | 61
-rw-r--r--  python/pyspark/streaming/context.py  | 63
2 files changed, 63 insertions(+), 61 deletions(-)
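Note: the class moved below is a watchdog for Py4j callback connections that are never closed. The mechanism it relies on is a one-shot daemon threading.Timer that performs one scan and then re-arms itself for the next 30-second interval until stop() is called. A minimal sketch of that self-rescheduling pattern, detached from Py4j (the PeriodicTask name and the work callable are illustrative, not part of Spark):

from threading import RLock, Timer

class PeriodicTask(object):
    """Runs `work` every `interval` seconds on a daemon Timer until stopped."""

    def __init__(self, interval, work):
        self._interval = interval
        self._work = work
        self._stopped = False
        self._timer = None
        self._lock = RLock()

    def start(self):
        def run():
            self._work()          # e.g. close dead callback connections
            self._schedule(run)   # re-arm for the next scan

        self._schedule(run)       # first scan fires after one interval

    def _schedule(self, f):
        with self._lock:
            if not self._stopped:
                self._timer = Timer(self._interval, f)
                self._timer.daemon = True   # never keep the interpreter alive
                self._timer.start()

    def stop(self):
        with self._lock:
            self._stopped = True
            if self._timer:
                self._timer.cancel()
                self._timer = None

Re-arming a one-shot Timer, rather than looping with sleep in a long-lived thread, keeps the worker daemonized and lets stop() cancel the pending run cleanly.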
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 5e4aeac330..529d16b480 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -54,64 +54,6 @@ DEFAULT_CONFIGS = {
}
-class Py4jCallbackConnectionCleaner(object):
-
- """
- A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
- It will scan all callback connections every 30 seconds and close the dead connections.
- """
-
- def __init__(self, gateway):
- self._gateway = gateway
- self._stopped = False
- self._timer = None
- self._lock = RLock()
-
- def start(self):
- if self._stopped:
- return
-
- def clean_closed_connections():
- from py4j.java_gateway import quiet_close, quiet_shutdown
-
- callback_server = self._gateway._callback_server
- with callback_server.lock:
- try:
- closed_connections = []
- for connection in callback_server.connections:
- if not connection.isAlive():
- quiet_close(connection.input)
- quiet_shutdown(connection.socket)
- quiet_close(connection.socket)
- closed_connections.append(connection)
-
- for closed_connection in closed_connections:
- callback_server.connections.remove(closed_connection)
- except Exception:
- import traceback
- traceback.print_exc()
-
- self._start_timer(clean_closed_connections)
-
- self._start_timer(clean_closed_connections)
-
- def _start_timer(self, f):
- from threading import Timer
-
- with self._lock:
- if not self._stopped:
- self._timer = Timer(30.0, f)
- self._timer.daemon = True
- self._timer.start()
-
- def stop(self):
- with self._lock:
- self._stopped = True
- if self._timer:
- self._timer.cancel()
- self._timer = None
-
-
class SparkContext(object):
"""
@@ -126,7 +68,6 @@ class SparkContext(object):
_active_spark_context = None
_lock = RLock()
_python_includes = None # zip and egg files that need to be added to PYTHONPATH
- _py4j_cleaner = None
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
@@ -303,8 +244,6 @@ class SparkContext(object):
if not SparkContext._gateway:
SparkContext._gateway = gateway or launch_gateway()
SparkContext._jvm = SparkContext._gateway.jvm
- _py4j_cleaner = Py4jCallbackConnectionCleaner(SparkContext._gateway)
- _py4j_cleaner.start()
if instance:
if (SparkContext._active_spark_context and
diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py
index 5cc4bbde39..0f1f005ce3 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -19,6 +19,7 @@ from __future__ import print_function
import os
import sys
+from threading import RLock, Timer
from py4j.java_gateway import java_import, JavaObject
@@ -32,6 +33,63 @@ from pyspark.streaming.util import TransformFunction, TransformFunctionSerialize
__all__ = ["StreamingContext"]
+class Py4jCallbackConnectionCleaner(object):
+
+ """
+ A cleaner to clean up callback connections that are not closed by Py4j. See SPARK-12617.
+ It will scan all callback connections every 30 seconds and close the dead connections.
+ """
+
+ def __init__(self, gateway):
+ self._gateway = gateway
+ self._stopped = False
+ self._timer = None
+ self._lock = RLock()
+
+ def start(self):
+ if self._stopped:
+ return
+
+ def clean_closed_connections():
+ from py4j.java_gateway import quiet_close, quiet_shutdown
+
+ callback_server = self._gateway._callback_server
+ if callback_server:
+ with callback_server.lock:
+ try:
+ closed_connections = []
+ for connection in callback_server.connections:
+ if not connection.isAlive():
+ quiet_close(connection.input)
+ quiet_shutdown(connection.socket)
+ quiet_close(connection.socket)
+ closed_connections.append(connection)
+
+ for closed_connection in closed_connections:
+ callback_server.connections.remove(closed_connection)
+ except Exception:
+ import traceback
+ traceback.print_exc()
+
+ self._start_timer(clean_closed_connections)
+
+ self._start_timer(clean_closed_connections)
+
+ def _start_timer(self, f):
+ with self._lock:
+ if not self._stopped:
+ self._timer = Timer(30.0, f)
+ self._timer.daemon = True
+ self._timer.start()
+
+ def stop(self):
+ with self._lock:
+ self._stopped = True
+ if self._timer:
+ self._timer.cancel()
+ self._timer = None
+
+
class StreamingContext(object):
"""
Main entry point for Spark Streaming functionality. A StreamingContext
@@ -47,6 +105,9 @@ class StreamingContext(object):
# Reference to a currently active StreamingContext
_activeContext = None
+ # A cleaner to clean leak sockets of callback server every 30 seconds
+ _py4j_cleaner = None
+
def __init__(self, sparkContext, batchDuration=None, jssc=None):
"""
Create a new StreamingContext.
@@ -95,6 +156,8 @@ class StreamingContext(object):
jgws = JavaObject("GATEWAY_SERVER", gw._gateway_client)
# update the port of CallbackClient with real port
gw.jvm.PythonDStream.updatePythonGatewayPort(jgws, gw._python_proxy_port)
+ _py4j_cleaner = Py4jCallbackConnectionCleaner(gw)
+ _py4j_cleaner.start()
# register serializer for TransformFunction
# it happens before creating SparkContext when loading from checkpointing
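With this change the cleaner is no longer created in SparkContext's gateway setup; it is constructed where the streaming gateway's callback port is wired up, so it only runs once a StreamingContext exists and the Py4j callback server has actually been started. A rough usage sketch under that assumption (the local master string and batch interval are arbitrary):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "cleaner-demo")  # no Py4j cleaner is started here any more
ssc = StreamingContext(sc, 1)                  # streaming setup starts the 30-second cleaner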