path: root/python/pyspark/rdd.py
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--    python/pyspark/rdd.py    22
1 file changed, 13 insertions(+), 9 deletions(-)
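
The hunks below are whitespace-only style cleanups: continuation lines inside parenthesized expressions get an extra level of indentation, long argument lists are wrapped across several lines, and one redundant backslash continuation is dropped. As a rough illustration of the convention being applied (my own generic example, not code from this commit):

    def within_limits(value, low, high):
        # Hanging indent: the continuation line is indented one extra level so
        # it does not visually merge with the body of the if block.
        if (value >= low and
                value <= high):
            return True
        return False

    # No backslash is needed to continue a line that is already inside parentheses.
    total = sum(
        x * x for x in range(10))
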
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 309f5a9b60..30b834d208 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -233,7 +233,7 @@ class RDD(object):
def _toPickleSerialization(self):
if (self._jrdd_deserializer == PickleSerializer() or
- self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
+ self._jrdd_deserializer == BatchedSerializer(PickleSerializer())):
return self
else:
return self._reserialize(BatchedSerializer(PickleSerializer(), 10))
@@ -1079,7 +1079,9 @@ class RDD(object):
pickledRDD = self._toPickleSerialization()
batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path,
- outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf)
+ outputFormatClass,
+ keyClass, valueClass,
+ keyConverter, valueConverter, jconf)
def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None):
"""
@@ -1125,8 +1127,10 @@ class RDD(object):
pickledRDD = self._toPickleSerialization()
batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer)
self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path,
- outputFormatClass, keyClass, valueClass, keyConverter, valueConverter,
- jconf, compressionCodecClass)
+ outputFormatClass,
+ keyClass, valueClass,
+ keyConverter, valueConverter,
+ jconf, compressionCodecClass)
def saveAsSequenceFile(self, path, compressionCodecClass=None):
"""
@@ -1348,7 +1352,7 @@ class RDD(object):
outputSerializer = self.ctx._unbatched_serializer
limit = (_parse_memory(self.ctx._conf.get(
- "spark.python.worker.memory", "512m")) / 2)
+ "spark.python.worker.memory", "512m")) / 2)
def add_shuffle_key(split, iterator):
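
The limit computed above is half of spark.python.worker.memory. A simplified sketch of the kind of parsing _parse_memory performs (my own hypothetical helper, assuming the result is in megabytes; the real implementation may differ):

    def _parse_memory_sketch(s):
        # "512m" -> 512, "2g" -> 2048; result is in megabytes.
        units = {"k": 1.0 / 1024, "m": 1, "g": 1024, "t": 1 << 20}
        if s[-1].lower() not in units:
            raise ValueError("invalid memory string: " + s)
        return int(float(s[:-1]) * units[s[-1].lower()])

    limit = _parse_memory_sketch("512m") / 2  # 256 (MB)
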
@@ -1430,12 +1434,12 @@ class RDD(object):
spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
== 'true')
memory = _parse_memory(self.ctx._conf.get(
- "spark.python.worker.memory", "512m"))
+ "spark.python.worker.memory", "512m"))
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
def combineLocally(iterator):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
- if spill else InMemoryMerger(agg)
+ if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
return merger.iteritems()
@@ -1444,7 +1448,7 @@ class RDD(object):
def _mergeCombiners(iterator):
merger = ExternalMerger(agg, memory, serializer) \
- if spill else InMemoryMerger(agg)
+ if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
return merger.iteritems()
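
Both combineLocally and _mergeCombiners above pick their merger with the same conditional expression: an ExternalMerger (which can spill to disk) when spark.shuffle.spill is enabled, otherwise an InMemoryMerger. A minimal sketch of that selection, assuming pyspark.shuffle and pyspark.serializers expose the names used in the diff:

    from pyspark.shuffle import Aggregator, ExternalMerger, InMemoryMerger
    from pyspark.serializers import PickleSerializer

    # Collect values per key into lists.
    agg = Aggregator(lambda v: [v],          # createCombiner
                     lambda c, v: c + [v],   # mergeValue
                     lambda c1, c2: c1 + c2) # mergeCombiners
    spill, memory = False, 512  # memory budget in MB; spill disabled stays in memory
    merger = (ExternalMerger(agg, memory * 0.9, PickleSerializer())
              if spill else InMemoryMerger(agg))
    merger.mergeValues(iter([("a", 1), ("a", 2), ("b", 3)]))
    print(dict(merger.iteritems()))  # {'a': [1, 2], 'b': [3]}
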
@@ -1588,7 +1592,7 @@ class RDD(object):
"""
for fraction in fractions.values():
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
- return self.mapPartitionsWithIndex( \
+ return self.mapPartitionsWithIndex(
RDDStratifiedSampler(withReplacement, fractions, seed).func, True)
def subtractByKey(self, other, numPartitions=None):
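
The stratified-sampler call cleaned up above appears to back RDD.sampleByKey, which takes a per-key fraction dictionary (each fraction must be non-negative, as the assertion enforces). A usage sketch, assuming sc is an active SparkContext:

    rdd = sc.parallelize([("a", x) for x in range(100)] +
                         [("b", x) for x in range(100)])
    # Keep roughly 10% of the "a" records and 50% of the "b" records.
    sample = rdd.sampleByKey(False, {"a": 0.1, "b": 0.5}, seed=42)
    print(sample.countByKey())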