aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/rdd.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/pyspark/rdd.py')
-rw-r--r--python/pyspark/rdd.py10
1 files changed, 5 insertions, 5 deletions
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fb27863e07..91fc7e637e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -38,6 +38,7 @@ from pyspark.join import python_join, python_left_outer_join, \
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
+from pyspark.resultiterable import ResultIterable
from py4j.java_collections import ListConverter, MapConverter
@@ -1118,7 +1119,7 @@ class RDD(object):
Hash-partitions the resulting RDD with into numPartitions partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(x.groupByKey().collect())
+ >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
[('a', [1, 1]), ('b', [1])]
"""
@@ -1133,7 +1134,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numPartitions)
+ numPartitions).mapValues(lambda x: ResultIterable(x))
# TODO: add tests
def flatMapValues(self, f):
@@ -1180,7 +1181,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup(self, other, numPartitions)
@@ -1217,7 +1218,7 @@ class RDD(object):
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
[(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
"""
return self.map(lambda x: (f(x), x))
@@ -1317,7 +1318,6 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
-
class PipelinedRDD(RDD):
"""
Pipelined maps: