diff options
author | Josh Rosen <joshrosen@apache.org> | 2013-11-05 17:52:39 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@apache.org> | 2013-11-10 16:45:38 -0800 |
commit | cbb7f04aef2220ece93dea9f3fa98b5db5f270d6 (patch) | |
tree | 5feaed6b6064b81272fcb74b48ee2579e32de4e6 /python/pyspark/accumulators.py | |
parent | 7d68a81a8ed5f49fefb3bd0fa0b9d3835cc7d86e (diff) | |
download | spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.tar.gz spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.tar.bz2 spark-cbb7f04aef2220ece93dea9f3fa98b5db5f270d6.zip |
Add custom serializer support to PySpark.
For now, this only adds MarshalSerializer, but it lays the groundwork
for other supporting custom serializers. Many of these mechanisms
can also be used to support deserialization of different data formats
sent by Java, such as data encoded by MsgPack.
This also fixes a bug in SparkContext.union().
Diffstat (limited to 'python/pyspark/accumulators.py')
-rw-r--r-- | python/pyspark/accumulators.py | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index da3d96689a..2204e9c9ca 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -90,9 +90,11 @@ import struct import SocketServer import threading from pyspark.cloudpickle import CloudPickler -from pyspark.serializers import read_int, read_with_length, load_pickle +from pyspark.serializers import read_int, PickleSerializer +pickleSer = PickleSerializer() + # Holds accumulators registered on the current machine, keyed by ID. This is then used to send # the local accumulator updates back to the driver program at the end of a task. _accumulatorRegistry = {} @@ -211,7 +213,7 @@ class _UpdateRequestHandler(SocketServer.StreamRequestHandler): from pyspark.accumulators import _accumulatorRegistry num_updates = read_int(self.rfile) for _ in range(num_updates): - (aid, update) = load_pickle(read_with_length(self.rfile)) + (aid, update) = pickleSer._read_with_length(self.rfile) _accumulatorRegistry[aid] += update # Write a byte in acknowledgement self.wfile.write(struct.pack("!b", 1)) |