diff options
Diffstat (limited to 'python/pyspark/accumulators.py')
-rw-r--r-- | python/pyspark/accumulators.py | 7 |
1 files changed, 7 insertions, 0 deletions
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 45d36e5d0e..f133cf6f7b 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -110,6 +110,7 @@ def _deserialize_accumulator(aid, zero_value, accum_param): class Accumulator(object): + """ A shared variable that can be accumulated, i.e., has a commutative and associative "add" operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=} @@ -166,6 +167,7 @@ class Accumulator(object): class AccumulatorParam(object): + """ Helper object that defines how to accumulate values of a given type. """ @@ -186,6 +188,7 @@ class AccumulatorParam(object): class AddingAccumulatorParam(AccumulatorParam): + """ An AccumulatorParam that uses the + operators to add values. Designed for simple types such as integers, floats, and lists. Requires the zero value for the underlying type @@ -210,6 +213,7 @@ COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j) class _UpdateRequestHandler(SocketServer.StreamRequestHandler): + """ This handler will keep polling updates from the same socket until the server is shutdown. @@ -228,7 +232,9 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): # Write a byte in acknowledgement self.wfile.write(struct.pack("!b", 1)) + class AccumulatorServer(SocketServer.TCPServer): + """ A simple TCP server that intercepts shutdown() in order to interrupt our continuous polling on the handler. @@ -239,6 +245,7 @@ class AccumulatorServer(SocketServer.TCPServer): self.server_shutdown = True SocketServer.TCPServer.shutdown(self) + def _start_update_server(): """Start a TCP server to receive accumulator updates in a daemon thread, and returns it""" server = AccumulatorServer(("localhost", 0), _UpdateRequestHandler) |