-rw-r--r--  bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 20
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala | 36
-rw-r--r--  core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala | 6
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala | 39
-rw-r--r--  core/src/main/scala/org/apache/spark/rdd/RDD.scala | 6
-rw-r--r--  core/src/test/java/org/apache/spark/JavaAPISuite.java | 20
-rw-r--r--  core/src/test/scala/org/apache/spark/FailureSuite.scala | 4
-rw-r--r--  core/src/test/scala/org/apache/spark/PipedRDDSuite.scala | 2
-rw-r--r--  core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala | 12
-rw-r--r--  core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala | 4
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java | 21
-rw-r--r--  examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala | 14
-rw-r--r--  extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java | 11
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala | 6
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala | 4
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala | 6
-rw-r--r--  python/pyspark/join.py | 5
-rw-r--r--  python/pyspark/rdd.py | 10
-rw-r--r--  python/pyspark/resultiterable.py | 33
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala | 42
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala | 29
-rw-r--r--  streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala | 13
-rw-r--r--  streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java | 58
-rw-r--r--  streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala | 4
24 files changed, 252 insertions, 153 deletions
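Taken together, the user-visible change in this diff is that groupByKey, groupBy, groupWith and cogroup now return Iterable-valued pairs instead of Seq-valued ones, across the Scala, Java and Python APIs. A minimal migration sketch, assuming an existing SparkContext named sc (names are illustrative):

    import org.apache.spark.SparkContext._   // pair-RDD implicits of this era
    // Before this patch the grouped values were a Seq[V]; after it they are an Iterable[V].
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
    val grouped: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = pairs.groupByKey()
    // Materialize explicitly where Seq semantics (indexing, Seq equality) are still needed.
    val asLists = grouped.mapValues(_.toList)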
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70c7474a93..70a99b33d7 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,20 +220,23 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
- grouped: RDD[(K, (Seq[C], Seq[V]))],
+ grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
- val processed = grouped.flatMapValues {
- case (_, vs) if vs.size == 0 => None
- case (c, vs) =>
+ val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+ .flatMapValues {
+ case (_, vs) if !vs.hasNext => None
+ case (c, vs) => {
val (newVert, newMsgs) =
- compute(vs(0), c match {
- case Seq(comb) => Some(comb)
- case Seq() => None
- })
+ compute(vs.next,
+ c.hasNext match {
+ case true => Some(c.next)
+ case false => None
+ }
+ )
numMsgs += newMsgs.size
if (newVert.active) {
@@ -241,6 +244,7 @@ object Bagel extends Logging {
}
Some((newVert, newMsgs))
+ }
}.persist(storageLevel)
// Force evaluation of processed RDD for accurate performance measurements
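The Bagel change above replaces indexed access (vs(0), vs.size == 0) with one-pass iterator access, since the grouped values are only guaranteed to be Iterable. The same pattern in isolation, as a small sketch (equivalent in effect to Iterable's built-in headOption):

    def firstOption[A](xs: Iterable[A]): Option[A] = {
      val it = xs.iterator
      if (it.hasNext) Some(it.next()) else None   // no indexing, no up-front size computation
    }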
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 9596dbaf75..e6c5d85917 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -18,6 +18,7 @@
package org.apache.spark.api.java
import java.util.{Comparator, List => JList}
+import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -250,14 +251,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
- def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JList[V]] =
+ def groupByKey(partitioner: Partitioner): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(partitioner)))
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): JavaPairRDD[K, JList[V]] =
+ def groupByKey(numPartitions: Int): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey(numPartitions)))
/**
@@ -367,7 +368,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
- def groupByKey(): JavaPairRDD[K, JList[V]] =
+ def groupByKey(): JavaPairRDD[K, JIterable[V]] =
fromRDD(groupByResultToJava(rdd.groupByKey()))
/**
@@ -462,7 +463,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], partitioner: Partitioner)
- : JavaPairRDD[K, (JList[V], JList[W])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, partitioner)))
/**
@@ -470,14 +471,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2],
- partitioner: Partitioner): JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ partitioner: Partitioner): JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, partitioner)))
/**
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def cogroup[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other)))
/**
@@ -485,7 +486,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2)))
/**
@@ -493,7 +494,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* list of values for that key in `this` as well as `other`.
*/
def cogroup[W](other: JavaPairRDD[K, W], numPartitions: Int)
- : JavaPairRDD[K, (JList[V], JList[W])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.cogroup(other, numPartitions)))
/**
@@ -501,16 +502,16 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2], numPartitions: Int)
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.cogroup(other1, other2, numPartitions)))
/** Alias for cogroup. */
- def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JList[V], JList[W])] =
+ def groupWith[W](other: JavaPairRDD[K, W]): JavaPairRDD[K, (JIterable[V], JIterable[W])] =
fromRDD(cogroupResultToJava(rdd.groupWith(other)))
/** Alias for cogroup. */
def groupWith[W1, W2](other1: JavaPairRDD[K, W1], other2: JavaPairRDD[K, W2])
- : JavaPairRDD[K, (JList[V], JList[W1], JList[W2])] =
+ : JavaPairRDD[K, (JIterable[V], JIterable[W1], JIterable[W2])] =
fromRDD(cogroupResult2ToJava(rdd.groupWith(other1, other2)))
/**
@@ -695,21 +696,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
object JavaPairRDD {
private[spark]
- def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Seq[T])]): RDD[(K, JList[T])] = {
- rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList)
+ def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = {
+ rddToPairRDDFunctions(rdd).mapValues(asJavaIterable)
}
private[spark]
def cogroupResultToJava[K: ClassTag, V, W](
- rdd: RDD[(K, (Seq[V], Seq[W]))]): RDD[(K, (JList[V], JList[W]))] = {
- rddToPairRDDFunctions(rdd).mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
+ rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = {
+ rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2)))
}
private[spark]
def cogroupResult2ToJava[K: ClassTag, V, W1, W2](
- rdd: RDD[(K, (Seq[V], Seq[W1], Seq[W2]))]): RDD[(K, (JList[V], JList[W1], JList[W2]))] = {
+ rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))])
+ : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = {
rddToPairRDDFunctions(rdd)
- .mapValues(x => (seqAsJavaList(x._1), seqAsJavaList(x._2), seqAsJavaList(x._3)))
+ .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3)))
}
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = {
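The private helpers above now bridge Scala collections to Java with asJavaIterable instead of seqAsJavaList, so Java callers receive a java.lang.Iterable rather than a java.util.List. A hedged sketch of that conversion on its own (values are illustrative):

    import java.lang.{Iterable => JIterable}
    import scala.collection.JavaConversions.asJavaIterable

    // Wraps a Scala Iterable for Java-side iteration; unlike seqAsJavaList it does not
    // expose List operations such as get(i) or size().
    val scalaValues: Iterable[Int] = List(1, 2, 3)
    val javaValues: JIterable[Int] = asJavaIterable(scalaValues)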
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
index 6e8ec8e0c7..ae577b500c 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -17,7 +17,7 @@
package org.apache.spark.api.java
-import java.util.{Comparator, Iterator => JIterator, List => JList}
+import java.util.{Comparator, List => JList, Iterator => JIterator}
import java.lang.{Iterable => JIterable}
import scala.collection.JavaConversions._
@@ -204,7 +204,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
+ def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(fakeClassTag)))
@@ -214,7 +214,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
+ def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JIterable[T]] = {
implicit val ctagK: ClassTag[K] = fakeClassTag
implicit val ctagV: ClassTag[JList[T]] = fakeClassTag
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(fakeClassTag[K])))
diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
index 14386ff5b9..a92a84b534 100644
--- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -261,7 +261,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
*/
- def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
+ def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
@@ -270,14 +270,14 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def mergeCombiners(c1: ArrayBuffer[V], c2: ArrayBuffer[V]) = c1 ++ c2
val bufs = combineByKey[ArrayBuffer[V]](
createCombiner _, mergeValue _, mergeCombiners _, partitioner, mapSideCombine=false)
- bufs.asInstanceOf[RDD[(K, Seq[V])]]
+ bufs.mapValues(_.toIterable)
}
/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): RDD[(K, Seq[V])] = {
+ def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = {
groupByKey(new HashPartitioner(numPartitions))
}
@@ -298,7 +298,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
*/
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
- for (v <- vs.iterator; w <- ws.iterator) yield (v, w)
+ for (v <- vs; w <- ws) yield (v, w)
}
}
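join stays defined as cogroup followed by a per-key cross product; with Iterable values the for-comprehension ranges over them directly instead of calling .iterator. A small equivalence sketch, assuming an existing SparkContext sc:

    import org.apache.spark.SparkContext._
    val rdd1 = sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
    val rdd2 = sc.parallelize(Seq(("a", "x"), ("c", "y")))
    val viaJoin = rdd1.join(rdd2).collect().toSet
    val viaCogroup = rdd1.cogroup(rdd2).flatMapValues { case (vs, ws) =>
      for (v <- vs; w <- ws) yield (v, w)   // the same per-key cross product join performs
    }.collect().toSet
    // Both sets contain ("a", (1, "x")) and ("a", (2, "x")); keys "b" and "c" drop out.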
@@ -311,9 +311,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
def leftOuterJoin[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, Option[W]))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (ws.isEmpty) {
- vs.iterator.map(v => (v, None))
+ vs.map(v => (v, None))
} else {
- for (v <- vs.iterator; w <- ws.iterator) yield (v, Some(w))
+ for (v <- vs; w <- ws) yield (v, Some(w))
}
}
}
@@ -328,9 +328,9 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
: RDD[(K, (Option[V], W))] = {
this.cogroup(other, partitioner).flatMapValues { case (vs, ws) =>
if (vs.isEmpty) {
- ws.iterator.map(w => (None, w))
+ ws.map(w => (None, w))
} else {
- for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), w)
+ for (v <- vs; w <- ws) yield (Some(v), w)
}
}
}
@@ -358,7 +358,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
*/
- def groupByKey(): RDD[(K, Seq[V])] = {
+ def groupByKey(): RDD[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(self))
}
@@ -453,7 +453,8 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
+ : RDD[(K, (Iterable[V], Iterable[W]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
@@ -468,13 +469,15 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], partitioner: Partitioner)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
if (partitioner.isInstanceOf[HashPartitioner] && getKeyClass().isArray) {
throw new SparkException("Default partitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other1, other2), partitioner)
cg.mapValues { case Seq(vs, w1s, w2s) =>
- (vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
+ (vs.asInstanceOf[Seq[V]],
+ w1s.asInstanceOf[Seq[W1]],
+ w2s.asInstanceOf[Seq[W2]])
}
}
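The three-way variant builds one CoGroupedRDD over all three inputs and casts each grouped slot back to its element type, so callers now see a triple of Iterables. An illustrative use, assuming sc:

    import org.apache.spark.SparkContext._
    val a = sc.parallelize(Seq((1, "a1"), (2, "a2")))
    val b = sc.parallelize(Seq((1, 10)))
    val c = sc.parallelize(Seq((2, 'z')))
    // Result type after this patch: RDD[(Int, (Iterable[String], Iterable[Int], Iterable[Char]))]
    val cg = a.cogroup(b, c)
    cg.mapValues { case (as, bs, cs) => (as.toList, bs.toList, cs.toList) }.collect()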
@@ -482,7 +485,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
@@ -491,7 +494,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
@@ -499,7 +502,7 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the
* list of values for that key in `this` as well as `other`.
*/
- def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W](other: RDD[(K, W)], numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, new HashPartitioner(numPartitions))
}
@@ -508,18 +511,18 @@ class PairRDDFunctions[K: ClassTag, V: ClassTag](self: RDD[(K, V)])
* tuple with the list of values for that key in `this`, `other1` and `other2`.
*/
def cogroup[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)], numPartitions: Int)
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, new HashPartitioner(numPartitions))
}
/** Alias for cogroup. */
- def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Seq[V], Seq[W]))] = {
+ def groupWith[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(self, other))
}
/** Alias for cogroup. */
def groupWith[W1, W2](other1: RDD[(K, W1)], other2: RDD[(K, W2)])
- : RDD[(K, (Seq[V], Seq[W1], Seq[W2]))] = {
+ : RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))] = {
cogroup(other1, other2, defaultPartitioner(self, other1, other2))
}
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index bf3c57ad41..74fa2a4fcd 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -438,20 +438,20 @@ abstract class RDD[T: ClassTag](
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K): RDD[(K, Iterable[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
- def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
+ def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Iterable[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
* Return an RDD of grouped items.
*/
- def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
+ def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Iterable[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
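groupBy remains a thin wrapper that keys each element with the cleaned function and delegates to groupByKey, so it inherits the Iterable result type. A quick illustrative use, again assuming sc:

    import org.apache.spark.SparkContext._
    val nums = sc.parallelize(1 to 10)
    val byParity: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = nums.groupBy(_ % 2)
    byParity.mapValues(_.size).collect()   // Iterable still supports size; it is just no longer a Seq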
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 762405be2a..ab2fdac553 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -18,10 +18,12 @@
package org.apache.spark;
import java.io.*;
+import java.lang.StringBuilder;
import java.util.*;
import scala.Tuple2;
+import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.base.Optional;
import com.google.common.base.Charsets;
@@ -197,7 +199,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("Oranges", "Citrus")
));
Assert.assertEquals(2, categories.lookup("Oranges").size());
- Assert.assertEquals(2, categories.groupByKey().lookup("Oranges").get(0).size());
+ Assert.assertEquals(2, Iterables.size(categories.groupByKey().lookup("Oranges").get(0)));
}
@Test
@@ -209,15 +211,15 @@ public class JavaAPISuite implements Serializable {
return x % 2 == 0;
}
};
- JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+ JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
oddsAndEvens = rdd.groupBy(isOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}
@SuppressWarnings("unchecked")
@@ -232,9 +234,9 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("Oranges", 2),
new Tuple2<String, Integer>("Apples", 3)
));
- JavaPairRDD<String, Tuple2<List<String>, List<Integer>>> cogrouped = categories.cogroup(prices);
- Assert.assertEquals("[Fruit, Citrus]", cogrouped.lookup("Oranges").get(0)._1().toString());
- Assert.assertEquals("[2]", cogrouped.lookup("Oranges").get(0)._2().toString());
+ JavaPairRDD<String, Tuple2<Iterable<String>, Iterable<Integer>>> cogrouped = categories.cogroup(prices);
+ Assert.assertEquals("[Fruit, Citrus]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._1()));
+ Assert.assertEquals("[2]", Iterables.toString(cogrouped.lookup("Oranges").get(0)._2()));
cogrouped.collect();
}
diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala
index f3fb64d87a..12dbebcb28 100644
--- a/core/src/test/scala/org/apache/spark/FailureSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala
@@ -72,7 +72,7 @@ class FailureSuite extends FunSuite with LocalSparkContext {
throw new Exception("Intentional task failure")
}
}
- (k, v(0) * v(0))
+ (k, v.head * v.head)
}.collect()
FailureSuiteState.synchronized {
assert(FailureSuiteState.tasksRun === 4)
@@ -137,5 +137,3 @@ class FailureSuite extends FunSuite with LocalSparkContext {
// TODO: Need to add tests with shuffle fetch failures.
}
-
-
diff --git a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
index 627e9b5cd9..867b28cc0d 100644
--- a/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PipedRDDSuite.scala
@@ -85,7 +85,7 @@ class PipedRDDSuite extends FunSuite with SharedSparkContext {
(f: String => Unit) => {
bl.value.map(f(_)); f("\u0001")
},
- (i: Tuple2[String, Seq[String]], f: String => Unit) => {
+ (i: Tuple2[String, Iterable[String]], f: String => Unit) => {
for (e <- i._2) {
f(e + "_")
}
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index f9e994b13d..8f3e6bd21b 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -225,11 +225,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
- assert(joined.toSet === Set(
- (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
- (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
- (3, (ArrayBuffer(1), ArrayBuffer())),
- (4, (ArrayBuffer(), ArrayBuffer('w')))
+ val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
+ assert(joinedSet === Set(
+ (1, (List(1, 2), List('x'))),
+ (2, (List(1), List('y', 'z'))),
+ (3, (List(1), List())),
+ (4, (List(), List('w')))
))
}
@@ -447,4 +448,3 @@ class ConfigTestFormat() extends FakeFormat() with Configurable {
super.getRecordWriter(p1)
}
}
-
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fce1184d46..cdebefb675 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -174,9 +174,9 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
assert(result1.toSet == Set[(Int, Int)]((0, 5), (1, 5)))
// groupByKey
- val result2 = rdd.groupByKey().collect()
+ val result2 = rdd.groupByKey().collect().map(x => (x._1, x._2.toList)).toSet
assert(result2.toSet == Set[(Int, Seq[Int])]
- ((0, ArrayBuffer[Int](1, 1, 1, 1, 1)), (1, ArrayBuffer[Int](1, 1, 1, 1, 1))))
+ ((0, List[Int](1, 1, 1, 1, 1)), (1, List[Int](1, 1, 1, 1, 1))))
}
test("simple cogroup") {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index eb70fb5475..8513ba07e7 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -17,7 +17,10 @@
package org.apache.spark.examples;
+
import scala.Tuple2;
+
+import com.google.common.collect.Iterables;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -26,8 +29,9 @@ import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
-import java.util.List;
import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
import java.util.regex.Pattern;
/**
@@ -66,7 +70,7 @@ public final class JavaPageRank {
JavaRDD<String> lines = ctx.textFile(args[1], 1);
// Loads all URLs from input file and initialize their neighbors.
- JavaPairRDD<String, List<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
+ JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
String[] parts = SPACES.split(s);
@@ -75,9 +79,9 @@ public final class JavaPageRank {
}).distinct().groupByKey().cache();
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
- JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
+ JavaPairRDD<String, Double> ranks = links.mapValues(new Function<Iterable<String>, Double>() {
@Override
- public Double call(List<String> rs) {
+ public Double call(Iterable<String> rs) {
return 1.0;
}
});
@@ -86,12 +90,13 @@ public final class JavaPageRank {
for (int current = 0; current < Integer.parseInt(args[2]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
- .flatMapToPair(new PairFlatMapFunction<Tuple2<List<String>, Double>, String, Double>() {
+ .flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
@Override
- public Iterable<Tuple2<String, Double>> call(Tuple2<List<String>, Double> s) {
+ public Iterable<Tuple2<String, Double>> call(Tuple2<Iterable<String>, Double> s) {
+ int urlCount = Iterables.size(s._1);
List<Tuple2<String, Double>> results = new ArrayList<Tuple2<String, Double>>();
- for (String n : s._1()) {
- results.add(new Tuple2<String, Double>(n, s._2() / s._1().size()));
+ for (String n : s._1) {
+ results.add(new Tuple2<String, Double>(n, s._2() / urlCount));
}
return results;
}
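Because the neighbour collection is now an Iterable, its size is no longer a ready-made List property, so the Java example counts it once with Guava's Iterables.size before emitting contributions. The same step written in Scala, as a hedged sketch with assumed shapes for links and ranks:

    import org.apache.spark.SparkContext._
    // links: RDD[(String, Iterable[String])], ranks: RDD[(String, Double)]  (assumed shapes)
    val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
      val urlCount = urls.size             // count once, then reuse for every emitted contribution
      urls.map(dest => (dest, rank / urlCount))
    }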
diff --git a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
index 27afa6b642..7aac6a1359 100644
--- a/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/bagel/WikipediaPageRankStandalone.scala
@@ -115,12 +115,16 @@ object WikipediaPageRankStandalone {
var ranks = links.mapValues { edges => defaultRank }
for (i <- 1 to numIterations) {
val contribs = links.groupWith(ranks).flatMap {
- case (id, (linksWrapper, rankWrapper)) =>
- if (linksWrapper.length > 0) {
- if (rankWrapper.length > 0) {
- linksWrapper(0).map(dest => (dest, rankWrapper(0) / linksWrapper(0).size))
+ case (id, (linksWrapperIterable, rankWrapperIterable)) =>
+ val linksWrapper = linksWrapperIterable.iterator
+ val rankWrapper = rankWrapperIterable.iterator
+ if (linksWrapper.hasNext) {
+ val linksWrapperHead = linksWrapper.next
+ if (rankWrapper.hasNext) {
+ val rankWrapperHead = rankWrapper.next
+ linksWrapperHead.map(dest => (dest, rankWrapperHead / linksWrapperHead.size))
} else {
- linksWrapper(0).map(dest => (dest, defaultRank / linksWrapper(0).size))
+ linksWrapperHead.map(dest => (dest, defaultRank / linksWrapperHead.size))
}
} else {
Array[(String, Double)]()
diff --git a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
index f67251217e..7eb8b45fc3 100644
--- a/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
+++ b/extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java
@@ -23,6 +23,7 @@ import java.util.*;
import scala.Tuple2;
+import com.google.common.collect.Iterables;
import com.google.common.base.Optional;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
@@ -85,15 +86,15 @@ public class Java8APISuite implements Serializable {
public void groupBy() {
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
- JavaPairRDD<Boolean, List<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
+ JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
oddsAndEvens = rdd.groupBy(isOdd, 1);
Assert.assertEquals(2, oddsAndEvens.count());
- Assert.assertEquals(2, oddsAndEvens.lookup(true).get(0).size()); // Evens
- Assert.assertEquals(5, oddsAndEvens.lookup(false).get(0).size()); // Odds
+ Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0))); // Evens
+ Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
}
@Test
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
index 3e7cc648d1..0d97b7d92f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/SVD.scala
@@ -69,11 +69,11 @@ class SVD {
/**
* Compute SVD using the current set parameters
- * Returns (U, S, V) such that A = USV^T
+ * Returns (U, S, V) such that A = USV^T
* U is a row-by-row dense matrix
* S is a simple double array of singular values
* V is a 2d array matrix
- * See [[denseSVD]] for more documentation
+ * See [[denseSVD]] for more documentation
*/
def compute(matrix: RDD[Array[Double]]):
(RDD[Array[Double]], Array[Double], Array[Array[Double]]) = {
@@ -393,5 +393,3 @@ object SVD {
System.exit(0)
}
}
-
-
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 0cc9f48769..3124fac326 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -421,12 +421,12 @@ class ALS private (
* Compute the new feature vectors for a block of the users matrix given the list of factors
* it received from each product and its InLinkBlock.
*/
- private def updateBlock(messages: Seq[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
+ private def updateBlock(messages: Iterable[(Int, Array[Array[Double]])], inLinkBlock: InLinkBlock,
rank: Int, lambda: Double, alpha: Double, YtY: Option[Broadcast[DoubleMatrix]])
: Array[Array[Double]] =
{
// Sort the incoming block factor messages by block ID and make them an array
- val blockFactors = messages.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
+ val blockFactors = messages.toSeq.sortBy(_._1).map(_._2).toArray // Array[Array[Double]]
val numBlocks = blockFactors.length
val numUsers = inLinkBlock.elementIds.length
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
index afe081295b..87aac34757 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LAUtils.scala
@@ -38,8 +38,10 @@ object LAUtils {
case (i, cols) =>
val rowArray = Array.ofDim[Double](n)
var j = 0
- while (j < cols.size) {
- rowArray(cols(j)._1) = cols(j)._2
+ val colsItr = cols.iterator
+ while (colsItr.hasNext) {
+ val element = colsItr.next
+ rowArray(element._1) = element._2
j += 1
}
MatrixRow(i, rowArray)
diff --git a/python/pyspark/join.py b/python/pyspark/join.py
index 5f4294fb1b..6f94d26ef8 100644
--- a/python/pyspark/join.py
+++ b/python/pyspark/join.py
@@ -31,11 +31,12 @@ THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
+from pyspark.resultiterable import ResultIterable
def _do_python_join(rdd, other, numPartitions, dispatch):
vs = rdd.map(lambda (k, v): (k, (1, v)))
ws = other.map(lambda (k, v): (k, (2, v)))
- return vs.union(ws).groupByKey(numPartitions).flatMapValues(dispatch)
+ return vs.union(ws).groupByKey(numPartitions).flatMapValues(lambda x : dispatch(x.__iter__()))
def python_join(rdd, other, numPartitions):
@@ -88,5 +89,5 @@ def python_cogroup(rdd, other, numPartitions):
vbuf.append(v)
elif n == 2:
wbuf.append(v)
- return (vbuf, wbuf)
+ return (ResultIterable(vbuf), ResultIterable(wbuf))
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index fb27863e07..91fc7e637e 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -38,6 +38,7 @@ from pyspark.join import python_join, python_left_outer_join, \
from pyspark.statcounter import StatCounter
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
+from pyspark.resultiterable import ResultIterable
from py4j.java_collections import ListConverter, MapConverter
@@ -1118,7 +1119,7 @@ class RDD(object):
Hash-partitions the resulting RDD with into numPartitions partitions.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(x.groupByKey().collect())
+ >>> map((lambda (x,y): (x, list(y))), sorted(x.groupByKey().collect()))
[('a', [1, 1]), ('b', [1])]
"""
@@ -1133,7 +1134,7 @@ class RDD(object):
return a + b
return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
- numPartitions)
+ numPartitions).mapValues(lambda x: ResultIterable(x))
# TODO: add tests
def flatMapValues(self, f):
@@ -1180,7 +1181,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup(self, other, numPartitions)
@@ -1217,7 +1218,7 @@ class RDD(object):
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
- >>> sorted(x.cogroup(y).collect())
+ >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
[(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
"""
return self.map(lambda x: (f(x), x))
@@ -1317,7 +1318,6 @@ class RDD(object):
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
-
class PipelinedRDD(RDD):
"""
Pipelined maps:
diff --git a/python/pyspark/resultiterable.py b/python/pyspark/resultiterable.py
new file mode 100644
index 0000000000..7f418f8d2e
--- /dev/null
+++ b/python/pyspark/resultiterable.py
@@ -0,0 +1,33 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__all__ = ["ResultIterable"]
+
+import collections
+
+class ResultIterable(collections.Iterable):
+ """
+ A special result iterable. This is used because the standard iterator can not be pickled
+ """
+ def __init__(self, data):
+ self.data = data
+ self.index = 0
+ self.maxindex = len(data)
+ def __iter__(self):
+ return iter(self.data)
+ def __len__(self):
+ return len(self.data)
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
index ac451d1913..2ac943d7bf 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaPairDStream.scala
@@ -17,7 +17,7 @@
package org.apache.spark.streaming.api.java
-import java.lang.{Long => JLong}
+import java.lang.{Long => JLong, Iterable => JIterable}
import java.util.{List => JList}
import scala.collection.JavaConversions._
@@ -115,15 +115,15 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey().mapValues(seqAsJavaList _)
+ def groupByKey(): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey().mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
+ def groupByKey(numPartitions: Int): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey(numPartitions).mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `groupByKey` on each RDD of `this` DStream.
@@ -131,8 +131,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* single sequence to generate the RDDs of the new DStream. org.apache.spark.Partitioner
* is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
- dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
+ def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JIterable[V]] =
+ dstream.groupByKey(partitioner).mapValues(asJavaIterable _)
/**
* Return a new DStream by applying `reduceByKey` to each RDD. The values for each key are
@@ -196,8 +196,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
+ def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JIterable[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration).mapValues(asJavaIterable _)
}
/**
@@ -211,8 +211,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
- : JavaPairDStream[K, JList[V]] = {
- dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
+ : JavaPairDStream[K, JIterable[V]] = {
+ dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(asJavaIterable _)
}
/**
@@ -227,9 +227,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
- :JavaPairDStream[K, JList[V]] = {
+ :JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
- .mapValues(seqAsJavaList _)
+ .mapValues(asJavaIterable _)
}
/**
@@ -247,9 +247,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ):JavaPairDStream[K, JList[V]] = {
+ ):JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
- .mapValues(seqAsJavaList _)
+ .mapValues(asJavaIterable _)
}
/**
@@ -518,9 +518,9 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
+ def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
- dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ dstream.cogroup(other.dstream).mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
@@ -530,10 +530,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def cogroup[W](
other: JavaPairDStream[K, W],
numPartitions: Int
- ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, numPartitions)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
@@ -543,10 +543,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
def cogroup[W](
other: JavaPairDStream[K, W],
partitioner: Partitioner
- ): JavaPairDStream[K, (JList[V], JList[W])] = {
+ ): JavaPairDStream[K, (JIterable[V], JIterable[W])] = {
implicit val cm: ClassTag[W] = fakeClassTag
dstream.cogroup(other.dstream, partitioner)
- .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
+ .mapValues(t => (asJavaIterable(t._1), asJavaIterable((t._2))))
}
/**
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
index 2473496949..354bc132dc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala
@@ -51,7 +51,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
- def groupByKey(): DStream[(K, Seq[V])] = {
+ def groupByKey(): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner())
}
@@ -59,7 +59,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
- def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
+ def groupByKey(numPartitions: Int): DStream[(K, Iterable[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
@@ -67,12 +67,12 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Return a new DStream by applying `groupByKey` on each RDD. The supplied
* org.apache.spark.Partitioner is used to control the partitioning of each RDD.
*/
- def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
+ def groupByKey(partitioner: Partitioner): DStream[(K, Iterable[V])] = {
val createCombiner = (v: V) => ArrayBuffer[V](v)
val mergeValue = (c: ArrayBuffer[V], v: V) => (c += v)
val mergeCombiner = (c1: ArrayBuffer[V], c2: ArrayBuffer[V]) => (c1 ++ c2)
combineByKey(createCombiner, mergeValue, mergeCombiner, partitioner)
- .asInstanceOf[DStream[(K, Seq[V])]]
+ .asInstanceOf[DStream[(K, Iterable[V])]]
}
/**
@@ -126,7 +126,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Seq[V])] = {
+ def groupByKeyAndWindow(windowDuration: Duration): DStream[(K, Iterable[V])] = {
groupByKeyAndWindow(windowDuration, self.slideDuration, defaultPartitioner())
}
@@ -140,7 +140,8 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
- def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration): DStream[(K, Seq[V])] =
+ def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
+ : DStream[(K, Iterable[V])] =
{
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner())
}
@@ -161,7 +162,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
- ): DStream[(K, Seq[V])] = {
+ ): DStream[(K, Iterable[V])] = {
groupByKeyAndWindow(windowDuration, slideDuration, defaultPartitioner(numPartitions))
}
@@ -180,14 +181,14 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
- ): DStream[(K, Seq[V])] = {
- val createCombiner = (v: Seq[V]) => new ArrayBuffer[V] ++= v
- val mergeValue = (buf: ArrayBuffer[V], v: Seq[V]) => buf ++= v
+ ): DStream[(K, Iterable[V])] = {
+ val createCombiner = (v: Iterable[V]) => new ArrayBuffer[V] ++= v
+ val mergeValue = (buf: ArrayBuffer[V], v: Iterable[V]) => buf ++= v
val mergeCombiner = (buf1: ArrayBuffer[V], buf2: ArrayBuffer[V]) => buf1 ++= buf2
self.groupByKey(partitioner)
.window(windowDuration, slideDuration)
.combineByKey[ArrayBuffer[V]](createCombiner, mergeValue, mergeCombiner, partitioner)
- .asInstanceOf[DStream[(K, Seq[V])]]
+ .asInstanceOf[DStream[(K, Iterable[V])]]
}
/**
@@ -438,7 +439,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with Spark's default number
* of partitions.
*/
- def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
+ def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner())
}
@@ -447,7 +448,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def cogroup[W: ClassTag](other: DStream[(K, W)], numPartitions: Int)
- : DStream[(K, (Seq[V], Seq[W]))] = {
+ : DStream[(K, (Iterable[V], Iterable[W]))] = {
cogroup(other, defaultPartitioner(numPartitions))
}
@@ -458,7 +459,7 @@ class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
- ): DStream[(K, (Seq[V], Seq[W]))] = {
+ ): DStream[(K, (Iterable[V], Iterable[W]))] = {
self.transformWith(
other,
(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.cogroup(rdd2, partitioner)
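On the streaming side the same shift applies: groupByKey, groupByKeyAndWindow and cogroup on DStreams now produce Iterable values, and cogroup is routed through transformWith to the RDD implementation above. Callers that need Seq semantics per batch materialize explicitly, as the tests below do. An illustrative sketch, assuming an input DStream[String] named words:

    import org.apache.spark.streaming.StreamingContext._   // pair DStream operations
    val pairs = words.map(w => (w, 1))
    val grouped: org.apache.spark.streaming.dstream.DStream[(String, Iterable[Int])] = pairs.groupByKey()
    val asSeqs = grouped.mapValues(_.toSeq)   // per-batch materialization when ordered access is needed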
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
index 5f7d3ba26c..7e22268767 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/StateDStream.scala
@@ -56,9 +56,14 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the cogrouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator: Iterator[(K, (Seq[V], Seq[S]))]) => {
+ val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map(t => {
- (t._1, t._2._1, t._2._2.headOption)
+ val itr = t._2._2.iterator
+ val headOption = itr.hasNext match {
+ case true => Some(itr.next())
+ case false => None
+ }
+ (t._1, t._2._1.toSeq, headOption)
})
updateFuncLocal(i)
}
@@ -90,8 +95,8 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
// first map the grouped tuple to tuples of required type,
// and then apply the update function
val updateFuncLocal = updateFunc
- val finalFunc = (iterator: Iterator[(K, Seq[V])]) => {
- updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2, None)))
+ val finalFunc = (iterator: Iterator[(K, Iterable[V])]) => {
+ updateFuncLocal(iterator.map(tuple => (tuple._1, tuple._2.toSeq, None)))
}
val groupedRDD = parentRDD.groupByKey(partitioner)
diff --git a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
index e93bf18b6d..13fa64894b 100644
--- a/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
+++ b/streaming/src/test/java/org/apache/spark/streaming/JavaAPISuite.java
@@ -23,6 +23,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.io.*;
import java.util.*;
+import java.lang.Iterable;
import com.google.common.base.Optional;
import com.google.common.collect.Lists;
@@ -45,6 +46,18 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite extends LocalJavaStreamingContext implements Serializable {
+ public void equalIterator(Iterator<?> a, Iterator<?> b) {
+ while (a.hasNext() && b.hasNext()) {
+ Assert.assertEquals(a.next(), b.next());
+ }
+ Assert.assertEquals(a.hasNext(), b.hasNext());
+ }
+
+ public void equalIterable(Iterable<?> a, Iterable<?> b) {
+ equalIterator(a.iterator(), b.iterator());
+ }
+
+
@SuppressWarnings("unchecked")
@Test
public void testCount() {
@@ -1016,11 +1029,24 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
+ JavaPairDStream<String, Iterable<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
+ List<List<Tuple2<String, Iterable<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected.size(), result.size());
+ Iterator<List<Tuple2<String, Iterable<String>>>> resultItr = result.iterator();
+ Iterator<List<Tuple2<String, List<String>>>> expectedItr = expected.iterator();
+ while (resultItr.hasNext() && expectedItr.hasNext()) {
+ Iterator<Tuple2<String, Iterable<String>>> resultElements = resultItr.next().iterator();
+ Iterator<Tuple2<String, List<String>>> expectedElements = expectedItr.next().iterator();
+ while (resultElements.hasNext() && expectedElements.hasNext()) {
+ Tuple2<String, Iterable<String>> resultElement = resultElements.next();
+ Tuple2<String, List<String>> expectedElement = expectedElements.next();
+ Assert.assertEquals(expectedElement._1(), resultElement._1());
+ equalIterable(expectedElement._2(), resultElement._2());
+ }
+ Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+ }
}
@SuppressWarnings("unchecked")
@@ -1128,7 +1154,7 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
- JavaPairDStream<String, List<Integer>> groupWindowed =
+ JavaPairDStream<String, Iterable<Integer>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<Integer>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
@@ -1471,11 +1497,25 @@ public class JavaAPISuite extends LocalJavaStreamingContext implements Serializa
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
- JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
+ JavaPairDStream<String, Tuple2<Iterable<String>, Iterable<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
- List<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
-
- Assert.assertEquals(expected, result);
+ List<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
+
+ Assert.assertEquals(expected.size(), result.size());
+ Iterator<List<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>>> resultItr = result.iterator();
+ Iterator<List<Tuple2<String, Tuple2<List<String>, List<String>>>>> expectedItr = expected.iterator();
+ while (resultItr.hasNext() && expectedItr.hasNext()) {
+ Iterator<Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>>> resultElements = resultItr.next().iterator();
+ Iterator<Tuple2<String, Tuple2<List<String>, List<String>>>> expectedElements = expectedItr.next().iterator();
+ while (resultElements.hasNext() && expectedElements.hasNext()) {
+ Tuple2<String, Tuple2<Iterable<String>, Iterable<String>>> resultElement = resultElements.next();
+ Tuple2<String, Tuple2<List<String>, List<String>>> expectedElement = expectedElements.next();
+ Assert.assertEquals(expectedElement._1(), resultElement._1());
+ equalIterable(expectedElement._2()._1(), resultElement._2()._1());
+ equalIterable(expectedElement._2()._2(), resultElement._2()._2());
+ }
+ Assert.assertEquals(resultElements.hasNext(), expectedElements.hasNext());
+ }
}
@SuppressWarnings("unchecked")
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index bb73dbf29b..8aec27e394 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -117,7 +117,7 @@ class BasicOperationsSuite extends TestSuiteBase {
test("groupByKey") {
testOperation(
Seq( Seq("a", "a", "b"), Seq("", ""), Seq() ),
- (s: DStream[String]) => s.map(x => (x, 1)).groupByKey(),
+ (s: DStream[String]) => s.map(x => (x, 1)).groupByKey().mapValues(_.toSeq),
Seq( Seq(("a", Seq(1, 1)), ("b", Seq(1))), Seq(("", Seq(1, 1))), Seq() ),
true
)
@@ -251,7 +251,7 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
- s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x")))
+ s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}