aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/accumulators.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/accumulators.py')
-rw-r--r--python/pyspark/accumulators.py6
1 files changed, 4 insertions, 2 deletions
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index da3d96689a..2204e9c9ca 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -90,9 +90,11 @@ import struct
import SocketServer
import threading
from pyspark.cloudpickle import CloudPickler
-from pyspark.serializers import read_int, read_with_length, load_pickle
+from pyspark.serializers import read_int, PickleSerializer
+pickleSer = PickleSerializer()
+
# Holds accumulators registered on the current machine, keyed by ID. This is then used to send
# the local accumulator updates back to the driver program at the end of a task.
_accumulatorRegistry = {}
@@ -211,7 +213,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler):
from pyspark.accumulators import _accumulatorRegistry
num_updates = read_int(self.rfile)
for _ in range(num_updates):
- (aid, update) = load_pickle(read_with_length(self.rfile))
+ (aid, update) = pickleSer._read_with_length(self.rfile)
_accumulatorRegistry[aid] += update
# Write a byte in acknowledgement
self.wfile.write(struct.pack("!b", 1))