From 19122af78746e501207c5345bafc6b229e0101d1 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Wed, 13 Apr 2011 17:19:41 -0700 Subject: Update ShortestPath to work with controllable partitioning --- bagel/src/main/scala/bagel/ShortestPath.scala | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) (limited to 'bagel') diff --git a/bagel/src/main/scala/bagel/ShortestPath.scala b/bagel/src/main/scala/bagel/ShortestPath.scala index 2af4dc5867..38f533728d 100644 --- a/bagel/src/main/scala/bagel/ShortestPath.scala +++ b/bagel/src/main/scala/bagel/ShortestPath.scala @@ -5,7 +5,6 @@ import spark.SparkContext._ import scala.math.min -/* object ShortestPath { def main(args: Array[String]) { if (args.length < 4) { @@ -26,7 +25,7 @@ object ShortestPath { .filter(!_.matches("^\\s*#.*")) .map(line => line.split("\t"))) - val vertices: RDD[(String, Either[SPVertex, SPMessage])] = + val vertices: RDD[(String, SPVertex)] = (lines.groupBy(line => line(0)) .map { case (vertexId, lines) => { @@ -35,18 +34,16 @@ object ShortestPath { new SPEdge(targetId, edgeValue.toInt) } - (vertexId, Left[SPVertex, SPMessage](new SPVertex(vertexId, Int.MaxValue, outEdges, true))) + (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true)) } }) - val messages: RDD[(String, Either[SPVertex, SPMessage])] = + val messages: RDD[(String, SPMessage)] = (lines.filter(_.length == 2) .map { case Array(vertexId, messageValue) => - (vertexId, Right[SPVertex, SPMessage](new SPMessage(vertexId, messageValue.toInt))) + (vertexId, new SPMessage(vertexId, messageValue.toInt)) }) - - val graph: RDD[(String, Either[SPVertex, SPMessage])] = vertices ++ messages System.err.println("Read "+vertices.count()+" vertices and "+ messages.count()+" messages.") @@ -55,7 +52,7 @@ object ShortestPath { def messageCombiner(minSoFar: Int, message: SPMessage): Int = min(minSoFar, message.value) - val result = Pregel.run(sc, graph, numSplits, messageCombiner, () => Int.MaxValue, min _) { + val result = Pregel.run(sc, vertices, messages, numSplits, messageCombiner, () => Int.MaxValue, min _) { (self: SPVertex, messageMinValue: Int, superstep: Int) => val newValue = min(self.value, messageMinValue) @@ -83,4 +80,3 @@ object ShortestPath { @serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex @serializable class SPEdge(val targetId: String, val value: Int) extends Edge @serializable class SPMessage(val targetId: String, val value: Int) extends Message -*/ -- cgit v1.2.3