diff options
Diffstat (limited to 'bagel/src')
-rw-r--r-- | bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala | 20 |
1 files changed, 12 insertions, 8 deletions
diff --git a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala index 70c7474a93..70a99b33d7 100644 --- a/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala +++ b/bagel/src/main/scala/org/apache/spark/bagel/Bagel.scala @@ -220,20 +220,23 @@ object Bagel extends Logging { */ private def comp[K: Manifest, V <: Vertex, M <: Message[K], C]( sc: SparkContext, - grouped: RDD[(K, (Seq[C], Seq[V]))], + grouped: RDD[(K, (Iterable[C], Iterable[V]))], compute: (V, Option[C]) => (V, Array[M]), storageLevel: StorageLevel ): (RDD[(K, (V, Array[M]))], Int, Int) = { var numMsgs = sc.accumulator(0) var numActiveVerts = sc.accumulator(0) - val processed = grouped.flatMapValues { - case (_, vs) if vs.size == 0 => None - case (c, vs) => + val processed = grouped.mapValues(x => (x._1.iterator, x._2.iterator)) + .flatMapValues { + case (_, vs) if !vs.hasNext => None + case (c, vs) => { val (newVert, newMsgs) = - compute(vs(0), c match { - case Seq(comb) => Some(comb) - case Seq() => None - }) + compute(vs.next, + c.hasNext match { + case true => Some(c.next) + case false => None + } + ) numMsgs += newMsgs.size if (newVert.active) { @@ -241,6 +244,7 @@ object Bagel extends Logging { } Some((newVert, newMsgs)) + } }.persist(storageLevel) // Force evaluation of processed RDD for accurate performance measurements |