aboutsummaryrefslogtreecommitdiff
path: root/python/pyspark/tests.py
diff options
context:
space:
mode:
author Sandy Ryza <sandy@cloudera.com> 2014-06-12 08:14:25 -0700
committer Patrick Wendell <pwendell@gmail.com> 2014-06-12 08:14:25 -0700
commit ce92a9c18f033ac9fa2f12143fab00a90e0f4577 (patch)
tree 34507639ff2f876630c6868619844c52e13d3720 /python/pyspark/tests.py
parent 43d53d51c9ee2626d9de91faa3b192979b86821d (diff)
download spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.tar.gz
spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.tar.bz2
spark-ce92a9c18f033ac9fa2f12143fab00a90e0f4577.zip
SPARK-554. Add aggregateByKey.
Author: Sandy Ryza <sandy@cloudera.com>

Closes #705 from sryza/sandy-spark-554 and squashes the following commits:

2302b8f [Sandy Ryza] Add MIMA exclude
f52e0ad [Sandy Ryza] Fix Python tests for real
2f3afa3 [Sandy Ryza] Fix Python test
0b735e9 [Sandy Ryza] Fix line lengths
ae56746 [Sandy Ryza] Fix doc (replace T with V)
c2be415 [Sandy Ryza] Java and Python aggregateByKey
23bf400 [Sandy Ryza] SPARK-554. Add aggregateByKey.
Diffstat (limited to 'python/pyspark/tests.py')
-rw-r--r-- python/pyspark/tests.py 15
1 files changed, 15 insertions, 0 deletions
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 184ee810b8..c15bb45775 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -188,6 +188,21 @@ class TestRDDFunctions(PySparkTestCase):
os.unlink(tempFile.name)
self.assertRaises(Exception, lambda: filtered_data.count())
+ def testAggregateByKey(self):
+ data = self.sc.parallelize([(1, 1), (1, 1), (3, 2), (5, 1), (5, 3)], 2)
+ def seqOp(x, y):
+ x.add(y)
+ return x
+
+ def combOp(x, y):
+ x |= y
+ return x
+
+ sets = dict(data.aggregateByKey(set(), seqOp, combOp).collect())
+ self.assertEqual(3, len(sets))
+ self.assertEqual(set([1]), sets[1])
+ self.assertEqual(set([2]), sets[3])
+ self.assertEqual(set([1, 3]), sets[5])
class TestIO(PySparkTestCase):