aboutsummaryrefslogtreecommitdiff
path: root/core/src/test
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/test')
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java63
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala33
2 files changed, 96 insertions, 0 deletions
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index e46298c6a9..761f2d6a77 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -21,6 +21,9 @@ import java.io.*;
import java.util.*;
import scala.Tuple2;
+import scala.Tuple3;
+import scala.Tuple4;
+
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
@@ -306,6 +309,66 @@ public class JavaAPISuite implements Serializable {
@SuppressWarnings("unchecked")
@Test
+ public void cogroup3() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 2),
+ new Tuple2<String, Integer>("Apples", 3)
+ ));
+ JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 21),
+ new Tuple2<String, Integer>("Apples", 42)
+ ));
+
+ JavaPairRDD<String, Tuple3<Iterable<String>, Iterable<Integer>, Iterable<Integer>>> cogrouped =
+ categories.cogroup(prices, quantities);
+ Assert.assertEquals("[Fruit, Citrus]",
+ Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
+ Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
+
+
+ cogrouped.collect();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void cogroup4() {
+ JavaPairRDD<String, String> categories = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Apples", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Fruit"),
+ new Tuple2<String, String>("Oranges", "Citrus")
+ ));
+ JavaPairRDD<String, Integer> prices = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 2),
+ new Tuple2<String, Integer>("Apples", 3)
+ ));
+ JavaPairRDD<String, Integer> quantities = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, Integer>("Oranges", 21),
+ new Tuple2<String, Integer>("Apples", 42)
+ ));
+ JavaPairRDD<String, String> countries = sc.parallelizePairs(Arrays.asList(
+ new Tuple2<String, String>("Oranges", "BR"),
+ new Tuple2<String, String>("Apples", "US")
+ ));
+
+ JavaPairRDD<String, Tuple4<Iterable<String>, Iterable<Integer>, Iterable<Integer>, Iterable<String>>> cogrouped =
+ categories.cogroup(prices, quantities, countries);
+ Assert.assertEquals("[Fruit, Citrus]",
+ Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
+ Assert.assertEquals("[42]", Iterables.toString(cogrouped.lookup("Apples").get(0)._3()));
+ Assert.assertEquals("[BR]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._4()));
+
+ cogrouped.collect();
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
public void leftOuterJoin() {
JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
new Tuple2<Integer, Integer>(1, 1),
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 0b9004448a..447e38ec9d 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -249,6 +249,39 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
))
}
+ test("groupWith3") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+ val joined = rdd1.groupWith(rdd2, rdd3).collect()
+ assert(joined.size === 4)
+ val joinedSet = joined.map(x => (x._1,
+ (x._2._1.toList, x._2._2.toList, x._2._3.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'), List('a'))),
+ (2, (List(1), List('y', 'z'), List())),
+ (3, (List(1), List(), List('b'))),
+ (4, (List(), List('w'), List('c', 'd')))
+ ))
+ }
+
+ test("groupWith4") {
+ val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+ val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+ val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+ val rdd4 = sc.parallelize(Array((2, '@')))
+ val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
+ assert(joined.size === 4)
+ val joinedSet = joined.map(x => (x._1,
+ (x._2._1.toList, x._2._2.toList, x._2._3.toList, x._2._4.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'), List('a'), List())),
+ (2, (List(1), List('y', 'z'), List(), List('@'))),
+ (3, (List(1), List(), List('b'), List())),
+ (4, (List(), List('w'), List('c', 'd'), List()))
+ ))
+ }
+
test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
emptyDir.deleteOnExit()