diff options
author | Holden Karau <holden@pigscanfly.ca> | 2014-04-08 18:15:52 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-08 18:15:59 -0700 |
commit | ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch) | |
tree | 029a7ba0926eb1a8384ba73e74fc0bb018121528 /core/src/main/scala | |
parent | 12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff) | |
download | spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2 spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip |
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca>
Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits:
f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator
77048f8 [Holden Karau] Fix merge up to master
d3fe909 [Holden Karau] use toSeq instead
7a092a3 [Holden Karau] switch resultitr to resultiterable
eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables
c5075aa [Holden Karau] If guava 14 had iterables
2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API
11e730c [Holden Karau] Fix streaming tests
66b583d [Holden Karau] Fix the core test suite to compile
4ed579b [Holden Karau] Refactor from iterator to iterable
d052c07 [Holden Karau] Python tests now pass with iterator pandas
3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work"
cd1e81c [Holden Karau] Try and make pickling list iterators work
c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well
88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming
a5ee714 [Holden Karau] oops, was checking wrong iterator
e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming
ec8cc3e [Holden Karau] Fix test issues\!
4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions
fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD"
ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas"
b692868 [Holden Karau] Revert
7e533f7 [Holden Karau] Fix the bug
8a5153a [Holden Karau] Revert me, but we have some stuff to debug
b4e86a9 [Holden Karau] Add a join based on the problem in SVD
c4510e2 [Holden Karau] Revert this but for now put things in list pandas
b4e0b1d [Holden Karau] Fix style issues
71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness.
b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work
37888ec [Holden Karau] core/tests now pass
249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes
6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy"
fe992fe [Holden Karau] hmmm try and fix up basic operation suite
172705c [Holden Karau] Fix Java API suite
caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy
88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator
4991af6 [Holden Karau] Fix some tests
be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after
687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'core/src/main/scala')
4 files changed, 46 insertions, 41 deletions
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 9596dbaf75..e6c5d85917 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -18,6 +18,7 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList} +import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ import scala.reflect.ClassTag @@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] = + def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(partitioner))) /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] = + def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions))) /** @@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): JavaPairRDD[K, JList[V]] = + def groupByKey(): JavaPairRDD[K, JIterable[V]] = fromRDD(groupByResultToJava(rdd.groupByKey())) /** @@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner))) /** @@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], - partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner))) /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other))) /** @@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2))) /** @@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions))) /** @@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions))) /** Alias for cogroup. */ - def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] = + def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] = fromRDD(cogroupResultToJava(rdd.groupWith(other))) /** Alias for cogroup. */ def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2]) - : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] = + : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] = fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2))) /** @@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) object JavaPairRDD { private[spark] - def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = { - rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList) + def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = { + rddToPairRDDFunctions(rdd).mapValues(asJavaIterable) } private[spark] def cogroupResultToJava[K: ClassTag, V, W]( - rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = { - rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2))) + rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = { + rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2))) } private[spark] def cogroupResult2ToJava[K: ClassTag, V, W1, W2]( - rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = { + rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]) + : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = { rddToPairRDDFunctions(rdd) - .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3))) + .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3))) } def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 6e8ec8e0c7..ae577b500c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import java.util.{Comparator, Iterator => JIterator, List => JList} +import java.util.{Comparator, List => JList, Iterator => JIterator} import java.lang.{Iterable => JIterable} import scala.collection.JavaConversions._ @@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag))) @@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = { + def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[JList[T]] = fakeClassTag JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K]))) diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 14386ff5b9..a92a84b534 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Allows controlling the * partitioning of the resulting key-value pair RDD by passing a Partitioner. */ - def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = { + def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = { // groupByKey shouldn't use map side combine because map side combine does not // reduce the amount of data shuffled and requires all map side data be inserted // into a hash table, leading to more objects in the old gen. @@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2 val bufs = combineByKey[ArrayBuffer[V]]( createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false) - bufs.asInstanceOf[RDD[(K, Seq[V])]] + bufs.mapValues(_.toIterable) } /** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with into `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = { + def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = { groupByKey(new HashPartitioner(numPartitions)) } @@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) */ def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => - for (v <- vs.iterator; w <- ws.iterator) yield (v, w) + for (v <- vs; w <- ws) yield (v, w) } } @@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (ws.isEmpty) { - vs.iterator.map(v => (v, None)) + vs.map(v => (v, None)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w)) + for (v <- vs; w <- ws) yield (v, Some(w)) } } } @@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) : RDD[(K, (Option[V], W))] = { this.cogroup(other, partitioner).flatMapValues { case (vs, ws) => if (vs.isEmpty) { - ws.iterator.map(w => (None, w)) + ws.map(w => (None, w)) } else { - for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w) + for (v <- vs; w <- ws) yield (Some(v), w) } } } @@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. */ - def groupByKey(): RDD[(K, Seq[V])] = { + def groupByKey(): RDD[(K, Iterable[V])] = { groupByKey(defaultPartitioner(self)) } @@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner) + : RDD[(K, (Iterable[V], Iterable[W]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } @@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) { throw new SparkException("Default partitioner cannot partition array keys.") } val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner) cg.mapValues { case Seq(vs, w1s, w2s) => - (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]]) + (vs.asInstanceOf[Seq[V]], + w1s.asInstanceOf[Seq[W1]], + w2s.asInstanceOf[Seq[W2]]) } } @@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } @@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } @@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ - def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = { + def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, new HashPartitioner(numPartitions)) } @@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)]) * tuple with the list of values for that key in `this`, `other1` and `other2`. */ def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, new HashPartitioner(numPartitions)) } /** Alias for cogroup. */ - def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = { + def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(self, other)) } /** Alias for cogroup. */ def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)]) - : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = { + : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = { cogroup(other1, other2, defaultPartitioner(self, other1, other2)) } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index bf3c57ad41..74fa2a4fcd 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag]( /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] = groupBy[K](f, defaultPartitioner(this)) /** * Return an RDD of grouped elements. Each group consists of a key and a sequence of elements * mapping to that key. */ - def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] = + def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] = groupBy(f, new HashPartitioner(numPartitions)) /** * Return an RDD of grouped items. */ - def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = { + def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = { val cleanF = sc.clean(f) this.map(t => (cleanF(t), t)).groupByKey(p) } |