-rw-r--r--  pyspark/pyspark/rdd.py | 17 +++++++++++------
-rw-r--r--  python/tc.py           | 22 ----------------------
2 files changed, 11 insertions(+), 28 deletions(-)
diff --git a/pyspark/pyspark/rdd.py b/pyspark/pyspark/rdd.py
index 8477f6dd02..e2137fe06c 100644
--- a/pyspark/pyspark/rdd.py
+++ b/pyspark/pyspark/rdd.py
@@ -1,5 +1,5 @@
from base64 import standard_b64encode as b64enc
-from collections import Counter
+from collections import defaultdict
from itertools import chain, ifilter, imap
import shlex
from subprocess import Popen, PIPE
@@ -198,13 +198,18 @@ class RDD(object):
def countByValue(self):
"""
- >>> sc.parallelize([1, 2, 1, 2, 2]).countByValue().most_common()
- [(2, 3), (1, 2)]
+ >>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
+ [(1, 2), (2, 3)]
"""
def countPartition(iterator):
- yield Counter(iterator)
+ counts = defaultdict(int)
+ for obj in iterator:
+ counts[obj] += 1
+ yield counts
def mergeMaps(m1, m2):
- return m1 + m2
+ for (k, v) in m2.iteritems():
+ m1[k] += v
+ return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
def take(self, num):
@@ -271,7 +276,7 @@ class RDD(object):
def countByKey(self):
"""
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> rdd.countByKey().most_common()
+ >>> sorted(rdd.countByKey().items())
[('a', 2), ('b', 1)]
"""
return self.map(lambda x: x[0]).countByValue()
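For context, the new countByValue builds one histogram per partition with defaultdict (available on Python 2.6, unlike collections.Counter, which was added in 2.7) and then folds the per-partition maps together with reduce; the doctests switch from Counter.most_common() to sorted(...items()) so the expected output is deterministic. Below is a minimal sketch of the same count-then-merge pattern in plain Python, without a SparkContext; the sample partitions are illustrative, not from the commit.

from collections import defaultdict
from functools import reduce

def countPartition(iterator):
    # One histogram per partition; defaultdict(int) instead of Counter
    # keeps this compatible with Python 2.6.
    counts = defaultdict(int)
    for obj in iterator:
        counts[obj] += 1
    return counts

def mergeMaps(m1, m2):
    # Fold the second histogram into the first, key by key.
    for (k, v) in m2.items():
        m1[k] += v
    return m1

partitions = [[1, 2, 1], [2, 2]]  # stand-in for an RDD split into 2 partitions
total = reduce(mergeMaps, [countPartition(p) for p in partitions])
print(sorted(total.items()))      # [(1, 2), (2, 3)]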
diff --git a/python/tc.py b/python/tc.py
deleted file mode 100644
index 5dcc4317e0..0000000000
--- a/python/tc.py
+++ /dev/null
@@ -1,22 +0,0 @@
-from rdd import SparkContext
-
-sc = SparkContext("local", "PythonWordCount")
-e = [(1, 2), (2, 3), (4, 1)]
-
-tc = sc.parallelizePairs(e)
-
-edges = tc.mapPairs(lambda (x, y): (y, x))
-
-oldCount = 0
-nextCount = tc.count()
-
-def project(x):
- return (x[1][1], x[1][0])
-
-while nextCount != oldCount:
- oldCount = nextCount
- tc = tc.union(tc.join(edges).mapPairs(project)).distinct()
- nextCount = tc.count()
-
-print "TC has %i edges" % tc.count()
-print tc.collect()
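The deleted python/tc.py was an example that computed the transitive closure of an edge set by repeatedly joining the current closure against the reversed edges and unioning in the projected pairs until the edge count stopped growing. A minimal non-Spark sketch of the same fixed-point loop over plain Python sets (variable names are mine, and the join is written directly as a set comprehension rather than via RDD join):

# Transitive closure of a small edge set via fixed-point iteration,
# mirroring the deleted tc.py without a SparkContext.
edges = {(1, 2), (2, 3), (4, 1)}

tc = set(edges)
while True:
    oldCount = len(tc)
    # Join step: for every (x, y) already in the closure and every
    # edge (y, z), the pair (x, z) is also in the closure.
    tc |= {(x, z) for (x, y) in tc for (y2, z) in edges if y == y2}
    if len(tc) == oldCount:  # fixed point: no new edges were added
        break

print("TC has %i edges" % len(tc))  # TC has 6 edges
print(sorted(tc))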