aboutsummaryrefslogtreecommitdiff
path: root/graphx
diff options
context:
space:
mode:
Diffstat (limited to 'graphx')
-rw-r--r--graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala66
-rw-r--r--graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala45
2 files changed, 111 insertions, 0 deletions
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
new file mode 100644
index 0000000000..776bfb8dd6
--- /dev/null
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import scala.reflect.ClassTag
+import org.apache.spark.graphx._
+
+/** Label Propagation algorithm. */
+object LabelPropagation {
+ /**
+ * Run static Label Propagation for detecting communities in networks.
+ *
+ * Each node in the network is initially assigned to its own community. At every superstep, nodes
+ * send their community affiliation to all neighbors and update their state to the mode community
+ * affiliation of incoming messages.
+ *
+ * LPA is a standard community detection algorithm for graphs. It is very inexpensive
+ * computationally, although (1) convergence is not guaranteed and (2) one can end up with
+ * trivial solutions (all nodes are identified into a single community).
+ *
+ * @tparam ED the edge attribute type (not used in the computation)
+ *
+ * @param graph the graph for which to compute the community affiliation
+ * @param maxSteps the number of supersteps of LPA to be performed. Because this is a static
+ * implementation, the algorithm will run for exactly this many supersteps.
+ *
+ * @return a graph with vertex attributes containing the label of community affiliation
+ */
+ def run[ED: ClassTag](graph: Graph[_, ED], maxSteps: Int): Graph[VertexId, ED] = {
+ val lpaGraph = graph.mapVertices { case (vid, _) => vid }
+ def sendMessage(e: EdgeTriplet[VertexId, ED]) = {
+ Iterator((e.srcId, Map(e.dstAttr -> 1L)), (e.dstId, Map(e.srcAttr -> 1L)))
+ }
+ def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
+ : Map[VertexId, Long] = {
+ (count1.keySet ++ count2.keySet).map { i =>
+ val count1Val = count1.getOrElse(i, 0L)
+ val count2Val = count2.getOrElse(i, 0L)
+ i -> (count1Val + count2Val)
+ }.toMap
+ }
+ def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]) = {
+ if (message.isEmpty) attr else message.maxBy(_._2)._1
+ }
+ val initialMessage = Map[VertexId, Long]()
+ Pregel(lpaGraph, initialMessage, maxIterations = maxSteps)(
+ vprog = vertexProgram,
+ sendMsg = sendMessage,
+ mergeMsg = mergeMessage)
+ }
+}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala
new file mode 100644
index 0000000000..61fd0c4605
--- /dev/null
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/LabelPropagationSuite.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.graphx.lib
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.graphx._
+
+class LabelPropagationSuite extends FunSuite with LocalSparkContext {
+ test("Label Propagation") {
+ withSpark { sc =>
+ // Construct a graph with two cliques connected by a single edge
+ val n = 5
+ val clique1 = for (u <- 0L until n; v <- 0L until n) yield Edge(u, v, 1)
+ val clique2 = for (u <- 0L to n; v <- 0L to n) yield Edge(u + n, v + n, 1)
+ val twoCliques = sc.parallelize(clique1 ++ clique2 :+ Edge(0L, n, 1))
+ val graph = Graph.fromEdges(twoCliques, 1)
+ // Run label propagation
+ val labels = LabelPropagation.run(graph, n * 4).cache()
+
+ // All vertices within a clique should have the same label
+ val clique1Labels = labels.vertices.filter(_._1 < n).map(_._2).collect.toArray
+ assert(clique1Labels.forall(_ == clique1Labels(0)))
+ val clique2Labels = labels.vertices.filter(_._1 >= n).map(_._2).collect.toArray
+ assert(clique2Labels.forall(_ == clique2Labels(0)))
+ // The two cliques should have different labels
+ assert(clique1Labels(0) != clique2Labels(0))
+ }
+ }
+}