From 9abdfa663360252d2edb346e6b3df4ff94ce78d7 Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Mon, 17 Sep 2012 00:08:50 -0700
Subject: Fix Python 2.6 compatibility in Python API.

---
 pyspark/pyspark/rdd.py | 17 +++++++++++------
 1 file changed, 11 insertions(+), 6 deletions(-)

(limited to 'pyspark')

diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 8477f6dd02..e2137fe06c 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,5 +1,5 @@
 from base64 import standard_b64encode as b64enc
-from collections import Counter
+from collections import defaultdict
 from itertools import chain, ifilter, imap
 import shlex
 from subprocess import Popen, PIPE
@@ -198,13 +198,18 @@ class RDD(object):
 
     def countByValue(self):
         """
-        >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common()
-        [(2, 3), (1, 2)]
+        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
+        [(1, 2), (2, 3)]
         """
         def countPartition(iterator):
-            yield Counter(iterator)
+            counts = defaultdict(int)
+            for obj in iterator:
+                counts[obj] += 1
+            yield counts
         def mergeMaps(m1, m2):
-            return m1 + m2
+            for (k, v) in m2.iteritems():
+                m1[k] += v
+            return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
 
     def take(self, num):
@@ -271,7 +276,7 @@ class RDD(object):
     def countByKey(self):
         """
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> rdd.countByKey().most_common()
+        >>> sorted(rdd.countByKey().items())
         [('a', 2), ('b', 1)]
         """
         return self.map(lambda x: x[0]).countByValue()
--
cgit v1.2.3
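
Background on the change: collections.Counter was only added in Python 2.7, so importing it raises an ImportError on Python 2.6. The patch swaps it for collections.defaultdict (available since Python 2.5) and folds the per-partition counts together by hand; the doctests also switch from most_common() (which orders by count) to sorted(...items()), since a plain dict has no defined iteration order. Below is a minimal standalone sketch of the same count-and-merge pattern, runnable on Python 2.6 without Spark. The two-partition split and the names partitions, maps, and merged are illustrative only; count_partition and merge_maps mirror the patch's inner countPartition and mergeMaps.

    from collections import defaultdict

    def count_partition(iterator):
        # One counting dict per partition; defaultdict(int) starts
        # every unseen key at 0, so no membership check is needed.
        counts = defaultdict(int)
        for obj in iterator:
            counts[obj] += 1
        yield counts

    def merge_maps(m1, m2):
        # Counter supported `m1 + m2`; with defaultdict the merge is
        # an explicit fold of m2's counts into m1.
        for (k, v) in m2.iteritems():
            m1[k] += v
        return m1

    # Simulate the doctest's input [1, 2, 1, 2, 2] split across 2 partitions.
    partitions = [[1, 2], [1, 2, 2]]
    maps = [count_partition(p).next() for p in partitions]
    merged = reduce(merge_maps, maps)
    print sorted(merged.items())   # [(1, 2), (2, 3)]

The final reduce here plays the role of RDD.reduce(mergeMaps) in the patched countByValue, folding the per-partition maps that mapPartitions(countPartition) yields into a single dict.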