aboutsummaryrefslogtreecommitdiff
path: root/bagel
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2011-04-13 17:19:41 -0700
committerAnkur Dave <ankurdave@gmail.com>2011-05-03 15:39:39 -0700
commit19122af78746e501207c5345bafc6b229e0101d1 (patch)
tree00fc7fe23123f0ed6fc20357dbb6229d2dd47496 /bagel
parent45ec9db8af6a7e4e148477730c3c842b0742544a (diff)
downloadspark-19122af78746e501207c5345bafc6b229e0101d1.tar.gz
spark-19122af78746e501207c5345bafc6b229e0101d1.tar.bz2
spark-19122af78746e501207c5345bafc6b229e0101d1.zip
Update ShortestPath to work with controllable partitioning
Diffstat (limited to 'bagel')
-rw-r--r--bagel/src/main/scala/bagel/ShortestPath.scala14
1 files changed, 5 insertions, 9 deletions
diff --git a/bagel/src/main/scala/bagel/ShortestPath.scala b/bagel/src/main/scala/bagel/ShortestPath.scala
index 2af4dc5867..38f533728d 100644
--- a/bagel/src/main/scala/bagel/ShortestPath.scala
+++ b/bagel/src/main/scala/bagel/ShortestPath.scala
@@ -5,7 +5,6 @@ import spark.SparkContext._
import scala.math.min
-/*
object ShortestPath {
def main(args: Array[String]) {
if (args.length < 4) {
@@ -26,7 +25,7 @@ object ShortestPath {
.filter(!_.matches("^\\s*#.*"))
.map(line => line.split("\t")))
- val vertices: RDD[(String, Either[SPVertex, SPMessage])] =
+ val vertices: RDD[(String, SPVertex)] =
(lines.groupBy(line => line(0))
.map {
case (vertexId, lines) => {
@@ -35,18 +34,16 @@ object ShortestPath {
new SPEdge(targetId, edgeValue.toInt)
}
- (vertexId, Left[SPVertex, SPMessage](new SPVertex(vertexId, Int.MaxValue, outEdges, true)))
+ (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
}
})
- val messages: RDD[(String, Either[SPVertex, SPMessage])] =
+ val messages: RDD[(String, SPMessage)] =
(lines.filter(_.length == 2)
.map {
case Array(vertexId, messageValue) =>
- (vertexId, Right[SPVertex, SPMessage](new SPMessage(vertexId, messageValue.toInt)))
+ (vertexId, new SPMessage(vertexId, messageValue.toInt))
})
-
- val graph: RDD[(String, Either[SPVertex, SPMessage])] = vertices ++ messages
System.err.println("Read "+vertices.count()+" vertices and "+
messages.count()+" messages.")
@@ -55,7 +52,7 @@ object ShortestPath {
def messageCombiner(minSoFar: Int, message: SPMessage): Int =
min(minSoFar, message.value)
- val result = Pregel.run(sc, graph, numSplits, messageCombiner, () => Int.MaxValue, min _) {
+ val result = Pregel.run(sc, vertices, messages, numSplits, messageCombiner, () => Int.MaxValue, min _) {
(self: SPVertex, messageMinValue: Int, superstep: Int) =>
val newValue = min(self.value, messageMinValue)
@@ -83,4 +80,3 @@ object ShortestPath {
@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
@serializable class SPMessage(val targetId: String, val value: Int) extends Message
-*/