diff options
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r-- | python/pyspark/serializers.py | 25 |
1 files changed, 24 insertions, 1 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 0ffb41d02f..4afa82f4b2 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -220,6 +220,29 @@ class BatchedSerializer(Serializer): return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize) +class FlattenedValuesSerializer(BatchedSerializer): + + """ + Serializes a stream of list of pairs, split the list of values + which contain more than a certain number of objects to make them + have similar sizes. + """ + def __init__(self, serializer, batchSize=10): + BatchedSerializer.__init__(self, serializer, batchSize) + + def _batched(self, iterator): + n = self.batchSize + for key, values in iterator: + for i in xrange(0, len(values), n): + yield key, values[i:i + n] + + def load_stream(self, stream): + return self.serializer.load_stream(stream) + + def __repr__(self): + return "FlattenedValuesSerializer(%d)" % self.batchSize + + class AutoBatchedSerializer(BatchedSerializer): """ Choose the size of batch automatically based on the size of object @@ -251,7 +274,7 @@ class AutoBatchedSerializer(BatchedSerializer): return (isinstance(other, AutoBatchedSerializer) and other.serializer == self.serializer and other.bestSize == self.bestSize) - def __str__(self): + def __repr__(self): return "AutoBatchedSerializer(%s)" % str(self.serializer) |