diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-17 00:08:50 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-17 00:09:16 -0700 |
commit | 9abdfa663360252d2edb346e6b3df4ff94ce78d7 (patch) | |
tree | 74528cdf3f74c2b6d4faedac1420291c691e2a12 /pyspark | |
parent | 414367850982c4f8fc5e63cc94caa422eb736db5 (diff) | |
download | spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.gz spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.bz2 spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.zip |
Fix Python 2.6 compatibility in Python API.
Diffstat (limited to 'pyspark')
-rw-r--r-- | pyspark/pyspark/rdd.py | 17 |
1 file changed, 11 insertions(+), 6 deletions(-)
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py index 8477f6dd02..e2137fe06c 100644 --- a/pyspark/pyspark/rdd.py +++ b/pyspark/pyspark/rdd.py @@ -1,5 +1,5 @@ from base64 import standard_b64encode as b64enc -from collections import Counter +from collections import defaultdict from itertools import chain, ifilter, imap import shlex from subprocess import Popen, PIPE @@ -198,13 +198,18 @@ class RDD(object): def countByValue(self): """ - >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common() - [(2, 3), (1, 2)] + >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items()) + [(1, 2), (2, 3)] """ def countPartition(iterator): - yield Counter(iterator) + counts = defaultdict(int) + for obj in iterator: + counts[obj] += 1 + yield counts def mergeMaps(m1, m2): - return m1 + m2 + for (k, v) in m2.iteritems(): + m1[k] += v + return m1 return self.mapPartitions(countPartition).reduce(mergeMaps) def take(self, num): @@ -271,7 +276,7 @@ class RDD(object): def countByKey(self): """ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) - >>> rdd.countByKey().most_common() + >>> sorted(rdd.countByKey().items()) [('a', 2), ('b', 1)] """ return self.map(lambda x: x[0]).countByValue() |