aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@appier.com>2015-07-22 23:29:26 -0700
committerXiangrui Meng <meng@databricks.com>2015-07-22 23:29:26 -0700
commit825ab1e4526059a77e3278769797c4d065f48bd3 (patch)
treeb9eb71ffb22e2391a816d1f0615c7c8c244ca0f0
parent410dd41cf6618b93b6daa6147d17339deeaa49ae (diff)
downloadspark-825ab1e4526059a77e3278769797c4d065f48bd3.tar.gz
spark-825ab1e4526059a77e3278769797c4d065f48bd3.tar.bz2
spark-825ab1e4526059a77e3278769797c4d065f48bd3.zip
[SPARK-7254] [MLLIB] Run PowerIterationClustering directly on graph
JIRA: https://issues.apache.org/jira/browse/SPARK-7254 Author: Liang-Chi Hsieh <viirya@appier.com> Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #6054 from viirya/pic_on_graph and squashes the following commits: 8b87b81 [Liang-Chi Hsieh] Fix scala style. a22fb8b [Liang-Chi Hsieh] For comment. ef565a0 [Liang-Chi Hsieh] Fix indentation. d249aa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into pic_on_graph 82d7351 [Liang-Chi Hsieh] Run PowerIterationClustering directly on graph.
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala46
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala48
2 files changed, 94 insertions, 0 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index e7a243f854..407e43a024 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -154,6 +154,27 @@ class PowerIterationClustering private[clustering] (
}
/**
+ * Run the PIC algorithm on Graph.
+ *
+ * @param graph an affinity matrix represented as graph, which is the matrix A in the PIC paper.
+ * The similarity s,,ij,, represented as the edge between vertices (i, j) must
+ * be nonnegative. This is a symmetric matrix and hence s,,ij,, = s,,ji,,. For
+ * any (i, j) with nonzero similarity, there should be either (i, j, s,,ij,,)
+ * or (j, i, s,,ji,,) in the input. Tuples with i = j are ignored, because we
+ * assume s,,ij,, = 0.0.
+ *
+ * @return a [[PowerIterationClusteringModel]] that contains the clustering result
+ */
+ def run(graph: Graph[Double, Double]): PowerIterationClusteringModel = {
+ val w = normalize(graph)
+ val w0 = initMode match {
+ case "random" => randomInit(w)
+ case "degree" => initDegreeVector(w)
+ }
+ pic(w0)
+ }
+
+ /**
* Run the PIC algorithm.
*
* @param similarities an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix, which is
@@ -213,6 +234,31 @@ object PowerIterationClustering extends Logging {
case class Assignment(id: Long, cluster: Int)
/**
+ * Normalizes the affinity graph (A) and returns the normalized affinity matrix (W).
+ */
+ private[clustering]
+ def normalize(graph: Graph[Double, Double]): Graph[Double, Double] = {
+ val vD = graph.aggregateMessages[Double](
+ sendMsg = ctx => {
+ val i = ctx.srcId
+ val j = ctx.dstId
+ val s = ctx.attr
+ if (s < 0.0) {
+ throw new SparkException("Similarity must be nonnegative but found s($i, $j) = $s.")
+ }
+ if (s > 0.0) {
+ ctx.sendToSrc(s)
+ }
+ },
+ mergeMsg = _ + _,
+ TripletFields.EdgeOnly)
+ GraphImpl.fromExistingRDDs(vD, graph.edges)
+ .mapTriplets(
+ e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
+ TripletFields.Src)
+ }
+
+ /**
* Normalizes the affinity matrix (A) by row sums and returns the normalized affinity matrix (W).
*/
private[clustering]
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index 19e65f1b53..1890005121 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -68,6 +68,54 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
}
+ test("power iteration clustering on graph") {
+ /*
+ We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
+ edge (3, 4).
+
+ 15-14 -13 -12
+ | |
+ 4 . 3 - 2 11
+ | | x | |
+ 5 0 - 1 10
+ | |
+ 6 - 7 - 8 - 9
+ */
+
+ val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
+ (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
+ (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
+ (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
+
+ val edges = similarities.flatMap { case (i, j, s) =>
+ if (i != j) {
+ Seq(Edge(i, j, s), Edge(j, i, s))
+ } else {
+ None
+ }
+ }
+ val graph = Graph.fromEdges(sc.parallelize(edges, 2), 0.0)
+
+ val model = new PowerIterationClustering()
+ .setK(2)
+ .run(graph)
+ val predictions = Array.fill(2)(mutable.Set.empty[Long])
+ model.assignments.collect().foreach { a =>
+ predictions(a.cluster) += a.id
+ }
+ assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+
+ val model2 = new PowerIterationClustering()
+ .setK(2)
+ .setInitializationMode("degree")
+ .run(sc.parallelize(similarities, 2))
+ val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
+ model2.assignments.collect().foreach { a =>
+ predictions2(a.cluster) += a.id
+ }
+ assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ }
+
test("normalize and powerIter") {
/*
Test normalize() with the following graph: