author    Kan Zhang <kzhang@apache.org>    2014-06-03 18:18:25 -0700
committer Matei Zaharia <matei@databricks.com>    2014-06-03 18:18:25 -0700
commit    21e40ed88bf2c205c3d7f947fde5d5a6f3e29f7f (patch)
tree      64b67ee5a6c6048b274747dfa0769afcd9edc9b0 /python
parent    f4dd665c85713d4c09731080fca58aee0fa2a85a (diff)
[SPARK-1161] Add saveAsPickleFile and SparkContext.pickleFile in Python
Author: Kan Zhang <kzhang@apache.org>

Closes #755 from kanzhang/SPARK-1161 and squashes the following commits:

24ed8a2 [Kan Zhang] [SPARK-1161] Fixing doc tests
44e0615 [Kan Zhang] [SPARK-1161] Adding an optional batchSize with default value 10
d929429 [Kan Zhang] [SPARK-1161] Add saveAsObjectFile and SparkContext.objectFile in Python
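For orientation, a minimal round trip with the two new methods might look like the sketch below (it assumes a live SparkContext bound to sc, as in the doctests; the temp file is only used to reserve a fresh path):

    from tempfile import NamedTemporaryFile

    # Reserve a fresh path; saveAsPickleFile creates the output directory itself.
    tmpFile = NamedTemporaryFile(delete=True)
    tmpFile.close()

    # Write: each output record is a batch of pickled Python objects.
    sc.parallelize(range(100)).saveAsPickleFile(tmpFile.name, batchSize=10)

    # Read back; ordering across partitions is not preserved, hence sorted().
    sorted(sc.pickleFile(tmpFile.name).collect())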
Diffstat (limited to 'python')
-rw-r--r--  python/pyspark/context.py | 14
-rw-r--r--  python/pyspark/rdd.py     | 33
2 files changed, 39 insertions(+), 8 deletions(-)
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 9ae9305d4f..211918f5a0 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -271,6 +271,20 @@ class SparkContext(object):
jrdd = readRDDFromFile(self._jsc, tempFile.name, numSlices)
return RDD(jrdd, self, serializer)
+ def pickleFile(self, name, minPartitions=None):
+ """
+ Load an RDD previously saved using L{RDD.saveAsPickleFile} method.
+
+ >>> tmpFile = NamedTemporaryFile(delete=True)
+ >>> tmpFile.close()
+ >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5)
+ >>> sorted(sc.pickleFile(tmpFile.name, 3).collect())
+ [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
+ """
+ minPartitions = minPartitions or self.defaultMinPartitions
+ return RDD(self._jsc.objectFile(name, minPartitions), self,
+ BatchedSerializer(PickleSerializer()))
+
def textFile(self, name, minPartitions=None):
"""
Read a text file from HDFS, a local file system (available on all
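One small caveat worth noting in the hunk above: the `minPartitions or self.defaultMinPartitions` idiom falls back on any falsy argument, so an explicit 0 is treated the same as None. A standalone illustration of the pattern (the function name here is made up for this note, not PySpark API):

    def resolve_min_partitions(requested, default=2):
        # `or` replaces None with the default, but also replaces 0,
        # which is harmless here since zero partitions is meaningless.
        return requested or default

    assert resolve_min_partitions(None) == 2
    assert resolve_min_partitions(0) == 2  # falsy, so also defaulted
    assert resolve_min_partitions(5) == 5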
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 1b3c460dd6..ca0a95578f 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -33,7 +33,8 @@ import heapq
from random import Random
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
- BatchedSerializer, CloudPickleSerializer, PairDeserializer, pack_long
+ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
+ PickleSerializer, pack_long
from pyspark.join import python_join, python_left_outer_join, \
python_right_outer_join, python_cogroup
from pyspark.statcounter import StatCounter
@@ -427,11 +428,14 @@ class RDD(object):
.filter(lambda x: (len(x[1][0]) != 0) and (len(x[1][1]) != 0)) \
.keys()
- def _reserialize(self):
- if self._jrdd_deserializer == self.ctx.serializer:
+ def _reserialize(self, serializer=None):
+ serializer = serializer or self.ctx.serializer
+ if self._jrdd_deserializer == serializer:
return self
else:
- return self.map(lambda x: x, preservesPartitioning=True)
+ converted = self.map(lambda x: x, preservesPartitioning=True)
+ converted._jrdd_deserializer = serializer
+ return converted
def __add__(self, other):
"""
@@ -897,6 +901,20 @@ class RDD(object):
"""
return self.take(1)[0]
+ def saveAsPickleFile(self, path, batchSize=10):
+ """
+ Save this RDD as a SequenceFile of serialized objects. The serializer used is
+ L{pyspark.serializers.PickleSerializer}, default batch size is 10.
+
+ >>> tmpFile = NamedTemporaryFile(delete=True)
+ >>> tmpFile.close()
+ >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
+ >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
+ [1, 2, 'rdd', 'spark']
+ """
+ self._reserialize(BatchedSerializer(PickleSerializer(),
+ batchSize))._jrdd.saveAsObjectFile(path)
+
def saveAsTextFile(self, path):
"""
Save this RDD as a text file, using string representations of elements.
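On the batchSize parameter added above: BatchedSerializer pickles objects in groups, so N elements become roughly ceil(N / batchSize) records in the SequenceFile, trading per-record overhead against memory held per record. A rough standalone illustration of that arithmetic using plain pickle (this is not the actual serializer implementation):

    import pickle

    def batched_dumps(items, batch_size=10):
        # Group items and pickle each group as a single record.
        batch = []
        for item in items:
            batch.append(item)
            if len(batch) == batch_size:
                yield pickle.dumps(batch)
                batch = []
        if batch:
            yield pickle.dumps(batch)

    records = list(batched_dumps(range(25), batch_size=10))
    assert len(records) == 3  # batches of 10, 10, and 5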
@@ -1421,10 +1439,9 @@ class PipelinedRDD(RDD):
if self._jrdd_val:
return self._jrdd_val
if self._bypass_serializer:
- serializer = NoOpSerializer()
- else:
- serializer = self.ctx.serializer
- command = (self.func, self._prev_jrdd_deserializer, serializer)
+ self._jrdd_deserializer = NoOpSerializer()
+ command = (self.func, self._prev_jrdd_deserializer,
+ self._jrdd_deserializer)
pickled_command = CloudPickleSerializer().dumps(command)
broadcast_vars = ListConverter().convert(
[x._jbroadcast for x in self.ctx._pickled_broadcast_vars],
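Finally, the PipelinedRDD hunk above changes what happens when serialization is bypassed: instead of computing a local serializer variable, the patch writes NoOpSerializer into the RDD's own _jrdd_deserializer and ships that in the command tuple, so the RDD's advertised output format always matches what the worker produces (which is what lets _reserialize retag RDDs safely). A conceptual sketch of the command tuple, not the real PySpark wire format:

    import pickle

    def double(x):
        return x * 2

    # The command bundles the function with its input deserializer and
    # output serializer; PySpark pickles it with CloudPickleSerializer.
    command = (double, "input-deserializer", "output-serializer")
    func, in_ser, out_ser = pickle.loads(pickle.dumps(command))
    assert func(21) == 42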