aboutsummaryrefslogtreecommitdiff
path: root/bagel/src
diff options
context:
space:
mode:
Diffstat (limited to 'bagel/src')
-rw-r--r--bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala20
1 files changed, 12 insertions, 8 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
index 70c7474a93..70a99b33d7 100644
--- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
+++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala
@@ -220,20 +220,23 @@ object Bagel extends Logging {
*/
private def comp[K: Manifest, V <: Vertex, M <: Message[K], C](
sc: SparkContext,
- grouped: RDD[(K, (Seq[C], Seq[V]))],
+ grouped: RDD[(K, (Iterable[C], Iterable[V]))],
compute: (V, Option[C]) => (V, Array[M]),
storageLevel: StorageLevel
): (RDD[(K, (V, Array[M]))], Int, Int) = {
var numMsgs = sc.accumulator(0)
var numActiveVerts = sc.accumulator(0)
- val processed = grouped.flatMapValues {
- case (_, vs) if vs.size == 0 => None
- case (c, vs) =>
+ val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator))
+ .flatMapValues {
+ case (_, vs) if !vs.hasNext => None
+ case (c, vs) => {
val (newVert, newMsgs) =
- compute(vs(0), c match {
- case Seq(comb) => Some(comb)
- case Seq() => None
- })
+ compute(vs.next,
+ c.hasNext match {
+ case true => Some(c.next)
+ case false => None
+ }
+ )
numMsgs += newMsgs.size
if (newVert.active) {
@@ -241,6 +244,7 @@ object Bagel extends Logging {
}
Some((newVert, newMsgs))
+ }
}.persist(storageLevel)
// Force evaluation of processed RDD for accurate performance measurements