diff options
author | Saldanha <saldaal1@phusca-l24858.wlan.na.novartis.net> | 2014-12-04 14:22:09 -0800 |
---|---|---|
committer | Josh Rosen <joshrosen@databricks.com> | 2014-12-04 14:57:41 -0800 |
commit | 743a889d2778f797aabc3b1e8146e7aa32b62a48 (patch) | |
tree | 510278e3fe5e303846a53f1063bccb1be7d0153d /core | |
parent | 794f3aec24acb578e258532ad0590554d07958ba (diff) | |
download | spark-743a889d2778f797aabc3b1e8146e7aa32b62a48.tar.gz spark-743a889d2778f797aabc3b1e8146e7aa32b62a48.tar.bz2 spark-743a889d2778f797aabc3b1e8146e7aa32b62a48.zip |
[SPARK-4459] Change groupBy type parameter from K to U
Please see https://issues.apache.org/jira/browse/SPARK-4459
Author: Saldanha <saldaal1@phusca-l24858.wlan.na.novartis.net>
Closes #3327 from alokito/master and squashes the following commits:
54b1095 [Saldanha] [SPARK-4459] changed type parameter for keyBy from K to U
d5f73c3 [Saldanha] [SPARK-4459] added keyBy test
316ad77 [Saldanha] SPARK-4459 changed type parameter for groupBy from K to U.
62ddd4b [Saldanha] SPARK-4459 added failing unit test
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 17 | ||||
-rw-r--r-- | core/src/test/java/org/apache/spark/JavaAPISuite.java | 41 |
2 files changed, 51 insertions, 7 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index ac42294d56..bd451634e5 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -211,8 +211,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = { - implicit val ctagK: ClassTag[K] = fakeClassTag + def groupBy[U](f: JFunction[T, U]): JavaPairRDD[U, JIterable[T]] = { + // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459 + implicit val ctagK: ClassTag[U] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) } @@ -221,10 +222,11 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = { - implicit val ctagK: ClassTag[K] = fakeClassTag + def groupBy[U](f: JFunction[T, U], numPartitions: Int): JavaPairRDD[U, JIterable[T]] = { + // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459 + implicit val ctagK: ClassTag[U] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag - JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) + JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[U]))) } /** @@ -458,8 +460,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Creates tuples of the elements in this RDD by applying `f`. */ - def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = { - implicit val ctag: ClassTag[K] = fakeClassTag + def keyBy[U](f: JFunction[T, U]): JavaPairRDD[U, T] = { + // The type parameter is U instead of K in order to work around a compiler bug; see SPARK-4459 + implicit val ctag: ClassTag[U] = fakeClassTag JavaPairRDD.fromRDD(rdd.keyBy(f)) } diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index 59c86eecac..3ad4f2f193 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -323,6 +323,47 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds } + @Test + public void groupByOnPairRDD() { + // Regression test for SPARK-4459 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function<Tuple2<Integer, Integer>, Boolean> areOdd = + new Function<Tuple2<Integer, Integer>, Boolean>() { + @Override + public Boolean call(Tuple2<Integer, Integer> x) { + return (x._1() % 2 == 0) && (x._2() % 2 == 0); + } + }; + JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); + JavaPairRDD<Boolean, Iterable<Tuple2<Integer, Integer>>> oddsAndEvens = pairRDD.groupBy(areOdd); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + + oddsAndEvens = pairRDD.groupBy(areOdd, 1); + Assert.assertEquals(2, oddsAndEvens.count()); + Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens + Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds + } + + @SuppressWarnings("unchecked") + @Test + public void keyByOnPairRDD() { + // Regression test for SPARK-4459 + JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13)); + Function<Tuple2<Integer, Integer>, String> sumToString = + new Function<Tuple2<Integer, Integer>, String>() { + @Override + public String call(Tuple2<Integer, Integer> x) { + return String.valueOf(x._1() + x._2()); + } + }; + JavaPairRDD<Integer, Integer> pairRDD = rdd.zip(rdd); + JavaPairRDD<String, Tuple2<Integer, Integer>> keyed = pairRDD.keyBy(sumToString); + Assert.assertEquals(7, keyed.count()); + Assert.assertEquals(1, (long) keyed.lookup("2").get(0)._1()); + } + @SuppressWarnings("unchecked") @Test public void cogroup() { |