diff options
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r-- | python/pyspark/rdd.py | 8 |
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e8fcc900ef..309f5a9b60 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -318,9 +318,9 @@ class RDD(object):
         >>> sorted(rdd.map(lambda x: (x, 1)).collect())
         [('a', 1), ('b', 1), ('c', 1)]
         """
-        def func(split, iterator):
+        def func(_, iterator):
             return imap(f, iterator)
-        return PipelinedRDD(self, func, preservesPartitioning)
+        return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def flatMap(self, f, preservesPartitioning=False):
         """
@@ -1184,7 +1184,7 @@ class RDD(object):
                 if not isinstance(x, basestring):
                     x = unicode(x)
                 yield x.encode("utf-8")
-        keyed = PipelinedRDD(self, func)
+        keyed = self.mapPartitionsWithIndex(func)
         keyed._bypass_serializer = True
         keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
 
@@ -1382,7 +1382,7 @@ class RDD(object):
                 yield pack_long(split)
                 yield outputSerializer.dumps(items)
 
-        keyed = PipelinedRDD(self, add_shuffle_key)
+        keyed = self.mapPartitionsWithIndex(add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.context) as st:
             pairRDD = self.ctx._jvm.PairwiseRDD(