aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/accumulators.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/accumulators.py')
-rw-r--r--python/pyspark/accumulators.py7
1 files changed, 7 insertions, 0 deletions
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 45d36e5d0e..f133cf6f7b 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param):
class Accumulator(object):
+
"""
A shared variable that can be accumulated, i.e., has a commutative and associative "add"
operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=}
@@ -166,6 +167,7 @@ class Accumulator(object):
class AccumulatorParam(object):
+
"""
Helper object that defines how to accumulate values of a given type.
"""
@@ -186,6 +188,7 @@ class AccumulatorParam(object):
class AddingAccumulatorParam(AccumulatorParam):
+
"""
An AccumulatorParam that uses the + operators to add values. Designed for simple types
such as integers, floats, and lists. Requires the zero value for the underlying type
@@ -210,6 +213,7 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)
class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
+
"""
This handler will keep polling updates from the same socket until the
server is shutdown.
@@ -228,7 +232,9 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
# Write a byte in acknowledgement
self.wfile.write(struct.pack("!b", 1))
+
class AccumulatorServer(SocketServer.TCPServer):
+
"""
A simple TCP server that intercepts shutdown() in order to interrupt
our continuous polling on the handler.
@@ -239,6 +245,7 @@ class AccumulatorServer(SocketServer.TCPServer):
self.server_shutdown = True
SocketServer.TCPServer.shutdown(self)
+
def _start_update_server():
"""Start a TCP server to receive accumulator updates in a daemon thread, and returns it"""
server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler)