aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py8
1 files changed, 4 insertions, 4 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index e8fcc900ef..309f5a9b60 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -318,9 +318,9 @@ class RDD(object):
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]
"""
- def func(split, iterator):
+ def func(_, iterator):
return imap(f, iterator)
- return PipelinedRDD(self, func, preservesPartitioning)
+ return self.mapPartitionsWithIndex(func, preservesPartitioning)
def flatMap(self, f, preservesPartitioning=False):
"""
@@ -1184,7 +1184,7 @@ class RDD(object):
if not isinstance(x, basestring):
x = unicode(x)
yield x.encode("utf-8")
- keyed = PipelinedRDD(self, func)
+ keyed = self.mapPartitionsWithIndex(func)
keyed._bypass_serializer = True
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
@@ -1382,7 +1382,7 @@ class RDD(object):
yield pack_long(split)
yield outputSerializer.dumps(items)
- keyed = PipelinedRDD(self, add_shuffle_key)
+ keyed = self.mapPartitionsWithIndex(add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.context) as st:
pairRDD = self.ctx._jvm.PairwiseRDD(