From ce92a9c18f033ac9fa2f12143fab00a90e0f4577 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 12 Jun 2014 08:14:25 -0700 Subject: SPARK-554. Add aggregateByKey. Author: Sandy Ryza Closes #705 from sryza/sandy-spark-554 and squashes the following commits: 2302b8f [Sandy Ryza] Add MIMA exclude f52e0ad [Sandy Ryza] Fix Python tests for real 2f3afa3 [Sandy Ryza] Fix Python test 0b735e9 [Sandy Ryza] Fix line lengths ae56746 [Sandy Ryza] Fix doc (replace T with V) c2be415 [Sandy Ryza] Java and Python aggregateByKey 23bf400 [Sandy Ryza] SPARK-554. Add aggregateByKey. --- .../test/java/org/apache/spark/JavaAPISuite.java | 31 ++++++++++++++++++++++ .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 13 +++++++++ 2 files changed, 44 insertions(+) (limited to 'core/src/test') diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 50a6212911..ef41bfb88d 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -317,6 +317,37 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(33, sum); } + @Test + public void aggregateByKey() { + JavaPairRDD pairs = sc.parallelizePairs( + Arrays.asList( + new Tuple2(1, 1), + new Tuple2(1, 1), + new Tuple2(3, 2), + new Tuple2(5, 1), + new Tuple2(5, 3)), 2); + + Map> sets = pairs.aggregateByKey(new HashSet(), + new Function2, Integer, Set>() { + @Override + public Set call(Set a, Integer b) { + a.add(b); + return a; + } + }, + new Function2, Set, Set>() { + @Override + public Set call(Set a, Set b) { + a.addAll(b); + return a; + } + }).collectAsMap(); + Assert.assertEquals(3, sets.size()); + Assert.assertEquals(new HashSet(Arrays.asList(1)), sets.get(1)); + Assert.assertEquals(new HashSet(Arrays.asList(2)), sets.get(3)); + Assert.assertEquals(new HashSet(Arrays.asList(1, 3)), sets.get(5)); + } + @SuppressWarnings("unchecked") @Test public void foldByKey() { diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index 9ddafc4518..0b9004448a 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -30,6 +30,19 @@ import org.apache.spark.SparkContext._ import org.apache.spark.{Partitioner, SharedSparkContext} class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { + test("aggregateByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2) + + val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect() + assert(sets.size === 3) + val valuesFor1 = sets.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1)) + val valuesFor3 = sets.find(_._1 == 3).get._2 + assert(valuesFor3.toList.sorted === List(2)) + val valuesFor5 = sets.find(_._1 == 5).get._2 + assert(valuesFor5.toList.sorted === List(1, 3)) + } + test("groupByKey") { val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) val groups = pairs.groupByKey().collect() -- cgit v1.2.3