diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 22 |
1 files changed, 13 insertions, 9 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 309f5a9b60..30b834d208 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -233,7 +233,7 @@ class RDD(object): def _toPickleSerialization(self): if (self._jrdd_deserializer == PickleSerializer() or - self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): + self._jrdd_deserializer == BatchedSerializer(PickleSerializer())): return self else: return self._reserialize(BatchedSerializer(PickleSerializer(), 10)) @@ -1079,7 +1079,9 @@ class RDD(object): pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) self.ctx._jvm.PythonRDD.saveAsNewAPIHadoopFile(pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf) + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, jconf) def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ @@ -1125,8 +1127,10 @@ class RDD(object): pickledRDD = self._toPickleSerialization() batched = isinstance(pickledRDD._jrdd_deserializer, BatchedSerializer) self.ctx._jvm.PythonRDD.saveAsHadoopFile(pickledRDD._jrdd, batched, path, - outputFormatClass, keyClass, valueClass, keyConverter, valueConverter, - jconf, compressionCodecClass) + outputFormatClass, + keyClass, valueClass, + keyConverter, valueConverter, + jconf, compressionCodecClass) def saveAsSequenceFile(self, path, compressionCodecClass=None): """ @@ -1348,7 +1352,7 @@ class RDD(object): outputSerializer = self.ctx._unbatched_serializer limit = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) / 2) + "spark.python.worker.memory", "512m")) / 2) def add_shuffle_key(split, iterator): @@ -1430,12 +1434,12 @@ class RDD(object): spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true') memory = _parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m")) + "spark.python.worker.memory", "512m")) agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): merger = ExternalMerger(agg, memory * 0.9, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeValues(iterator) return merger.iteritems() @@ -1444,7 +1448,7 @@ class RDD(object): def _mergeCombiners(iterator): merger = ExternalMerger(agg, memory, serializer) \ - if spill else InMemoryMerger(agg) + if spill else InMemoryMerger(agg) merger.mergeCombiners(iterator) return merger.iteritems() @@ -1588,7 +1592,7 @@ class RDD(object): """ for fraction in fractions.values(): assert fraction >= 0.0, "Negative fraction value: %s" % fraction - return self.mapPartitionsWithIndex( \ + return self.mapPartitionsWithIndex( RDDStratifiedSampler(withReplacement, fractions, seed).func, True) def subtractByKey(self, other, numPartitions=None): |