From c1104058c6201d5a2b59dff1babcb71523f156fe Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Tue, 3 May 2011 18:53:58 -0700
Subject: Move shortest path and PageRank to bagel.examples

---
 bagel/src/main/scala/bagel/ShortestPath.scala      |  94 ------------
 bagel/src/main/scala/bagel/WikipediaPageRank.scala | 155 --------------------
 .../main/scala/bagel/examples/ShortestPath.scala   |  96 +++++++++++++
 .../scala/bagel/examples/WikipediaPageRank.scala   | 158 +++++++++++++++++++++
 4 files changed, 254 insertions(+), 249 deletions(-)
 delete mode 100644 bagel/src/main/scala/bagel/ShortestPath.scala
 delete mode 100644 bagel/src/main/scala/bagel/WikipediaPageRank.scala
 create mode 100644 bagel/src/main/scala/bagel/examples/ShortestPath.scala
 create mode 100644 bagel/src/main/scala/bagel/examples/WikipediaPageRank.scala

diff --git a/bagel/src/main/scala/bagel/ShortestPath.scala b/bagel/src/main/scala/bagel/ShortestPath.scala
deleted file mode 100644
index 8f4a881850..0000000000
--- a/bagel/src/main/scala/bagel/ShortestPath.scala
+++ /dev/null
@@ -1,94 +0,0 @@
-package bagel
-
-import spark._
-import spark.SparkContext._
-
-import scala.math.min
-
-import bagel.Pregel._
-
-object ShortestPath {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: ShortestPath <graphFile> <startVertex> " +
-                         "<numSplits> <host>")
-      System.exit(-1)
-    }
-
-    val graphFile = args(0)
-    val startVertex = args(1)
-    val numSplits = args(2).toInt
-    val host = args(3)
-    val sc = new SparkContext(host, "ShortestPath")
-
-    // Parse the graph data from a file into two RDDs, vertices and messages
-    val lines =
-      (sc.textFile(graphFile)
-       .filter(!_.matches("^\\s*#.*"))
-       .map(line => line.split("\t")))
-
-    val vertices: RDD[(String, SPVertex)] =
-      (lines.groupBy(line => line(0))
-       .map {
-         case (vertexId, lines) => {
-           val outEdges = lines.collect {
-             case Array(_, targetId, edgeValue) =>
-               new SPEdge(targetId, edgeValue.toInt)
-           }
-
-           (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
-         }
-       })
-
-    val messages: RDD[(String, SPMessage)] =
-      (lines.filter(_.length == 2)
-       .map {
-         case Array(vertexId, messageValue) =>
-           (vertexId, new SPMessage(vertexId, messageValue.toInt))
-       })
-
-    System.err.println("Read "+vertices.count()+" vertices and "+
-                       messages.count()+" messages.")
-
-    // Do the computation
-    val compute = addAggregatorArg {
-      (self: SPVertex, messageMinValue: Option[Int], superstep: Int) =>
-        val newValue = messageMinValue match {
-          case Some(minVal) => min(self.value, minVal)
-          case None => self.value
-        }
-
-        val outbox =
-          if (newValue != self.value)
-            self.outEdges.map(edge =>
-              new SPMessage(edge.targetId, newValue + edge.value))
-          else
-            List()
-
-        (new SPVertex(self.id, newValue, self.outEdges, false), outbox)
-    }
-    val result = Pregel.run(sc, vertices, messages)(combiner = MinCombiner, numSplits = numSplits)(compute)
-
-    // Print the result
-    System.err.println("Shortest path from "+startVertex+" to all vertices:")
-    val shortest = result.map(vertex =>
-      "%s\t%s\n".format(vertex.id, vertex.value match {
-        case x if x == Int.MaxValue => "inf"
-        case x => x
-      })).collect.mkString
-    println(shortest)
-  }
-}
-
-object MinCombiner extends Combiner[SPMessage, Int] {
-  def createCombiner(msg: SPMessage): Int =
-    msg.value
-  def mergeMsg(combiner: Int, msg: SPMessage): Int =
-    min(combiner, msg.value)
-  def mergeCombiners(a: Int, b: Int): Int =
-    min(a, b)
-}
-
-@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
-@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
-@serializable class SPMessage(val targetId: String, val value: Int) extends Message
diff --git a/bagel/src/main/scala/bagel/WikipediaPageRank.scala b/bagel/src/main/scala/bagel/WikipediaPageRank.scala
deleted file mode 100644
index 2fe77b4962..0000000000
--- a/bagel/src/main/scala/bagel/WikipediaPageRank.scala
+++ /dev/null
@@ -1,155 +0,0 @@
-package bagel
-
-import spark._
-import spark.SparkContext._
-
-import bagel.Pregel._
-
-import scala.collection.mutable.ArrayBuffer
-import scala.xml.{XML,NodeSeq}
-
-import java.io.{Externalizable,ObjectInput,ObjectOutput,DataOutputStream,DataInputStream}
-
-import com.esotericsoftware.kryo._
-
-object WikipediaPageRank {
-  def main(args: Array[String]) {
-    if (args.length < 4) {
-      System.err.println("Usage: WikipediaPageRank <inputFile> <threshold> <numSplits> <host> [<noCombiner>]")
-      System.exit(-1)
-    }
-
-    System.setProperty("spark.serialization", "spark.KryoSerialization")
-    System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName)
-
-    val inputFile = args(0)
-    val threshold = args(1).toDouble
-    val numSplits = args(2).toInt
-    val host = args(3)
-    val noCombiner = args.length > 4 && args(4).nonEmpty
-    val sc = new SparkContext(host, "WikipediaPageRank")
-
-    // Parse the Wikipedia page data into a graph
-    val input = sc.textFile(inputFile)
-
-    println("Counting vertices...")
-    val numVertices = input.count()
-    println("Done counting vertices.")
-
-    println("Parsing input file...")
-    val vertices: RDD[(String, PRVertex)] = input.map(line => {
-      val fields = line.split("\t")
-      val (title, body) = (fields(1), fields(3).replace("\\n", "\n"))
-      val links =
-        if (body == "\\N")
-          NodeSeq.Empty
-        else
-          try {
-            XML.loadString(body) \\ "link" \ "target"
-          } catch {
-            case e: org.xml.sax.SAXParseException =>
-              System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body)
-              NodeSeq.Empty
-          }
-      val outEdges = ArrayBuffer(links.map(link => new PREdge(new String(link.text))): _*)
-      val id = new String(title)
-      (id, new PRVertex(id, 1.0 / numVertices, outEdges, true))
-    }).cache
-    println("Done parsing input file.")
-
-    // Do the computation
-    val epsilon = 0.01 / numVertices
-    val messages = sc.parallelize(List[(String, PRMessage)]())
-    val result =
-      if (noCombiner) {
-        Pregel.run(sc, vertices, messages)(numSplits = numSplits)(PRNoCombiner.compute(numVertices, epsilon))
-      } else {
-        Pregel.run(sc, vertices, messages)(combiner = PRCombiner, numSplits = numSplits)(PRCombiner.compute(numVertices, epsilon))
-      }
-
-    // Print the result
-    System.err.println("Articles with PageRank >= "+threshold+":")
-    val top = result.filter(_.value >= threshold).map(vertex =>
-      "%s\t%s\n".format(vertex.id, vertex.value)).collect.mkString
-    println(top)
-  }
-}
-
-object PRCombiner extends Combiner[PRMessage, Double] {
-  def createCombiner(msg: PRMessage): Double =
-    msg.value
-  def mergeMsg(combiner: Double, msg: PRMessage): Double =
-    combiner + msg.value
-  def mergeCombiners(a: Double, b: Double): Double =
-    a + b
-
-  def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messageSum: Option[Double], superstep: Int): (PRVertex, Iterable[PRMessage]) = {
-    val newValue = messageSum match {
-      case Some(msgSum) if msgSum != 0 =>
-        0.15 / numVertices + 0.85 * msgSum
-      case _ => self.value
-    }
-
-    val terminate = (superstep >= 10 && (newValue - self.value).abs < epsilon) || superstep >= 30
-
-    val outbox =
-      if (!terminate)
-        self.outEdges.map(edge =>
-          new PRMessage(edge.targetId, newValue / self.outEdges.size))
-      else
-        ArrayBuffer[PRMessage]()
-
-    (new PRVertex(self.id, newValue, self.outEdges, !terminate), outbox)
-  }
-}
-
-object PRNoCombiner extends DefaultCombiner[PRMessage] {
-  def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
-    PRCombiner.compute(numVertices, epsilon)(self, messages match {
-      case Some(msgs) => Some(msgs.map(_.value).sum)
-      case None => None
-    }, superstep)
-}
-
-@serializable class PRVertex() extends Vertex {
-  var id: String = _
-  var value: Double = _
-  var outEdges: ArrayBuffer[PREdge] = _
-  var active: Boolean = true
-
-  def this(id: String, value: Double, outEdges: ArrayBuffer[PREdge], active: Boolean) {
-    this()
-    this.id = id
-    this.value = value
-    this.outEdges = outEdges
-    this.active = active
-  }
-}
-
-@serializable class PRMessage() extends Message {
-  var targetId: String = _
-  var value: Double = _
-
-  def this(targetId: String, value: Double) {
-    this()
-    this.targetId = targetId
-    this.value = value
-  }
-}
-
-@serializable class PREdge() extends Edge {
-  var targetId: String = _
-
-  def this(targetId: String) {
-    this()
-    this.targetId = targetId
-  }
-}
-
-class PRKryoRegistrator extends KryoRegistrator {
-  def registerClasses(kryo: Kryo) {
-    kryo.register(classOf[PRVertex])
-    kryo.register(classOf[PRMessage])
-    kryo.register(classOf[PREdge])
-  }
-}
diff --git a/bagel/src/main/scala/bagel/examples/ShortestPath.scala b/bagel/src/main/scala/bagel/examples/ShortestPath.scala
new file mode 100644
index 0000000000..2e6100c070
--- /dev/null
+++ b/bagel/src/main/scala/bagel/examples/ShortestPath.scala
@@ -0,0 +1,96 @@
+package bagel.examples
+
+import spark._
+import spark.SparkContext._
+
+import scala.math.min
+
+import bagel._
+import bagel.Pregel._
+
+object ShortestPath {
+  def main(args: Array[String]) {
+    if (args.length < 4) {
+      System.err.println("Usage: ShortestPath <graphFile> <startVertex> " +
+                         "<numSplits> <host>")
+      System.exit(-1)
+    }
+
+    val graphFile = args(0)
+    val startVertex = args(1)
+    val numSplits = args(2).toInt
+    val host = args(3)
+    val sc = new SparkContext(host, "ShortestPath")
+
+    // Parse the graph data from a file into two RDDs, vertices and messages
+    val lines =
+      (sc.textFile(graphFile)
+       .filter(!_.matches("^\\s*#.*"))
+       .map(line => line.split("\t")))
+
+    val vertices: RDD[(String, SPVertex)] =
+      (lines.groupBy(line => line(0))
+       .map {
+         case (vertexId, lines) => {
+           val outEdges = lines.collect {
+             case Array(_, targetId, edgeValue) =>
+               new SPEdge(targetId, edgeValue.toInt)
+           }
+
+           (vertexId, new SPVertex(vertexId, Int.MaxValue, outEdges, true))
+         }
+       })
+
+    val messages: RDD[(String, SPMessage)] =
+      (lines.filter(_.length == 2)
+       .map {
+         case Array(vertexId, messageValue) =>
+           (vertexId, new SPMessage(vertexId, messageValue.toInt))
+       })
+
+    System.err.println("Read "+vertices.count()+" vertices and "+
+                       messages.count()+" messages.")
+
+    // Do the computation
+    val compute = addAggregatorArg {
+      (self: SPVertex, messageMinValue: Option[Int], superstep: Int) =>
+        val newValue = messageMinValue match {
+          case Some(minVal) => min(self.value, minVal)
+          case None => self.value
+        }
+
+        val outbox =
+          if (newValue != self.value)
+            self.outEdges.map(edge =>
+              new SPMessage(edge.targetId, newValue + edge.value))
+          else
+            List()
+
+        (new SPVertex(self.id, newValue, self.outEdges, false), outbox)
+    }
+    val result = Pregel.run(sc, vertices, messages)(combiner = MinCombiner, numSplits = numSplits)(compute)
+
+    // Print the result
System.err.println("Shortest path from "+startVertex+" to all vertices:") + val shortest = result.map(vertex => + "%s\t%s\n".format(vertex.id, vertex.value match { + case x if x == Int.MaxValue => "inf" + case x => x + })).collect.mkString + println(shortest) + } +} + +@serializable +object MinCombiner extends Combiner[SPMessage, Int] { + def createCombiner(msg: SPMessage): Int = + msg.value + def mergeMsg(combiner: Int, msg: SPMessage): Int = + min(combiner, msg.value) + def mergeCombiners(a: Int, b: Int): Int = + min(a, b) +} + +@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex +@serializable class SPEdge(val targetId: String, val value: Int) extends Edge +@serializable class SPMessage(val targetId: String, val value: Int) extends Message diff --git a/bagel/src/main/scala/bagel/examples/WikipediaPageRank.scala b/bagel/src/main/scala/bagel/examples/WikipediaPageRank.scala new file mode 100644 index 0000000000..a5e0a9ffb6 --- /dev/null +++ b/bagel/src/main/scala/bagel/examples/WikipediaPageRank.scala @@ -0,0 +1,158 @@ +package bagel.examples + +import spark._ +import spark.SparkContext._ + +import bagel._ +import bagel.Pregel._ + +import scala.collection.mutable.ArrayBuffer +import scala.xml.{XML,NodeSeq} + +import java.io.{Externalizable,ObjectInput,ObjectOutput,DataOutputStream,DataInputStream} + +import com.esotericsoftware.kryo._ + +object WikipediaPageRank { + def main(args: Array[String]) { + if (args.length < 4) { + System.err.println("Usage: WikipediaPageRank []") + System.exit(-1) + } + + System.setProperty("spark.serialization", "spark.KryoSerialization") + System.setProperty("spark.kryo.registrator", classOf[PRKryoRegistrator].getName) + + val inputFile = args(0) + val threshold = args(1).toDouble + val numSplits = args(2).toInt + val host = args(3) + val noCombiner = args.length > 4 && args(4).nonEmpty + val sc = new SparkContext(host, "WikipediaPageRank") + + // Parse the Wikipedia page data into a graph + val input = sc.textFile(inputFile) + + println("Counting vertices...") + val numVertices = input.count() + println("Done counting vertices.") + + println("Parsing input file...") + val vertices: RDD[(String, PRVertex)] = input.map(line => { + val fields = line.split("\t") + val (title, body) = (fields(1), fields(3).replace("\\n", "\n")) + val links = + if (body == "\\N") + NodeSeq.Empty + else + try { + XML.loadString(body) \\ "link" \ "target" + } catch { + case e: org.xml.sax.SAXParseException => + System.err.println("Article \""+title+"\" has malformed XML in body:\n"+body) + NodeSeq.Empty + } + val outEdges = ArrayBuffer(links.map(link => new PREdge(new String(link.text))): _*) + val id = new String(title) + (id, new PRVertex(id, 1.0 / numVertices, outEdges, true)) + }).cache + println("Done parsing input file.") + + // Do the computation + val epsilon = 0.01 / numVertices + val messages = sc.parallelize(List[(String, PRMessage)]()) + val result = + if (noCombiner) { + Pregel.run(sc, vertices, messages)(numSplits = numSplits)(PRNoCombiner.compute(numVertices, epsilon)) + } else { + Pregel.run(sc, vertices, messages)(combiner = PRCombiner, numSplits = numSplits)(PRCombiner.compute(numVertices, epsilon)) + } + + // Print the result + System.err.println("Articles with PageRank >= "+threshold+":") + val top = result.filter(_.value >= threshold).map(vertex => + "%s\t%s\n".format(vertex.id, vertex.value)).collect.mkString + println(top) + } +} + +@serializable +object PRCombiner extends 
+  def createCombiner(msg: PRMessage): Double =
+    msg.value
+  def mergeMsg(combiner: Double, msg: PRMessage): Double =
+    combiner + msg.value
+  def mergeCombiners(a: Double, b: Double): Double =
+    a + b
+
+  def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messageSum: Option[Double], superstep: Int): (PRVertex, Iterable[PRMessage]) = {
+    val newValue = messageSum match {
+      case Some(msgSum) if msgSum != 0 =>
+        0.15 / numVertices + 0.85 * msgSum
+      case _ => self.value
+    }
+
+    val terminate = (superstep >= 10 && (newValue - self.value).abs < epsilon) || superstep >= 30
+
+    val outbox =
+      if (!terminate)
+        self.outEdges.map(edge =>
+          new PRMessage(edge.targetId, newValue / self.outEdges.size))
+      else
+        ArrayBuffer[PRMessage]()
+
+    (new PRVertex(self.id, newValue, self.outEdges, !terminate), outbox)
+  }
+}
+
+@serializable
+object PRNoCombiner extends DefaultCombiner[PRMessage] {
+  def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
+    PRCombiner.compute(numVertices, epsilon)(self, messages match {
+      case Some(msgs) => Some(msgs.map(_.value).sum)
+      case None => None
+    }, superstep)
+}
+
+@serializable class PRVertex() extends Vertex {
+  var id: String = _
+  var value: Double = _
+  var outEdges: ArrayBuffer[PREdge] = _
+  var active: Boolean = true
+
+  def this(id: String, value: Double, outEdges: ArrayBuffer[PREdge], active: Boolean) {
+    this()
+    this.id = id
+    this.value = value
+    this.outEdges = outEdges
+    this.active = active
+  }
+}
+
+@serializable class PRMessage() extends Message {
+  var targetId: String = _
+  var value: Double = _
+
+  def this(targetId: String, value: Double) {
+    this()
+    this.targetId = targetId
+    this.value = value
+  }
+}
+
+@serializable class PREdge() extends Edge {
+  var targetId: String = _
+
+  def this(targetId: String) {
+    this()
+    this.targetId = targetId
+  }
+}
+
+class PRKryoRegistrator extends KryoRegistrator {
+  def registerClasses(kryo: Kryo) {
+    kryo.register(classOf[PRVertex])
+    kryo.register(classOf[PRMessage])
+    kryo.register(classOf[PREdge])
+  }
+}
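
Both relocated examples are driven from the command line per their usage strings: ShortestPath <graphFile> <startVertex> <numSplits> <host>, where, per the parsing code above, <graphFile> holds tab-separated three-field edge lines (source, target, weight) and two-field seed lines (vertex, initial distance); WikipediaPageRank takes <inputFile> <threshold> <numSplits> <host> [<noCombiner>]. The Combiner contract they both rely on (createCombiner / mergeMsg / mergeCombiners) can be exercised outside Spark. The sketch below is a minimal, self-contained illustration of the fold that Pregel.run performs per target vertex before invoking compute; SPMessage here is a local stand-in for bagel's Message subclass, and the object name is hypothetical, not part of this patch.

    // Local sketch of the MinCombiner reduction above; SPMessage is a
    // stand-in case class, not the bagel.Message subclass from the patch.
    object MinCombinerSketch {
      case class SPMessage(targetId: String, value: Int)

      def createCombiner(msg: SPMessage): Int = msg.value
      def mergeMsg(combiner: Int, msg: SPMessage): Int = math.min(combiner, msg.value)
      def mergeCombiners(a: Int, b: Int): Int = math.min(a, b)

      def main(args: Array[String]): Unit = {
        // Three candidate distances arriving at vertex "v1" in one superstep:
        val inbox = Seq(SPMessage("v1", 7), SPMessage("v1", 3), SPMessage("v1", 5))
        // Fold the inbox to a single value, as the framework would before compute():
        val combined = inbox.tail.foldLeft(createCombiner(inbox.head))(mergeMsg)
        println(s"min distance delivered to v1's compute(): $combined") // prints 3
        // Partial minima from two partitions merge the same way:
        println(s"after merging another partition's minimum: ${mergeCombiners(combined, 10)}") // prints 3
      }
    }

Because min is associative and commutative, the same result is obtained however the messages are grouped across partitions, which is what makes the combiner safe to run before the shuffle; PRCombiner's summation satisfies the same property.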