author     Josh Rosen <joshrosen@apache.org>    2013-11-10 17:48:27 -0800
committer  Josh Rosen <joshrosen@apache.org>    2013-11-10 17:53:25 -0800
commit     13122ceb8c74dc0c4ad37902a3d1b30bf273cc6a (patch)
tree       5c41f195c2c989b0c90770ac2c33960d10266c4f /python
parent     ffa5bedf46fbc89ad5c5658f3b423dfff49b70f0 (diff)
FramedSerializer: _dumps => dumps, _loads => loads.
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py      2
-rw-r--r--  python/pyspark/rdd.py          4
-rw-r--r--  python/pyspark/serializers.py  26
-rw-r--r--  python/pyspark/worker.py       4
4 files changed, 18 insertions, 18 deletions
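
This commit renames FramedSerializer's serialization hooks from _dumps/_loads to dumps/loads, making them the public extension points rather than private helpers. As a minimal sketch (not part of this commit), a custom serializer under the new names overrides exactly these two methods; the JSON-based class below is hypothetical and only illustrates the hooks:

    # Hypothetical FramedSerializer subclass; JSON is an arbitrary choice.
    import json

    from pyspark.serializers import FramedSerializer

    class JSONSerializer(FramedSerializer):

        def dumps(self, obj):
            # Called by _write_with_length() for each framed object
            # (or each batch of objects when batching is enabled).
            return json.dumps(obj)

        def loads(self, obj):
            # Called by _read_with_length() with one frame's payload.
            return json.loads(obj)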
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 6bb1c6c3a1..cbd41e58c4 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -251,7 +251,7 @@ class SparkContext(object):
sent to each cluster only once.
"""
pickleSer = PickleSerializer()
- pickled = pickleSer._dumps(value)
+ pickled = pickleSer.dumps(value)
jbroadcast = self._jsc.broadcast(bytearray(pickled))
return Broadcast(jbroadcast.id(), value, jbroadcast,
self._pickled_broadcast_vars)
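
With the rename, the broadcast path above reads as plain public API. A quick round trip with PickleSerializer, using illustrative values rather than anything from the patch:

    from pyspark.serializers import PickleSerializer

    ser = PickleSerializer()
    value = {"weights": [0.1, 0.2], "bias": 3}
    pickled = ser.dumps(value)          # bytes handed to the JVM broadcast
    assert ser.loads(pickled) == value  # workers recover the original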
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 062f44f81e..957f3f89c0 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -751,7 +751,7 @@ class RDD(object):
buckets[partitionFunc(k) % numPartitions].append((k, v))
for (split, items) in buckets.iteritems():
yield pack_long(split)
- yield outputSerializer._dumps(items)
+ yield outputSerializer.dumps(items)
keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
@@ -970,7 +970,7 @@ class PipelinedRDD(RDD):
else:
serializer = self.ctx.serializer
command = (self.func, self._prev_jrdd_deserializer, serializer)
- pickled_command = CloudPickleSerializer()._dumps(command)
+ pickled_command = CloudPickleSerializer().dumps(command)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
self.ctx._gateway._gateway_client)
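
The command tuple above contains a user-supplied function, which is why it goes through CloudPickleSerializer: cloudpickle can serialize closures and lambdas that the stock cPickle rejects. A hedged sketch (names illustrative):

    from pyspark.serializers import CloudPickleSerializer

    ser = CloudPickleSerializer()
    offset = 10
    add_offset = lambda x: x + offset   # closure over a local variable
    payload = ser.dumps(add_offset)     # plain cPickle would fail here
    assert ser.loads(payload)(5) == 15  # loads() is inherited cPickle.loads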
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index b23804b33c..9338df69ff 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -125,7 +125,7 @@ class FramedSerializer(Serializer):
return
def _write_with_length(self, obj, stream):
- serialized = self._dumps(obj)
+ serialized = self.dumps(obj)
write_int(len(serialized), stream)
stream.write(serialized)
@@ -134,16 +134,16 @@ class FramedSerializer(Serializer):
obj = stream.read(length)
if obj == "":
raise EOFError
- return self._loads(obj)
+ return self.loads(obj)
- def _dumps(self, obj):
+ def dumps(self, obj):
"""
Serialize an object into a byte array.
When batching is used, this will be called with an array of objects.
"""
raise NotImplementedError
- def _loads(self, obj):
+ def loads(self, obj):
"""
Deserialize an object from a byte array.
"""
@@ -228,8 +228,8 @@ class CartesianDeserializer(FramedSerializer):
class NoOpSerializer(FramedSerializer):
- def _loads(self, obj): return obj
- def _dumps(self, obj): return obj
+ def loads(self, obj): return obj
+ def dumps(self, obj): return obj
class PickleSerializer(FramedSerializer):
@@ -242,12 +242,12 @@ class PickleSerializer(FramedSerializer):
not be as fast as more specialized serializers.
"""
- def _dumps(self, obj): return cPickle.dumps(obj, 2)
- _loads = cPickle.loads
+ def dumps(self, obj): return cPickle.dumps(obj, 2)
+ loads = cPickle.loads
class CloudPickleSerializer(PickleSerializer):
- def _dumps(self, obj): return cloudpickle.dumps(obj, 2)
+ def dumps(self, obj): return cloudpickle.dumps(obj, 2)
class MarshalSerializer(FramedSerializer):
@@ -259,8 +259,8 @@ class MarshalSerializer(FramedSerializer):
This serializer is faster than PickleSerializer but supports fewer datatypes.
"""
- _dumps = marshal.dumps
- _loads = marshal.loads
+ dumps = marshal.dumps
+ loads = marshal.loads
class MUTF8Deserializer(Serializer):
@@ -268,14 +268,14 @@ class MUTF8Deserializer(Serializer):
Deserializes streams written by Java's DataOutputStream.writeUTF().
"""
- def _loads(self, stream):
+ def loads(self, stream):
length = struct.unpack('>H', stream.read(2))[0]
return stream.read(length).decode('utf8')
def load_stream(self, stream):
while True:
try:
- yield self._loads(stream)
+ yield self.loads(stream)
except struct.error:
return
except EOFError:
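
For context, MUTF8Deserializer.loads() consumes the frame written by Java's DataOutputStream.writeUTF(): a 2-byte big-endian length followed by the (modified) UTF-8 payload, which coincides with standard UTF-8 for the ASCII strings exchanged here. A sketch with StringIO standing in for the worker's input stream (illustrative, not from the patch):

    import struct
    from StringIO import StringIO

    from pyspark.serializers import MUTF8Deserializer

    encoded = u"workdir".encode("utf8")
    stream = StringIO(struct.pack(">H", len(encoded)) + encoded)
    assert MUTF8Deserializer().loads(stream) == u"workdir"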
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 2751f1239e..f2b3f3c142 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -51,7 +51,7 @@ def main(infile, outfile):
return
# fetch name of workdir
- spark_files_dir = mutf8_deserializer._loads(infile)
+ spark_files_dir = mutf8_deserializer.loads(infile)
SparkFiles._root_directory = spark_files_dir
SparkFiles._is_running_on_worker = True
@@ -66,7 +66,7 @@ def main(infile, outfile):
sys.path.append(spark_files_dir) # *.py files that were added will be copied here
num_python_includes = read_int(infile)
for _ in range(num_python_includes):
- filename = mutf8_deserializer._loads(infile)
+ filename = mutf8_deserializer.loads(infile)
sys.path.append(os.path.join(spark_files_dir, filename))
command = pickleSer._read_with_length(infile)
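
A closing note on MarshalSerializer, whose class-level assignments also read naturally after the rename: it trades generality for speed, since marshal handles only simple built-in types. Illustrative sketch:

    from pyspark.serializers import MarshalSerializer

    ser = MarshalSerializer()
    data = [1, 2.0, "three", (4, 5)]
    assert ser.loads(ser.dumps(data)) == data
    # Unlike PickleSerializer, marshal cannot handle arbitrary classes;
    # ser.dumps(object()) raises ValueError ("unmarshallable object").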