From 9535f4045daf46b084761d7f15f63dc6c2a543dd Mon Sep 17 00:00:00 2001
From: Ankur Dave
Date: Mon, 2 Jun 2014 00:00:24 -0700
Subject: Add landmark-based Shortest Path algorithm to graphx.lib

This is a modified version of apache/spark#10.

Author: Ankur Dave
Author: Andres Perez

Closes #933 from ankurdave/shortestpaths and squashes the following commits:

03a103c [Ankur Dave] Style fixes
7a1ff48 [Ankur Dave] Improve ShortestPaths documentation
d75c8fc [Ankur Dave] Remove unnecessary VD type param, and pass through ED
d983fb4 [Ankur Dave] Fix style errors
60ed8e6 [Andres Perez] Add Shortest-path computations to graphx.lib with unit tests.
---
 .../apache/spark/graphx/lib/ShortestPaths.scala    | 71 ++++++++++++++++++++++
 .../spark/graphx/lib/ShortestPathsSuite.scala      | 49 +++++++++++++++
 2 files changed, 120 insertions(+)
 create mode 100644 graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
 create mode 100644 graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala

diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
new file mode 100644
index 0000000000..bba070f256
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.graphx._
+import scala.reflect.ClassTag
+
+/**
+ * Computes shortest paths to the given set of landmark vertices, returning a graph where each
+ * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
+ * Distances are hop counts: every edge contributes exactly 1, and edges are followed from
+ * source to destination.
+ */
+object ShortestPaths {
+  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
+  type SPMap = Map[VertexId, Int]
+
+  /** Builds an [[SPMap]] from (landmark id, distance) pairs; no pairs yields the empty map. */
+  private def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)
+
+  /** Returns a copy of `spmap` with every distance increased by one hop. */
+  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
+
+  /** Merges two maps, keeping the smaller distance for landmarks that appear in both. */
+  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
+    (spmap1.keySet ++ spmap2.keySet).map { k =>
+      // Each key comes from at least one of the two maps, so the Int.MaxValue placeholder is
+      // only ever the losing side of the min.
+      k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
+    }.toMap
+
+  /**
+   * Computes shortest paths to the given set of landmark vertices.
+   *
+   * @tparam ED the edge attribute type (not used in the computation)
+   *
+   * @param graph the graph for which to compute the shortest paths
+   * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
+   * landmark.
+   *
+   * @return a graph where each vertex attribute is a map containing the shortest-path distance to
+   * each reachable landmark vertex. A landmark maps itself to 0; landmarks that cannot be
+   * reached from a vertex are absent from that vertex's map.
+   */
+  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
+    // Set membership avoids a linear scan of `landmarks` for every vertex.
+    val landmarkSet = landmarks.toSet
+    val spGraph = graph.mapVertices { (vid, attr) =>
+      if (landmarkSet.contains(vid)) makeMap(vid -> 0) else makeMap()
+    }
+
+    // The empty map is the identity for addMaps, so the initial message changes nothing.
+    val initialMessage = makeMap()
+
+    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = {
+      addMaps(attr, msg)
+    }
+
+    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
+      // An edge src -> dst lets src reach everything dst reaches with one extra hop, so
+      // distances *to* the landmarks flow from the destination back to the source.
+      val newAttr = incrementMap(edge.dstAttr)
+      // Only send when the message would actually improve the source's current map.
+      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
+      else Iterator.empty
+    }
+
+    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+  }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
new file mode 100644
index 0000000000..265827b334
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkContext._
+import org.apache.spark.graphx._
+import org.apache.spark.graphx.lib._
+import org.apache.spark.graphx.util.GraphGenerators
+import org.apache.spark.rdd._
+
+/** Checks ShortestPaths.run on a small six-vertex graph with two landmarks. */
+class ShortestPathsSuite extends FunSuite with LocalSparkContext {
+
+  test("Shortest Path Computations") {
+    withSpark { sc =>
+      // Expected hop counts from every vertex to landmarks 1 and 4. Ids are written as Long
+      // (VertexId) so the comparison does not rely on Int/Long cooperative equality.
+      val shortestPaths: Set[(VertexId, ShortestPaths.SPMap)] = Set(
+        (1L, Map(1L -> 0, 4L -> 2)), (2L, Map(1L -> 1, 4L -> 2)), (3L, Map(1L -> 2, 4L -> 1)),
+        (4L, Map(1L -> 2, 4L -> 0)), (5L, Map(1L -> 1, 4L -> 1)), (6L, Map(1L -> 3, 4L -> 1)))
+      // Each edge is inserted in both directions, so the graph behaves as undirected.
+      val edgeSeq = Seq((1, 2), (1, 5), (2, 3), (2, 5), (3, 4), (4, 5), (4, 6)).flatMap {
+        e => Seq(e, e.swap)
+      }
+      val edges = sc.parallelize(edgeSeq).map { case (v1, v2) => (v1.toLong, v2.toLong) }
+      val graph = Graph.fromEdgeTuples(edges, 1)
+      val landmarks = Seq(1, 4).map(_.toLong)
+      // The vertex attribute is already a Map[VertexId, Int]; it is compared as-is.
+      val results = ShortestPaths.run(graph, landmarks).vertices.collect
+      assert(results.toSet === shortestPaths)
+    }
+  }
+
+}
-- 
cgit v1.2.3