about summary refs log tree commit diff
path: root/core/src/test
diff options
context:
space:
mode:
authorAllan Douglas R. de Oliveira <allandouglas@gmail.com>2014-06-20 11:03:03 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-06-20 11:03:03 -0700
commit6a224c31e8563156ad5732a23667e73076984ae1 (patch)
treee3364e0ab07258a483668635a90da442a6d0a8df /core/src/test
parentd484ddeff1440d8e14e05c3cd7e7a18746f1a586 (diff)
downloadspark-6a224c31e8563156ad5732a23667e73076984ae1.tar.gz
spark-6a224c31e8563156ad5732a23667e73076984ae1.tar.bz2
spark-6a224c31e8563156ad5732a23667e73076984ae1.zip
SPARK-1868: Users should be allowed to cogroup at least 4 RDDs
Adds cogroup for 4 RDDs. Author: Allan Douglas R. de Oliveira <allandouglas@gmail.com> Closes #813 from douglaz/more_cogroups and squashes the following commits: f8d6273 [Allan Douglas R. de Oliveira] Test python groupWith for one more case 0e9009c [Allan Douglas R. de Oliveira] Added scala tests c3ffcdd [Allan Douglas R. de Oliveira] Added java tests 517a67f [Allan Douglas R. de Oliveira] Added tests for python groupWith 2f402d5 [Allan Douglas R. de Oliveira] Removed TODO 17474f4 [Allan Douglas R. de Oliveira] Use new cogroup function 7877a2a [Allan Douglas R. de Oliveira] Fixed code ba02414 [Allan Douglas R. de Oliveira] Added varargs cogroup to pyspark c4a8a51 [Allan Douglas R. de Oliveira] Added java cogroup 4 e94963c [Allan Douglas R. de Oliveira] Fixed spacing f1ee57b [Allan Douglas R. de Oliveira] Fixed scala style issues d7196f1 [Allan Douglas R. de Oliveira] Allow the cogroup of 4 RDDs
Diffstat (limited to 'core/src/test')
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java  63
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala  33
2 files changed, 96 insertions, 0 deletions
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e46298c6a9..761f2d6a77 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,6 +21,9 @@ import java.io.*;
import java.util.*;
import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
@@ -306,6 +309,66 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
+ public void cogroup3() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 2),
+ new Tuple2<String, Integer>("Apples", 3)
+ ));
+ JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 21),
+ new Tuple2<String, Integer>("Apples", 42)
+ ));
+
+ JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
+ categories.cogroup(prices, quantities);
+ Assert.assertEquals("[Fruit, Citrus]",
+ Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
+ Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
+
+
+ cogrouped.collect();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void cogroup4() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 2),
+ new Tuple2<String, Integer>("Apples", 3)
+ ));
+ JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 21),
+ new Tuple2<String, Integer>("Apples", 42)
+ ));
+ JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Oranges", "BR"),
+ new Tuple2<String, String>("Apples", "US")
+ ));
+
+ JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped =
+ categories.cogroup(prices, quantities, countries);
+ Assert.assertEquals("[Fruit, Citrus]",
+ Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
+ Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
+ Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));
+
+ cogrouped.collect();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<Integer, Integer>(1, 1),
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 0b9004448a..447e38ec9d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -249,6 +249,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}
+ test("groupWith3") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+ val joined = rdd1.groupWith(rdd2, rdd3).collect()
+ assert(joined.size === 4)
+ val joinedSet = joined.map(x => (x._1,
+ (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'), List('a'))),
+ (2, (List(1), List('y', 'z'), List())),
+ (3, (List(1), List(), List('b'))),
+ (4, (List(), List('w'), List('c', 'd')))
+ ))
+ }
+
+ test("groupWith4") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+ val rdd4 = sc.parallelize(Array((2, '@')))
+ val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
+ assert(joined.size === 4)
+ val joinedSet = joined.map(x => (x._1,
+ (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'), List('a'), List())),
+ (2, (List(1), List('y', 'z'), List(), List('@'))),
+ (3, (List(1), List(), List('b'), List())),
+ (4, (List(), List('w'), List('c', 'd'), List()))
+ ))
+ }
+
test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
emptyDir.deleteOnExit()