aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorHolden Karau <holden@pigscanfly.ca>2014-04-08 18:15:52 -0700
committerPatrick Wendell <pwendell@gmail.com>2014-04-08 18:15:59 -0700
commitce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch)
tree029a7ba0926eb1a8384ba73e74fc0bb018121528 /core
parent12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff)
downloadspark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2
spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca> Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits: f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator 77048f8 [Holden Karau] Fix merge up to master d3fe909 [Holden Karau] use toSeq instead 7a092a3 [Holden Karau] switch resultitr to resultiterable eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables c5075aa [Holden Karau] If guava 14 had iterables 2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API 11e730c [Holden Karau] Fix streaming tests 66b583d [Holden Karau] Fix the core test suite to compile 4ed579b [Holden Karau] Refactor from iterator to iterable d052c07 [Holden Karau] Python tests now pass with iterator pandas 3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work" cd1e81c [Holden Karau] Try and make pickling list iterators work c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well 88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming a5ee714 [Holden Karau] oops, was checking wrong iterator e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming ec8cc3e [Holden Karau] Fix test issues\! 4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD" ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas" b692868 [Holden Karau] Revert 7e533f7 [Holden Karau] Fix the bug 8a5153a [Holden Karau] Revert me, but we have some stuff to debug b4e86a9 [Holden Karau] Add a join based on the problem in SVD c4510e2 [Holden Karau] Revert this but for now put things in list pandas b4e0b1d [Holden Karau] Fix style issues 71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness. b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work 37888ec [Holden Karau] core/tests now pass 249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes 6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy" fe992fe [Holden Karau] hmmm try and fix up basic operation suite 172705c [Holden Karau] Fix Java API suite caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy 88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator 4991af6 [Holden Karau] Fix some tests be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after 687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'core')
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala36
-rw-r--r--core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala6
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala39
-rw-r--r--core/src/main/scala/org/apache/spark/rdd/RDD.scala6
-rw-r--r--core/src/test/java/org/apache/spark/JavaAPISuite.java20
-rw-r--r--core/src/test/scala/org/apache/spark/FailureSuite.scala4
-rw-r--r--core/src/test/scala/org/apache/spark/PipedRDDSuite.scala2
-rw-r--r--core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala12
-rw-r--r--core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala4
9 files changed, 67 insertions, 62 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 9596dbaf75..e6c5d85917 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.java
import java.util.{Comparator, List => JList}
+import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
- def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
+ def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+ def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
- def groupByKey(): JavaPairRDD[K, JList[V]] =
+ def groupByKey(): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))
/**
@@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (JList[V], JList[W])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
/**
@@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
- partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
/**
@@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
/**
@@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
- : JavaPairRDD[K, (JList[V], JList[W])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
/**
@@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
/** Alias for cogroup. */
- def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
/**
@@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
object JavaPairRDD {
private[spark]
- def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
- rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+ def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
+ rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
}
private[spark]
def cogroupResultToJava[K: ClassTag, V, W](
- rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
- rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+ rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
+ rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
}
private[spark]
def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
- rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+ rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
+ : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
rddToPairRDDFunctions(rdd)
- .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+ .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6e8ec8e0c7..ae577b500c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.java
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
+ def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
+ def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 14386ff5b9..a92a84b534 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
- def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ bufs.mapValues(_.toIterable)
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
}
@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+ for (v <- vs; w <- ws) yield (v, w)
}
}
@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
- vs.iterator.map(v => (v, None))
+ vs.map(v => (v, None))
} else {
- for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+ for (v <- vs; w <- ws) yield (v, Some(w))
}
}
}
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
- ws.iterator.map(w => (None, w))
+ ws.map(w => (None, w))
} else {
- for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+ for (v <- vs; w <- ws) yield (Some(v), w)
}
}
}
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
- def groupByKey(): RDD[(K, Seq[V])] = {
+ def groupByKey(): RDD[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(self))
}
@@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Iterable[V], Iterable[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ (vs.asInstanceOf[Seq[V]],
+ w1s.asInstanceOf[Seq[W1]],
+ w2s.asInstanceOf[Seq[W2]])
}
}
@@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
@@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}
@@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
/** Alias for cogroup. */
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index bf3c57ad41..74fa2a4fcd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 762405be2a..ab2fdac553 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,10 +18,12 @@
package org.apache.spark;
import java.io.*;
+import java.lang.StringBuilder;
import java.util.*;
import scala.Tuple2;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
@@ -197,7 +199,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("Oranges", "Citrus")
));
Assert.assertEquals(2, categories.lookup("Oranges").size());
- Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+ Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
}
@Test
@@ -209,15 +211,15 @@ public class JavaAPISuite implements Serializable {
return x % 2 == 0;
}
};
- JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+ JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
oddsAndEvens = rdd.groupBy(isOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}
@SuppressWarnings("unchecked")
@@ -232,9 +234,9 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("Oranges", 2),
new Tuple2<String, Integer>("Apples", 3)
));
- JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
- Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
- Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+ JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
+ Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
cogrouped.collect();
}
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f3fb64d87a..12dbebcb28 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
throw new Exception("Intentional task failure")
}
}
- (k, v(0) * v(0))
+ (k, v.head * v.head)
}.collect()
FailureSuiteState.synchronized {
assert(FailureSuiteState.tasksRun === 4)
@@ -137,5 +137,3 @@ class FailureSuite extends FunSuite with LocalSparkContext {
// TODO: Need to add tests with shuffle fetch failures.
}
-
-
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 627e9b5cd9..867b28cc0d 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
(f: String => Unit) => {
bl.value.map(f(_)); f("\u0001")
},
- (i: Tuple2[String, Seq[String]], f: String => Unit) => {
+ (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
for (e <- i._2) {
f(e + "_")
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index f9e994b13d..8f3e6bd21b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
- assert(joined.toSet === Set(
- (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
- (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
- (3, (ArrayBuffer(1), ArrayBuffer())),
- (4, (ArrayBuffer(), ArrayBuffer('w')))
+ val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'))),
+ (2, (List(1), List('y', 'z'))),
+ (3, (List(1), List())),
+ (4, (List(), List('w')))
))
}
@@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
super.getRecordWriter(p1)
}
}
-
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fce1184d46..cdebefb675 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -174,9 +174,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
// groupByKey
- val result2 = rdd.groupByKey().collect()
+ val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
assert(result2.toSet == Set[(Int, Seq[Int])]
- ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
+ ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
}
test("simple cogroup") {