aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--bagel/src/main/scala/bagel/ShortestPath.scala14
1 files changed, 5 insertions, 9 deletions
diff --git a/bagel/src/main/scala/bagel/ShortestPath.scala b/bagel/src/main/scala/bagel/ShortestPath.scala
index 2af4dc5867..38f533728d 100644
--- a/bagel/src/main/scala/bagel/ShortestPath.scala
+++ b/bagel/src/main/scala/bagel/ShortestPath.scala
@@ -5,7 +5,6 @@ import spark.SparkContext._
import scala.math.min
-/*
object ShortestPath {
def main(args: Array[String]) {
if (args.length < 4) {
@@ -26,7 +25,7 @@ object ShortestPath {
.filter(!_.matches("^\\s*#.*"))
.map(line => line.split("\t")))
- val vertices: RDD[(String, Either[SPVertex, SPMessage])] =
+ val vertices: RDD[(String, SPVertex)] =
(lines.groupBy(line => line(0))
.map {
case (vertexId, lines) => {
@@ -35,18 +34,16 @@ object ShortestPath {
new SPEdge(targetId, edgeValue.toInt)
}
- (vertexId, Left[SPVertex, SPMessage](new SPVertex(vertexId, Int.MaxValue, outEdges, true)))
+ (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
}
})
- val messages: RDD[(String, Either[SPVertex, SPMessage])] =
+ val messages: RDD[(String, SPMessage)] =
(lines.filter(_.length == 2)
.map {
case Array(vertexId, messageValue) =>
- (vertexId, Right[SPVertex, SPMessage](new SPMessage(vertexId, messageValue.toInt)))
+ (vertexId, new SPMessage(vertexId, messageValue.toInt))
})
-
- val graph: RDD[(String, Either[SPVertex, SPMessage])] = vertices ++ messages
System.err.println("Read "+vertices.count()+" vertices and "+
messages.count()+" messages.")
@@ -55,7 +52,7 @@ object ShortestPath {
def messageCombiner(minSoFar: Int, message: SPMessage): Int =
min(minSoFar, message.value)
- val result = Pregel.run(sc, graph, numSplits, messageCombiner, () => Int.MaxValue, min _) {
+ val result = Pregel.run(sc, vertices, messages, numSplits, messageCombiner, () => Int.MaxValue, min _) {
(self: SPVertex, messageMinValue: Int, superstep: Int) =>
val newValue = min(self.value, messageMinValue)
@@ -83,4 +80,3 @@ object ShortestPath {
@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
@serializable class SPMessage(val targetId: String, val value: Int) extends Message
-*/