From e1e09e0ef6b18e034727403d81747d899b042219 Mon Sep 17 00:00:00 2001 From: Prabin Banka Date: Mon, 10 Mar 2014 13:27:00 -0700 Subject: SPARK-977 Added Python RDD.zip function was raised earlier as a part of apache/incubator-spark#486 Author: Prabin Banka Closes #76 from prabinb/python-api-zip and squashes the following commits: b1a31a0 [Prabin Banka] Added Python RDD.zip function --- python/pyspark/serializers.py | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) (limited to 'python/pyspark/serializers.py') diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 8c6ad79059..12c63f186a 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer): self.key_ser = key_ser self.val_ser = val_ser - def load_stream(self, stream): + def prepare_keys_values(self, stream): key_stream = self.key_ser._load_stream_without_unbatching(stream) val_stream = self.val_ser._load_stream_without_unbatching(stream) key_is_batched = isinstance(self.key_ser, BatchedSerializer) @@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer): for (keys, vals) in izip(key_stream, val_stream): keys = keys if key_is_batched else [keys] vals = vals if val_is_batched else [vals] + yield (keys, vals) + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): for pair in product(keys, vals): yield pair @@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer): (str(self.key_ser), str(self.val_ser)) +class PairDeserializer(CartesianDeserializer): + """ + Deserializes the JavaRDD zip() of two PythonRDDs. + """ + + def __init__(self, key_ser, val_ser): + self.key_ser = key_ser + self.val_ser = val_ser + + def load_stream(self, stream): + for (keys, vals) in self.prepare_keys_values(stream): + for pair in izip(keys, vals): + yield pair + + def __eq__(self, other): + return isinstance(other, PairDeserializer) and \ + self.key_ser == other.key_ser and self.val_ser == other.val_ser + + def __str__(self): + return "PairDeserializer<%s, %s>" % \ + (str(self.key_ser), str(self.val_ser)) + + class NoOpSerializer(FramedSerializer): def loads(self, obj): return obj -- cgit v1.2.3