path: root/streaming/src/main
author    Holden Karau <holden@pigscanfly.ca>      2014-04-08 18:15:52 -0700
committer Patrick Wendell <pwendell@gmail.com>     2014-04-08 18:15:59 -0700
commit    ce8ec5456169682f27f846e7b8d51e6c4bcf75e3 (patch)
tree      029a7ba0926eb1a8384ba73e74fc0bb018121528 /streaming/src/main
parent    12c077d5aa0b76a808a55db625c9677a52bd43f9 (diff)
Spark 1271: Co-Group and Group-By should pass Iterable[X]
Author: Holden Karau <holden@pigscanfly.ca>

Closes #242 from holdenk/spark-1320-cogroupandgroupshouldpassiterator and squashes the following commits:

f289536 [Holden Karau] Fix bad merge, should have been Iterable rather than Iterator
77048f8 [Holden Karau] Fix merge up to master
d3fe909 [Holden Karau] use toSeq instead
7a092a3 [Holden Karau] switch resultitr to resultiterable
eb06216 [Holden Karau] maybe I should have had a coffee first. use correct import for guava iterables
c5075aa [Holden Karau] If guava 14 had iterables
2d06e10 [Holden Karau] Fix Java 8 cogroup tests for the new API
11e730c [Holden Karau] Fix streaming tests
66b583d [Holden Karau] Fix the core test suite to compile
4ed579b [Holden Karau] Refactor from iterator to iterable
d052c07 [Holden Karau] Python tests now pass with iterator pandas
3bcd81d [Holden Karau] Revert "Try and make pickling list iterators work"
cd1e81c [Holden Karau] Try and make pickling list iterators work
c60233a [Holden Karau] Start investigating moving to iterators for python API like the Java/Scala one. tl;dr: We will have to write our own iterator since the default one doesn't pickle well
88a5cef [Holden Karau] Fix cogroup test in JavaAPISuite for streaming
a5ee714 [Holden Karau] oops, was checking wrong iterator
e687f21 [Holden Karau] Fix groupbykey test in JavaAPISuite of streaming
ec8cc3e [Holden Karau] Fix test issues!
4b0eeb9 [Holden Karau] Switch cast in PairDStreamFunctions
fa395c9 [Holden Karau] Revert "Add a join based on the problem in SVD"
ec99e32 [Holden Karau] Revert "Revert this but for now put things in list pandas"
b692868 [Holden Karau] Revert
7e533f7 [Holden Karau] Fix the bug
8a5153a [Holden Karau] Revert me, but we have some stuff to debug
b4e86a9 [Holden Karau] Add a join based on the problem in SVD
c4510e2 [Holden Karau] Revert this but for now put things in list pandas
b4e0b1d [Holden Karau] Fix style issues
71e8b9f [Holden Karau] I really need to stop calling size on iterators, it is the path of sadness.
b1ae51a [Holden Karau] Fix some of the types in the streaming JavaAPI suite. Probably still needs more work
37888ec [Holden Karau] core/tests now pass
249abde [Holden Karau] org.apache.spark.rdd.PairRDDFunctionsSuite passes
6698186 [Holden Karau] Revert "I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy"
fe992fe [Holden Karau] hmmm try and fix up basic operation suite
172705c [Holden Karau] Fix Java API suite
caafa63 [Holden Karau] I think this might be a bad rabbit hole. Started work to make CoGroupedRDD use iterator and then went crazy
88b3329 [Holden Karau] Fix groupbykey to actually give back an iterator
4991af6 [Holden Karau] Fix some tests
be50246 [Holden Karau] Calling size on an iterator is not so good if we want to use it after
687ffbc [Holden Karau] This is the it compiles point of replacing Seq with Iterator and JList with JIterator in the groupby and cogroup signatures
Diffstat (limited to 'streaming/src/main')
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala       42
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala    29
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala            13
3 files changed, 45 insertions, 39 deletions
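
For illustration only (not part of this commit), a minimal sketch of how caller code sees the new Scala signature; the perKeySums helper and the pairs stream are hypothetical:

import org.apache.spark.streaming.StreamingContext._  // implicit pair-DStream functions
import org.apache.spark.streaming.dstream.DStream

// `pairs` is assumed to be a DStream[(String, Int)] built elsewhere.
def perKeySums(pairs: DStream[(String, Int)]): DStream[(String, Int)] = {
  // groupByKey now yields Iterable[Int] values instead of Seq[Int].
  val grouped: DStream[(String, Iterable[Int])] = pairs.groupByKey()
  // Traverse the Iterable; call .toSeq explicitly only if indexed access is needed.
  grouped.mapValues(values => values.sum)
}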
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index ac451d1913..2ac943d7bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong}
+import java.lang.{Long => JLong, Iterable => JIterable}
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -115,15 +115,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey().mapValues(seqAsJavaList _)
+ def groupByKey(): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey().mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey(numPartitions).mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
@@ -131,8 +131,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
* is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey(partitioner).mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
@@ -196,8 +196,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+ def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _)
}
/**
@@ -211,8 +211,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
+ : JavaPairDStream[K, JIterable[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _)
}
/**
@@ -227,9 +227,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- :JavaPairDStream[K, JList[V]] = {
+ :JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
- .mapValues(seqAsJavaList _)
+ .mapValues(asJavaIterable _)
}
/**
@@ -247,9 +247,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ):JavaPairDStream[K, JList[V]] = {
+ ):JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
- .mapValues(seqAsJavaList _)
+ .mapValues(asJavaIterable _)
}
/**
@@ -518,9 +518,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
- dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
@@ -530,10 +530,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def cogroup[W](
other: JavaPairDStream[K, W],
numPartitions: Int
- ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
@@ -543,10 +543,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def cogroup[W](
other: JavaPairDStream[K, W],
partitioner: Partitioner
- ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
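
Side note between the two files: the mapValues(asJavaIterable _) calls above rely on the Scala/Java collection bridge. A minimal standalone sketch of that conversion, assuming the Scala 2.10-era scala.collection.JavaConversions this file imports (object and value names hypothetical):

import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions.asJavaIterable

object IterableConversionSketch {
  def main(args: Array[String]): Unit = {
    // A Scala Iterable, such as the values groupByKey now produces.
    val scalaValues: Iterable[Int] = Seq(1, 2, 3)
    // Wrap it as a java.lang.Iterable, mirroring mapValues(asJavaIterable _).
    val javaValues: JIterable[Int] = asJavaIterable(scalaValues)
    val it = javaValues.iterator()
    while (it.hasNext) println(it.next())
  }
}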
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 2473496949..354bc132dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -51,7 +51,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): DStream[(K, Seq[V])] = {
+ def groupByKey(): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner())
}
@@ -59,7 +59,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
@@ -67,12 +67,12 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
- .asInstanceOf[DStream[(K, Seq[V])]]
+ .asInstanceOf[DStream[(K, Iterable[V])]]
}
/**
@@ -126,7 +126,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : DStream[(K, Iterable[V])] =
{
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -161,7 +162,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, Seq[V])] = {
+ ): DStream[(K, Iterable[V])] = {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
@@ -180,14 +181,14 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, Seq[V])] = {
- val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
- val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+ ): DStream[(K, Iterable[V])] = {
+ val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
+ val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
self.groupByKey(partitioner)
.window(windowDuration, slideDuration)
.combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
- .asInstanceOf[DStream[(K, Seq[V])]]
+ .asInstanceOf[DStream[(K, Iterable[V])]]
}
/**
@@ -438,7 +439,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner())
}
@@ -447,7 +448,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
- : DStream[(K, (Seq[V], Seq[W]))] = {
+ : DStream[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -458,7 +459,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Seq[V], Seq[W]))] = {
+ ): DStream[(K, (Iterable[V], Iterable[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
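
Another aside before the last file: the asInstanceOf casts above are safe because ArrayBuffer is itself an Iterable. A minimal local sketch of the combiner shape, with no Spark involved and hypothetical names:

import scala.collection.mutable.ArrayBuffer

object GroupByKeyCombinerSketch {
  def main(args: Array[String]): Unit = {
    // Combiner functions shaped like those in groupByKey(partitioner) above.
    val createCombiner = (v: Int) => ArrayBuffer[Int](v)
    val mergeValue = (c: ArrayBuffer[Int], v: Int) => c += v

    // Combine one key's values locally.
    val combined: ArrayBuffer[Int] = List(2, 3).foldLeft(createCombiner(1))(mergeValue)

    // ArrayBuffer is an Iterable, which is what makes exposing the result
    // as Iterable[V] values (as the cast in the diff does) well-typed.
    val values: Iterable[Int] = combined
    println(values.mkString(", "))  // prints: 1, 2, 3
  }
}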
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 5f7d3ba26c..7e22268767 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -56,9 +56,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map(t => {
- (t._1, t._2._1, t._2._2.headOption)
+ val itr = t._2._2.iterator
+ val headOption = itr.hasNext match {
+ case true => Some(itr.next())
+ case false => None
+ }
+ (t._1, t._2._1.toSeq, headOption)
})
updateFuncLocal(i)
}
@@ -90,8 +95,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
- updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None)))
+ val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}
val groupedRDD = parentRDD.groupByKey(partitioner)
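
A minimal sketch of the head-extraction pattern used in the new finalFunc above, shown in isolation (object and method names hypothetical):

object HeadOptionSketch {
  // Take at most the first element from an Iterable's iterator, mirroring
  // the hasNext match in StateDStream's finalFunc.
  def firstOption[A](values: Iterable[A]): Option[A] = {
    val it = values.iterator
    if (it.hasNext) Some(it.next()) else None
  }

  def main(args: Array[String]): Unit = {
    println(firstOption(Seq("state")))       // Some(state)
    println(firstOption(Seq.empty[String]))  // None
  }
}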