diff options
author | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-17 00:08:50 -0700 |
---|---|---|
committer | Josh Rosen <joshrosen@eecs.berkeley.edu> | 2012-09-17 00:09:16 -0700 |
commit | 9abdfa663360252d2edb346e6b3df4ff94ce78d7 (patch) | |
tree | 74528cdf3f74c2b6d4faedac1420291c691e2a12 | |
parent | 414367850982c4f8fc5e63cc94caa422eb736db5 (diff) | |
download | spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.gz spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.tar.bz2 spark-9abdfa663360252d2edb346e6b3df4ff94ce78d7.zip |
Fix Python 2.6 compatibility in Python API.
-rw-r--r-- | pyspark/pyspark/rdd.py | 17 | ||||
-rw-r--r-- | python/tc.py | 22 |
2 files changed, 11 insertions, 28 deletions
```diff
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 8477f6dd02..e2137fe06c 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,5 +1,5 @@
 from base64 import standard_b64encode as b64enc
-from collections import Counter
+from collections import defaultdict
 from itertools import chain, ifilter, imap
 import shlex
 from subprocess import Popen, PIPE
@@ -198,13 +198,18 @@ class RDD(object):
 
     def countByValue(self):
         """
-        >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common()
-        [(2, 3), (1, 2)]
+        >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
+        [(1, 2), (2, 3)]
         """
         def countPartition(iterator):
-            yield Counter(iterator)
+            counts = defaultdict(int)
+            for obj in iterator:
+                counts[obj] += 1
+            yield counts
         def mergeMaps(m1, m2):
-            return m1 + m2
+            for (k, v) in m2.iteritems():
+                m1[k] += v
+            return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
 
     def take(self, num):
@@ -271,7 +276,7 @@ class RDD(object):
     def countByKey(self):
         """
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> rdd.countByKey().most_common()
+        >>> sorted(rdd.countByKey().items())
         [('a', 2), ('b', 1)]
         """
         return self.map(lambda x: x[0]).countByValue()
diff --git a/python/tc.py b/python/tc.py
deleted file mode 100644
index 5dcc4317e0..0000000000
--- a/python/tc.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from rdd import SparkContext
-
-sc = SparkContext("local", "PythonWordCount")
-e = [(1, 2), (2, 3), (4, 1)]
-
-tc = sc.parallelizePairs(e)
-
-edges = tc.mapPairs(lambda (x, y): (y, x))
-
-oldCount = 0
-nextCount = tc.count()
-
-def project(x):
-    return (x[1][1], x[1][0])
-
-while nextCount != oldCount:
-    oldCount = nextCount
-    tc = tc.union(tc.join(edges).mapPairs(project)).distinct()
-    nextCount = tc.count()
-
-print "TC has %i edges" % tc.count()
-print tc.collect()
```