Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--  python/pyspark/serializers.py  |  25 ++++++++++++++++++++++++-
1 file changed, 24 insertions(+), 1 deletion(-)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 0ffb41d02f..4afa82f4b2 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -220,6 +220,29 @@ class BatchedSerializer(Serializer):
return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
+class FlattenedValuesSerializer(BatchedSerializer):
+
+ """
+ Serializes a stream of list of pairs, split the list of values
+ which contain more than a certain number of objects to make them
+ have similar sizes.
+ """
+ def __init__(self, serializer, batchSize=10):
+ BatchedSerializer.__init__(self, serializer, batchSize)
+
+ def _batched(self, iterator):
+ n = self.batchSize
+ for key, values in iterator:
+ for i in xrange(0, len(values), n):
+ yield key, values[i:i + n]
+
+ def load_stream(self, stream):
+ return self.serializer.load_stream(stream)
+
+ def __repr__(self):
+ return "FlattenedValuesSerializer(%d)" % self.batchSize
+
+
class AutoBatchedSerializer(BatchedSerializer):
"""
Choose the size of batch automatically based on the size of object
@@ -251,7 +274,7 @@ class AutoBatchedSerializer(BatchedSerializer):
         return (isinstance(other, AutoBatchedSerializer) and
                 other.serializer == self.serializer and other.bestSize == self.bestSize)
 
-    def __str__(self):
+    def __repr__(self):
         return "AutoBatchedSerializer(%s)" % str(self.serializer)