author    Aaron Davidson <aaron@databricks.com>    2014-07-31 15:31:53 -0700
committer Josh Rosen <joshrosen@apache.org>    2014-07-31 15:31:53 -0700
commit    ef4ff00f87a4e8d38866f163f01741c2673e41da (patch)
tree      e5674bacee57daaaca98201703197bee0c94a4b8 /python
parent    492a195c5c4d68c85b8b1b48e3aa85165bbb5dc3 (diff)
SPARK-2282: Reuse Socket for sending accumulator updates to Pyspark
Prior to this change, every PySpark task completion opened a new socket to the accumulator server, passed its updates through, and then quit. I'm not entirely sure why PySpark always sends accumulator updates, but regardless this causes a very rapid buildup of ephemeral TCP connections that remain in the TIME_WAIT state for around a minute before being cleaned up.

Rather than trying to allow these sockets to be cleaned up faster, this patch simply reuses the connection between task completions (since they're fed updates in a single-threaded manner by the DAGScheduler anyway). The only tricky part here was making sure that the AccumulatorServer was able to shut down in a timely manner (i.e., stop polling for new data), and this was accomplished via minor feats of magic.

I have confirmed that this patch eliminates the buildup of ephemeral sockets due to the accumulator updates. However, I did note that a significant number of sockets were still being created against the PySpark daemon port, but my machine was not able to create enough sockets fast enough to fail. This may not be the last time we've seen this issue, though.

Author: Aaron Davidson <aaron@databricks.com>

Closes #1503 from aarondav/accum and squashes the following commits:

b3e12f7 [Aaron Davidson] SPARK-2282: Reuse Socket for sending accumulator updates to Pyspark
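To illustrate the pattern the commit describes (independent of the patch below), here is a minimal, self-contained sketch of a handler loop that checks a shutdown flag and uses select() with a 1-second timeout instead of blocking on a read; the poll_loop name and the shutdown_requested callable are hypothetical stand-ins for the server_shutdown flag used in the actual change:

    import select
    import struct

    def poll_loop(rfile, wfile, shutdown_requested):
        # shutdown_requested is a hypothetical callable that returns True once
        # the server has been asked to stop.
        while not shutdown_requested():
            # Wait at most 1 second for incoming data, so a shutdown request is
            # noticed within roughly a second instead of blocking forever on read.
            readable, _, _ = select.select([rfile], [], [], 1)
            if rfile in readable:
                # ... read and apply one batch of accumulator updates here ...
                wfile.write(struct.pack("!b", 1))  # acknowledge the batch
                wfile.flush()

Because the connection stays open across task completions, each completion reuses the same file objects rather than paying for a new TCP handshake and leaving behind a socket stuck in TIME_WAIT.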
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/accumulators.py | 34
1 file changed, 27 insertions(+), 7 deletions(-)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 2204e9c9ca..45d36e5d0e 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -86,6 +86,7 @@ Traceback (most recent call last):
 Exception:...
 """
+import select
 import struct
 import SocketServer
 import threading
@@ -209,19 +210,38 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
 class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
+    """
+    This handler will keep polling updates from the same socket until the
+    server is shutdown.
+    """
+
     def handle(self):
         from pyspark.accumulators import _accumulatorRegistry
-        num_updates = read_int(self.rfile)
-        for _ in range(num_updates):
-            (aid, update) = pickleSer._read_with_length(self.rfile)
-            _accumulatorRegistry[aid] += update
-        # Write a byte in acknowledgement
-        self.wfile.write(struct.pack("!b", 1))
+        while not self.server.server_shutdown:
+            # Poll every 1 second for new data -- don't block in case of shutdown.
+            r, _, _ = select.select([self.rfile], [], [], 1)
+            if self.rfile in r:
+                num_updates = read_int(self.rfile)
+                for _ in range(num_updates):
+                    (aid, update) = pickleSer._read_with_length(self.rfile)
+                    _accumulatorRegistry[aid] += update
+                # Write a byte in acknowledgement
+                self.wfile.write(struct.pack("!b", 1))
+
+
+class AccumulatorServer(SocketServer.TCPServer):
+    """
+    A simple TCP server that intercepts shutdown() in order to interrupt
+    our continuous polling on the handler.
+    """
+    server_shutdown = False
+
+    def shutdown(self):
+        self.server_shutdown = True
+        SocketServer.TCPServer.shutdown(self)
+
 
 def _start_update_server():
     """Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
-    server = SocketServer.TCPServer(("localhost", 0), _UpdateRequestHandler)
+    server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)
     thread = threading.Thread(target=server.serve_forever)
     thread.daemon = True
     thread.start()
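A hypothetical usage sketch (assuming, per its docstring, that _start_update_server() finishes by returning the server): calling shutdown() sets server_shutdown, so the handler's select() loop observes the flag within its 1-second poll interval and stops reading before the underlying TCPServer shutdown completes.

    server = _start_update_server()
    host, port = server.server_address   # ephemeral port chosen by the OS
    # ... accumulator updates arrive over a single long-lived connection ...
    server.shutdown()                    # sets server_shutdown and stops serve_forever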