aboutsummaryrefslogtreecommitdiff
path: root/graphx/src/main/scala
diff options
context:
space:
mode:
authorAnkur Dave <ankurdave@gmail.com>2014-06-02 00:00:24 -0700
committerReynold Xin <rxin@apache.org>2014-06-02 00:00:24 -0700
commit9535f4045daf46b084761d7f15f63dc6c2a543dd (patch)
treef35f237fe99dacb2c25082a27ba2e02aa7a5f3fa /graphx/src/main/scala
parentd17d221487fa7a3af6f4af2217f1d4889ceb084d (diff)
downloadspark-9535f4045daf46b084761d7f15f63dc6c2a543dd.tar.gz
spark-9535f4045daf46b084761d7f15f63dc6c2a543dd.tar.bz2
spark-9535f4045daf46b084761d7f15f63dc6c2a543dd.zip
Add landmark-based Shortest Path algorithm to graphx.lib
This is a modified version of apache/spark#10. Author: Ankur Dave <ankurdave@gmail.com> Author: Andres Perez <andres@tresata.com> Closes #933 from ankurdave/shortestpaths and squashes the following commits: 03a103c [Ankur Dave] Style fixes 7a1ff48 [Ankur Dave] Improve ShortestPaths documentation d75c8fc [Ankur Dave] Remove unnecessary VD type param, and pass through ED d983fb4 [Ankur Dave] Fix style errors 60ed8e6 [Andres Perez] Add Shortest-path computations to graphx.lib with unit tests.
Diffstat (limited to 'graphx/src/main/scala')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala71
1 files changed, 71 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
new file mode 100644
index 0000000000..bba070f256
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.apache.spark.graphx._
+import scala.reflect.ClassTag
+
+/**
+ * Computes shortest paths to the given set of landmark vertices, returning a graph where each
+ * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
+ *
+ * Distances are hop counts: every edge contributes 1 regardless of its attribute, and a path
+ * runs from the vertex to the landmark following edge direction (src -> dst).
+ */
+object ShortestPaths {
+  /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
+  type SPMap = Map[VertexId, Int]
+
+  /** Builds an SPMap from (landmark id, distance) pairs; with no pairs the map is empty. */
+  private def makeMap(x: (VertexId, Int)*): SPMap = Map(x: _*)
+
+  /** Returns `spmap` with every distance increased by one, i.e. extended by one edge. */
+  private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
+
+  /**
+   * Merges two maps over the union of their keys, keeping the smaller distance when a landmark
+   * appears in both. `Int.MaxValue` only stands in for a missing entry and is never stored,
+   * because every key in the union is present in at least one of the two maps.
+   */
+  private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap =
+    (spmap1.keySet ++ spmap2.keySet).map {
+      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
+    }.toMap
+
+  /**
+   * Computes shortest paths to the given set of landmark vertices.
+   *
+   * @tparam ED the edge attribute type (not used in the computation)
+   *
+   * @param graph the graph for which to compute the shortest paths
+   * @param landmarks the list of landmark vertex ids. Shortest paths will be computed to each
+   * landmark.
+   *
+   * @return a graph where each vertex attribute is a map containing the shortest-path distance to
+   * each reachable landmark vertex.
+   */
+  def run[ED: ClassTag](graph: Graph[_, ED], landmarks: Seq[VertexId]): Graph[SPMap, ED] = {
+    // Hoisted out of the vertex closure so membership tests do not rescan the Seq per vertex.
+    val landmarkSet = landmarks.toSet
+
+    // Each landmark starts at distance 0 to itself; every other vertex starts with no entries.
+    val spGraph = graph.mapVertices { (vid, attr) =>
+      if (landmarkSet.contains(vid)) makeMap(vid -> 0) else makeMap()
+    }
+
+    // Merging the empty map via addMaps leaves a vertex attribute unchanged.
+    val initialMessage = makeMap()
+
+    def vertexProgram(id: VertexId, attr: SPMap, msg: SPMap): SPMap = addMaps(attr, msg)
+
+    // The maps hold distances *to* landmarks, so information must travel against the edge
+    // direction: the source of an edge can reach every landmark its destination reaches with
+    // one extra hop. Sending srcAttr to the destination instead would yield distances *from*
+    // the landmarks, contradicting the documented contract.
+    def sendMessage(edge: EdgeTriplet[SPMap, _]): Iterator[(VertexId, SPMap)] = {
+      val newAttr = incrementMap(edge.dstAttr)
+      // Only send when merging would actually change the source's map.
+      if (edge.srcAttr != addMaps(newAttr, edge.srcAttr)) Iterator((edge.srcId, newAttr))
+      else Iterator.empty
+    }
+
+    Pregel(spGraph, initialMessage)(vertexProgram, sendMessage, addMaps)
+  }
+}