aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/serializers.py
diff options
context:
space:
mode:
author: Prabin Banka <prabin.banka@imaginea.com> 2014-03-10 13:27:00 -0700
committer: Matei Zaharia <matei@databricks.com> 2014-03-10 13:27:00 -0700
commit: e1e09e0ef6b18e034727403d81747d899b042219 (patch)
tree: c9bc6792ef9f25e4f78e32755ef8c84d9f82ffb0 /python/pyspark/serializers.py
parent: 5d98cfc1c8fb17fbbeacc7192ac21c0b038cbd16 (diff)
download: spark-e1e09e0ef6b18e034727403d81747d899b042219.tar.gz
spark-e1e09e0ef6b18e034727403d81747d899b042219.tar.bz2
spark-e1e09e0ef6b18e034727403d81747d899b042219.zip
SPARK-977 Added Python RDD.zip function
This was raised earlier as part of apache/incubator-spark#486.

Author: Prabin Banka <prabin.banka@imaginea.com>

Closes #76 from prabinb/python-api-zip and squashes the following commits:

b1a31a0 [Prabin Banka] Added Python RDD.zip function
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r-- python/pyspark/serializers.py 29
1 files changed, 28 insertions, 1 deletions
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 8c6ad79059..12c63f186a 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -204,7 +204,7 @@ class CartesianDeserializer(FramedSerializer):
self.key_ser = key_ser
self.val_ser = val_ser
- def load_stream(self, stream):
+ def prepare_keys_values(self, stream):
key_stream = self.key_ser._load_stream_without_unbatching(stream)
val_stream = self.val_ser._load_stream_without_unbatching(stream)
key_is_batched = isinstance(self.key_ser, BatchedSerializer)
@@ -212,6 +212,10 @@ class CartesianDeserializer(FramedSerializer):
for (keys, vals) in izip(key_stream, val_stream):
keys = keys if key_is_batched else [keys]
vals = vals if val_is_batched else [vals]
+ yield (keys, vals)
+
+ def load_stream(self, stream):
+ for (keys, vals) in self.prepare_keys_values(stream):
for pair in product(keys, vals):
yield pair
@@ -224,6 +228,29 @@ class CartesianDeserializer(FramedSerializer):
(str(self.key_ser), str(self.val_ser))
+class PairDeserializer(CartesianDeserializer):
+ """
+ Deserializes the JavaRDD zip() of two PythonRDDs.
+ """
+
+ def __init__(self, key_ser, val_ser):
+ self.key_ser = key_ser
+ self.val_ser = val_ser
+
+ def load_stream(self, stream):
+ for (keys, vals) in self.prepare_keys_values(stream):
+ for pair in izip(keys, vals):
+ yield pair
+
+ def __eq__(self, other):
+ return isinstance(other, PairDeserializer) and \
+ self.key_ser == other.key_ser and self.val_ser == other.val_ser
+
+ def __str__(self):
+ return "PairDeserializer<%s, %s>" % \
+ (str(self.key_ser), str(self.val_ser))
+
+
class NoOpSerializer(FramedSerializer):
def loads(self, obj): return obj