about summary refs log tree commit diff
diff options
context:
space:
mode:
authorSaldanha <saldaal1@phusca-l24858.wlan.na.novartis.net>2014-12-04 14:22:09 -0800
committerJosh Rosen <joshrosen@databricks.com>2014-12-04 14:58:37 -0800
commit0d159de39a21ab82d4ecdfa2d88fa525339daee1 (patch)
tree91e95eff1c523724145318a43b964b1e0e057b58
parenta00d0aa6e8cb64f00656fdf4d46ea7842b884e5e (diff)
downloadspark-0d159de39a21ab82d4ecdfa2d88fa525339daee1.tar.gz
spark-0d159de39a21ab82d4ecdfa2d88fa525339daee1.tar.bz2
spark-0d159de39a21ab82d4ecdfa2d88fa525339daee1.zip
[SPARK-4459] Change groupBy type parameter from K to U
Please see https://issues.apache.org/jira/browse/SPARK-4459 Author: Saldanha <saldaal1@phusca-l24858.wlan.na.novartis.net> Closes #3327 from alokito/master and squashes the following commits: 54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U d5f73c3 [Saldanha] [SPARK-4459] added keyBy test 316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U. 62ddd4b [Saldanha] SPARK-4459 added failing unit test (cherry picked from commit 743a889d2778f797aabc3b1e8146e7aa32b62a48) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala17
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java41
2 files changed, 51 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 5a8e5bb1f7..fa2c1c28c9 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -212,8 +212,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
- implicit val ctagK: ClassTag[K] = fakeClassTag
+ def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = {
+ // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
+ implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
}
@@ -222,10 +223,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
- implicit val ctagK: ClassTag[K] = fakeClassTag
+ def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = {
+ // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
+ implicit val ctagK: ClassTag[U] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
- JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
+ JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U])))
}
/**
@@ -459,8 +461,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Creates tuples of the elements in this RDD by applying `f`.
*/
- def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
- implicit val ctag: ClassTag[K] = fakeClassTag
+ def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = {
+ // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459
+ implicit val ctag: ClassTag[U] = fakeClassTag
JavaPairRDD.fromRDD(rdd.keyBy(f))
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 59c86eecac..3ad4f2f193 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -323,6 +323,47 @@ public class JavaAPISuite implements Serializable {
Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}
+ @Test
+ public void groupByOnPairRDD() {
+ // Regression test for SPARK-4459
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function<Tuple2<Integer, Integer>, Boolean> areOdd =
+ new Function<Tuple2<Integer, Integer>, Boolean>() {
+ @Override
+ public Boolean call(Tuple2<Integer, Integer> x) {
+ return (x._1() % 2 == 0) && (x._2() % 2 == 0);
+ }
+ };
+ JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
+ JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+
+ oddsAndEvens = pairRDD.groupBy(areOdd, 1);
+ Assert.assertEquals(2, oddsAndEvens.count());
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
+ }
+
+ @SuppressWarnings("unchecked")
+ @Test
+ public void keyByOnPairRDD() {
+ // Regression test for SPARK-4459
+ JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
+ Function<Tuple2<Integer, Integer>, String> sumToString =
+ new Function<Tuple2<Integer, Integer>, String>() {
+ @Override
+ public String call(Tuple2<Integer, Integer> x) {
+ return String.valueOf(x._1() + x._2());
+ }
+ };
+ JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd);
+ JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString);
+ Assert.assertEquals(7, keyed.count());
+ Assert.assertEquals(1, (long) keyed.lookup("2").get(0)._1());
+ }
+
@SuppressWarnings("unchecked")
@Test
public void cogroup() {