diff options
author | Holden Karau <holden@pigscanfly.ca> | 2014-04-08 18:15:52 -0700 |
---|---|---|
committer | Patrick Wendell <pwendell@gmail.com> | 2014-04-08 18:15:59 -0700 |
commit | ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch) | |
tree | 029a7ba0926eb1a8384ba73e74fc0bb018121528 /streaming/src/main | |
parent | 12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff) | |
download | spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.gz spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.tar.bz2 spark-ce8ec5456169682f27f846e7b8d51e6c4bcf75e3.zip |
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca>
Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits:
f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator
77048f8 [Holden Karau] Fix merge up to master
d3fe909 [Holden Karau] use toSeq instead
7a092a3 [Holden Karau] switch resultitr to resultiterable
eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables
c5075aa [Holden Karau] If guava 14 had iterables
2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API
11e730c [Holden Karau] Fix streaming tests
66b583d [Holden Karau] Fix the core test suite to compile
4ed579b [Holden Karau] Refactor from iterator to iterable
d052c07 [Holden Karau] Python tests now pass with iterator pandas
3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work"
cd1e81c [Holden Karau] Try and make pickling list iterators work
c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well
88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming
a5ee714 [Holden Karau] oops, was checking wrong iterator
e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming
ec8cc3e [Holden Karau] Fix test issues\!
4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions
fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD"
ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas"
b692868 [Holden Karau] Revert
7e533f7 [Holden Karau] Fix the bug
8a5153a [Holden Karau] Revert me, but we have some stuff to debug
b4e86a9 [Holden Karau] Add a join based on the problem in SVD
c4510e2 [Holden Karau] Revert this but for now put things in list pandas
b4e0b1d [Holden Karau] Fix style issues
71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness.
b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work
37888ec [Holden Karau] core/tests now pass
249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes
6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy"
fe992fe [Holden Karau] hmmm try and fix up basic operation suite
172705c [Holden Karau] Fix Java API suite
caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy
88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator
4991af6 [Holden Karau] Fix some tests
be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after
687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'streaming/src/main')
3 files changed, 45 insertions, 39 deletions
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala index ac451d1913..2ac943d7bf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.api.java -import java.lang.{Long => JLong} +import java.lang.{Long => JLong, Iterable => JIterable} import java.util.{List => JList} import scala.collection.JavaConversions._ @@ -115,15 +115,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): JavaPairDStream[K, JList[V]] = - dstream.groupByKey().mapValues(seqAsJavaList _) + def groupByKey(): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey().mapValues(asJavaIterable _) /** * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] = - dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _) + def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey(numPartitions).mapValues(asJavaIterable _) /** * Return a new DStream by applying `groupByKey` on each RDD of `this` DStream. @@ -131,8 +131,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner * is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] = - dstream.groupByKey(partitioner).mapValues(seqAsJavaList _) + def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] = + dstream.groupByKey(partitioner).mapValues(asJavaIterable _) /** * Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are @@ -196,8 +196,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _) + def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = { + dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _) } /** @@ -211,8 +211,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * DStream's batching interval */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) - : JavaPairDStream[K, JList[V]] = { - dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _) + : JavaPairDStream[K, JIterable[V]] = { + dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _) } /** @@ -227,9 +227,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * @param numPartitions Number of partitions of each RDD in the new DStream. */ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int) - :JavaPairDStream[K, JList[V]] = { + :JavaPairDStream[K, JIterable[V]] = { dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions) - .mapValues(seqAsJavaList _) + .mapValues(asJavaIterable _) } /** @@ -247,9 +247,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ):JavaPairDStream[K, JList[V]] = { + ):JavaPairDStream[K, JIterable[V]] = { dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner) - .mapValues(seqAsJavaList _) + .mapValues(asJavaIterable _) } /** @@ -518,9 +518,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = { + def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag - dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** @@ -530,10 +530,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def cogroup[W]( other: JavaPairDStream[K, W], numPartitions: Int - ): JavaPairDStream[K, (JList[V], JList[W])] = { + ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, numPartitions) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** @@ -543,10 +543,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])( def cogroup[W]( other: JavaPairDStream[K, W], partitioner: Partitioner - ): JavaPairDStream[K, (JList[V], JList[W])] = { + ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = { implicit val cm: ClassTag[W] = fakeClassTag dstream.cogroup(other.dstream, partitioner) - .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2)))) + .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2)))) } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index 2473496949..354bc132dc 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -51,7 +51,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with Spark's default number of partitions. */ - def groupByKey(): DStream[(K, Seq[V])] = { + def groupByKey(): DStream[(K, Iterable[V])] = { groupByKey(defaultPartitioner()) } @@ -59,7 +59,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to * generate the RDDs with `numPartitions` partitions. */ - def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = { + def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = { groupByKey(defaultPartitioner(numPartitions)) } @@ -67,12 +67,12 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Return a new DStream by applying `groupByKey` on each RDD. The supplied * org.apache.spark.Partitioner is used to control the partitioning of each RDD. */ - def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = { + def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = { val createCombiner = (v: V) => ArrayBuffer[V](v) val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v) val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2) combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] + .asInstanceOf[DStream[(K, Iterable[V])]] } /** @@ -126,7 +126,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * @param windowDuration width of the window; must be a multiple of this DStream's * batching interval */ - def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = { + def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner()) } @@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * the new DStream will generate RDDs); must be a multiple of this * DStream's batching interval */ - def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] = + def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration) + : DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner()) } @@ -161,7 +162,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, numPartitions: Int - ): DStream[(K, Seq[V])] = { + ): DStream[(K, Iterable[V])] = { groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions)) } @@ -180,14 +181,14 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) windowDuration: Duration, slideDuration: Duration, partitioner: Partitioner - ): DStream[(K, Seq[V])] = { - val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v - val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v + ): DStream[(K, Iterable[V])] = { + val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v + val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2 self.groupByKey(partitioner) .window(windowDuration, slideDuration) .combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner) - .asInstanceOf[DStream[(K, Seq[V])]] + .asInstanceOf[DStream[(K, Iterable[V])]] } /** @@ -438,7 +439,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Hash partitioning is used to generate the RDDs with Spark's default number * of partitions. */ - def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = { + def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner()) } @@ -447,7 +448,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) * Hash partitioning is used to generate the RDDs with `numPartitions` partitions. */ def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int) - : DStream[(K, (Seq[V], Seq[W]))] = { + : DStream[(K, (Iterable[V], Iterable[W]))] = { cogroup(other, defaultPartitioner(numPartitions)) } @@ -458,7 +459,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)]) def cogroup[W: ClassTag]( other: DStream[(K, W)], partitioner: Partitioner - ): DStream[(K, (Seq[V], Seq[W]))] = { + ): DStream[(K, (Iterable[V], Iterable[W]))] = { self.transformWith( other, (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala index 5f7d3ba26c..7e22268767 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala @@ -56,9 +56,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // first map the cogrouped tuple to tuples of required type, // and then apply the update function val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => { + val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => { val i = iterator.map(t => { - (t._1, t._2._1, t._2._2.headOption) + val itr = t._2._2.iterator + val headOption = itr.hasNext match { + case true => Some(itr.next()) + case false => None + } + (t._1, t._2._1.toSeq, headOption) }) updateFuncLocal(i) } @@ -90,8 +95,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag]( // first map the grouped tuple to tuples of required type, // and then apply the update function val updateFuncLocal = updateFunc - val finalFunc = (iterator: Iterator[(K, Seq[V])]) => { - updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None))) + val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => { + updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None))) } val groupedRDD = parentRDD.groupByKey(partitioner) |