path: root/python/pyspark/serializers.py
author     Josh Rosen <joshrosen@eecs.berkeley.edu>    2013-01-01 14:48:45 -0800
committer  Josh Rosen <joshrosen@eecs.berkeley.edu>    2013-01-01 15:05:00 -0800
commit     b58340dbd9a741331fc4c3829b08c093560056c2 (patch)
tree       52b0e94c47892a8f884b2f80a59ccdb1a428b389 /python/pyspark/serializers.py
parent     170e451fbdd308ae77065bd9c0f2bd278abf0cb7 (diff)
Rename top-level 'pyspark' directory to 'python'
Diffstat (limited to 'python/pyspark/serializers.py')
-rw-r--r--    python/pyspark/serializers.py    78
1 file changed, 78 insertions, 0 deletions
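For orientation, the file added below is PySpark's first Python-side serializer module: objects are pickled with protocol 2 and framed on the stream behind a big-endian length prefix ("!i" for 4-byte ints, "!q" for 8-byte longs), and multiple records can be wrapped in a Batch so they are stored as a single Java object. A minimal round-trip sketch of that framing, written in Python 3 with pickle and io.BytesIO standing in for the commit's Python 2 cPickle and its real streams:

    import pickle
    import struct
    from io import BytesIO

    def write_with_length(obj, stream):
        # Frame the payload: 4-byte big-endian length ("!i"), then the raw bytes.
        stream.write(struct.pack("!i", len(obj)))
        stream.write(obj)

    def read_with_length(stream):
        header = stream.read(4)
        if not header:
            raise EOFError
        (length,) = struct.unpack("!i", header)
        return stream.read(length)

    buf = BytesIO()
    write_with_length(pickle.dumps("hello", 2), buf)
    buf.seek(0)
    assert pickle.loads(read_with_length(buf)) == "hello"

Reading the fixed-size header first is what lets the reader raise EOFError cleanly when the stream ends instead of failing partway through a payload.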
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
new file mode 100644
index 0000000000..9a5151ea00
--- /dev/null
+++ b/python/pyspark/serializers.py
@@ -0,0 +1,78 @@
+import struct
+import cPickle
+
+
+class Batch(object):
+    """
+    Used to store multiple RDD entries as a single Java object.
+
+    This relieves us from having to explicitly track whether an RDD
+    is stored as batches of objects and avoids problems when processing
+    the union() of batched and unbatched RDDs (e.g. the union() of textFile()
+    with another RDD).
+    """
+    def __init__(self, items):
+        self.items = items
+
+
+def batched(iterator, batchSize):
+    if batchSize == -1: # unlimited batch size
+        yield Batch(list(iterator))
+    else:
+        items = []
+        count = 0
+        for item in iterator:
+            items.append(item)
+            count += 1
+            if count == batchSize:
+                yield Batch(items)
+                items = []
+                count = 0
+        if items:
+            yield Batch(items)
+
+
+def dump_pickle(obj):
+    return cPickle.dumps(obj, 2)
+
+
+load_pickle = cPickle.loads
+
+
+def read_long(stream):
+    length = stream.read(8)
+    if length == "":
+        raise EOFError
+    return struct.unpack("!q", length)[0]
+
+
+def read_int(stream):
+    length = stream.read(4)
+    if length == "":
+        raise EOFError
+    return struct.unpack("!i", length)[0]
+
+def write_with_length(obj, stream):
+    stream.write(struct.pack("!i", len(obj)))
+    stream.write(obj)
+
+
+def read_with_length(stream):
+    length = read_int(stream)
+    obj = stream.read(length)
+    if obj == "":
+        raise EOFError
+    return obj
+
+
+def read_from_pickle_file(stream):
+    try:
+        while True:
+            obj = load_pickle(read_with_length(stream))
+            if type(obj) == Batch: # We don't care about inheritance
+                for item in obj.items:
+                    yield item
+            else:
+                yield obj
+    except EOFError:
+        return
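A companion sketch of the batching round-trip implemented above: batched() chunks an iterator into Batch objects, and the reader flattens them back out exactly as read_from_pickle_file() does. Again Python 3's pickle stands in for cPickle, the count bookkeeping is simplified to len(items), and the batchSize == -1 "unlimited" branch is omitted for brevity:

    import pickle
    import struct
    from io import BytesIO

    class Batch:
        # Mirrors the Batch in the diff: a bare wrapper around a list of items.
        def __init__(self, items):
            self.items = items

    def batched(iterator, batch_size):
        items = []
        for item in iterator:
            items.append(item)
            if len(items) == batch_size:
                yield Batch(items)
                items = []
        if items:
            yield Batch(items)  # final partial batch

    # Writer side: pickle each batch and frame it with a length prefix.
    stream = BytesIO()
    for batch in batched(range(5), 2):
        data = pickle.dumps(batch, 2)
        stream.write(struct.pack("!i", len(data)) + data)
    stream.seek(0)

    # Reader side: unframe, unpickle, and flatten batches back into items.
    out = []
    while True:
        header = stream.read(4)
        if not header:
            break
        obj = pickle.loads(stream.read(struct.unpack("!i", header)[0]))
        out.extend(obj.items if type(obj) == Batch else [obj])
    assert out == [0, 1, 2, 3, 4]

Because the reader flattens batches transparently, a union() of batched and unbatched streams yields the same flat sequence of items, which is exactly the problem the Batch docstring calls out.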