aboutsummaryrefslogtreecommitdiff
path: root/bagel/src
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2011-10-09 16:18:33 -0700
committerAnkur Dave <ankurdave@gmail.com>2011-10-09 16:19:34 -0700
commit6d707f6b63e875f1b88210da2cf486f9d33f83c0 (patch)
tree906339d969573dd3e5bb559135734fd776aaf9d4 /bagel/src
parent0028caf3a4727623f70e23cd2f611f9797d0a3d3 (diff)
downloadspark-6d707f6b63e875f1b88210da2cf486f9d33f83c0.tar.gz
spark-6d707f6b63e875f1b88210da2cf486f9d33f83c0.tar.bz2
spark-6d707f6b63e875f1b88210da2cf486f9d33f83c0.zip
Remove ShortestPath for now
Diffstat (limited to 'bagel/src')
-rw-r--r--bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala95
1 files changed, 0 insertions, 95 deletions
diff --git a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
deleted file mode 100644
index 691fc55b78..0000000000
--- a/bagel/src/main/scala/spark/bagel/examples/ShortestPath.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-package spark.bagel.examples
-
-import spark._
-import spark.SparkContext._
-
-import scala.math.min
-
-import spark.bagel._
-import spark.bagel.Bagel._
-
-object ShortestPath {
- def main(args: Array[String]) {
- if (args.length < 4) {
- System.err.println("Usage: ShortestPath <graphFile> <startVertex> " +
- "<numSplits> <host>")
- System.exit(-1)
- }
-
- val graphFile = args(0)
- val startVertex = args(1)
- val numSplits = args(2).toInt
- val host = args(3)
- val sc = new SparkContext(host, "ShortestPath")
-
- // Parse the graph data from a file into two RDDs, vertices and messages
- val lines =
- (sc.textFile(graphFile)
- .filter(!_.matches("^\\s*#.*"))
- .map(line => line.split("\t")))
-
- val vertices: RDD[(String, SPVertex)] =
- (lines.groupBy(line => line(0))
- .map {
- case (vertexId, lines) => {
- val outEdges = lines.collect {
- case Array(_, targetId, edgeValue) =>
- new SPEdge(targetId, edgeValue.toInt)
- }
-
- (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
- }
- })
-
- val messages: RDD[(String, SPMessage)] =
- (lines.filter(_.length == 2)
- .map {
- case Array(vertexId, messageValue) =>
- (vertexId, new SPMessage(vertexId, messageValue.toInt))
- })
-
- System.err.println("Read "+vertices.count()+" vertices and "+
- messages.count()+" messages.")
-
- // Do the computation
- val compute = addAggregatorArg {
- (self: SPVertex, messageMinValue: Option[Int], superstep: Int) =>
- val newValue = messageMinValue match {
- case Some(minVal) => min(self.value, minVal)
- case None => self.value
- }
-
- val outbox =
- if (newValue != self.value)
- self.outEdges.map(edge =>
- new SPMessage(edge.targetId, newValue + edge.value))
- else
- List()
-
- (new SPVertex(self.id, newValue, self.outEdges, false), outbox)
- }
- val result = Bagel.run(sc, vertices, messages)(combiner = MinCombiner, numSplits = numSplits)(compute)
-
- // Print the result
- System.err.println("Shortest path from "+startVertex+" to all vertices:")
- val shortest = result.map(vertex =>
- "%s\t%s\n".format(vertex.id, vertex.value match {
- case x if x == Int.MaxValue => "inf"
- case x => x
- })).collect.mkString
- println(shortest)
- }
-}
-
-object MinCombiner extends Combiner[SPMessage, Int] with Serializable {
- def createCombiner(msg: SPMessage): Int =
- msg.value
- def mergeMsg(combiner: Int, msg: SPMessage): Int =
- min(combiner, msg.value)
- def mergeCombiners(a: Int, b: Int): Int =
- min(a, b)
-}
-
-class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable
-class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable
-class SPMessage(val targetId: String, val value: Int) extends Message with Serializable