about | summary | refs | log | tree | commit | diff
path: root/pyspark
diff options
context:
space:
mode:
author: Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-09-17 00:08:50 -0700
committer: Josh Rosen <joshrosen@eecs.berkeley.edu> 2012-09-17 00:09:16 -0700
commit: 9abdfa663360252d2edb346e6b3df4ff94ce78d7 (patch)
tree: 74528cdf3f74c2b6d4faedac1420291c691e2a12 /pyspark
parent: 414367850982c4f8fc5e63cc94caa422eb736db5 (diff)
download: spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.gz
spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.bz2
spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.zip
Fix Python 2.6 compatibility in Python API.
Diffstat (limited to 'pyspark')
-rw-r--r-- pyspark/pyspark/rdd.py 17
1 files changed, 11 insertions, 6 deletions
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 8477f6dd02..e2137fe06c 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,5 +1,5 @@
from base64 import standard_b64encode as b64enc
-from collections import Counter
+from collections import defaultdict
from itertools import chain, ifilter, imap
import shlex
from subprocess import Popen, PIPE
@@ -198,13 +198,18 @@ class RDD(object):
def countByValue(self):
"""
- >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common()
- [(2, 3), (1, 2)]
+ >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
+ [(1, 2), (2, 3)]
"""
def countPartition(iterator):
- yield Counter(iterator)
+ counts = defaultdict(int)
+ for obj in iterator:
+ counts[obj] += 1
+ yield counts
def mergeMaps(m1, m2):
- return m1 + m2
+ for (k, v) in m2.iteritems():
+ m1[k] += v
+ return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
def take(self, num):
@@ -271,7 +276,7 @@ class RDD(object):
def countByKey(self):
"""
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> rdd.countByKey().most_common()
+ >>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
"""
return self.map(lambda x: x[0]).countByValue()