From 13122ceb8c74dc0c4ad37902a3d1b30bf273cc6a Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Sun, 10 Nov 2013 17:48:27 -0800
Subject: FramedSerializer: _dumps => dumps, _loads => loads.

---
 python/pyspark/context.py     |  2 +-
 python/pyspark/rdd.py         |  4 ++--
 python/pyspark/serializers.py | 26 +++++++++++++-------------
 python/pyspark/worker.py      |  4 ++--
 4 files changed, 18 insertions(+), 18 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6bb1c6c3a1..cbd41e58c4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -251,7 +251,7 @@ class SparkContext(object):
         sent to each cluster only once.
         """
         pickleSer = PickleSerializer()
-        pickled = pickleSer._dumps(value)
+        pickled = pickleSer.dumps(value)
         jbroadcast = self._jsc.broadcast(bytearray(pickled))
         return Broadcast(jbroadcast.id(), value, jbroadcast,
                          self._pickled_broadcast_vars)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 062f44f81e..957f3f89c0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -751,7 +751,7 @@ class RDD(object):
                 buckets[partitionFunc(k) % numPartitions].append((k, v))
             for (split, items) in buckets.iteritems():
                 yield pack_long(split)
-                yield outputSerializer._dumps(items)
+                yield outputSerializer.dumps(items)
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
         pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -970,7 +970,7 @@ class PipelinedRDD(RDD):
         else:
             serializer = self.ctx.serializer
         command = (self.func, self._prev_jrdd_deserializer, serializer)
-        pickled_command = CloudPickleSerializer()._dumps(command)
+        pickled_command = CloudPickleSerializer().dumps(command)
         broadcast_vars = ListConverter().convert(
             [x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
             self.ctx._gateway._gateway_client)
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index b23804b33c..9338df69ff 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -125,7 +125,7 @@ class FramedSerializer(Serializer):
                 return

     def _write_with_length(self, obj, stream):
-        serialized = self._dumps(obj)
+        serialized = self.dumps(obj)
         write_int(len(serialized), stream)
         stream.write(serialized)

@@ -134,16 +134,16 @@ class FramedSerializer(Serializer):
         obj = stream.read(length)
         if obj == "":
             raise EOFError
-        return self._loads(obj)
+        return self.loads(obj)

-    def _dumps(self, obj):
+    def dumps(self, obj):
         """
         Serialize an object into a byte array.
         When batching is used, this will be called with an array of objects.
         """
         raise NotImplementedError

-    def _loads(self, obj):
+    def loads(self, obj):
         """
         Deserialize an object from a byte array.
         """
@@ -228,8 +228,8 @@ class CartesianDeserializer(FramedSerializer):

 class NoOpSerializer(FramedSerializer):

-    def _loads(self, obj): return obj
-    def _dumps(self, obj): return obj
+    def loads(self, obj): return obj
+    def dumps(self, obj): return obj


 class PickleSerializer(FramedSerializer):
@@ -242,12 +242,12 @@ class PickleSerializer(FramedSerializer):
     not be as fast as more specialized serializers.
     """

-    def _dumps(self, obj): return cPickle.dumps(obj, 2)
-    _loads = cPickle.loads
+    def dumps(self, obj): return cPickle.dumps(obj, 2)
+    loads = cPickle.loads


 class CloudPickleSerializer(PickleSerializer):

-    def _dumps(self, obj): return cloudpickle.dumps(obj, 2)
+    def dumps(self, obj): return cloudpickle.dumps(obj, 2)


@@ -259,8 +259,8 @@ class MarshalSerializer(FramedSerializer):
     This serializer is faster than PickleSerializer but supports fewer datatypes.
     """

-    _dumps = marshal.dumps
-    _loads = marshal.loads
+    dumps = marshal.dumps
+    loads = marshal.loads


 class MUTF8Deserializer(Serializer):
@@ -268,14 +268,14 @@ class MUTF8Deserializer(Serializer):
     Deserializes streams written by Java's DataOutputStream.writeUTF().
     """

-    def _loads(self, stream):
+    def loads(self, stream):
         length = struct.unpack('>H', stream.read(2))[0]
         return stream.read(length).decode('utf8')

     def load_stream(self, stream):
         while True:
             try:
-                yield self._loads(stream)
+                yield self.loads(stream)
             except struct.error:
                 return
             except EOFError:
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 2751f1239e..f2b3f3c142 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -51,7 +51,7 @@ def main(infile, outfile):
         return

     # fetch name of workdir
-    spark_files_dir = mutf8_deserializer._loads(infile)
+    spark_files_dir = mutf8_deserializer.loads(infile)
     SparkFiles._root_directory = spark_files_dir
     SparkFiles._is_running_on_worker = True

@@ -66,7 +66,7 @@ def main(infile, outfile):
     sys.path.append(spark_files_dir)  # *.py files that were added will be copied here
     num_python_includes = read_int(infile)
     for _ in range(num_python_includes):
-        filename = mutf8_deserializer._loads(infile)
+        filename = mutf8_deserializer.loads(infile)
         sys.path.append(os.path.join(spark_files_dir, filename))

     command = pickleSer._read_with_length(infile)
--
cgit v1.2.3