aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2016-02-13 15:56:20 -0800
committerXiangrui Meng <meng@databricks.com>2016-02-13 15:56:20 -0800
commite3441e3f68923224d5b576e6112917cf1fe1f89a (patch)
treea3b3df33c944d7e89ddefbde4208e4a8689661c5 /mllib
parent374c4b2869fc50570a68819cf0ece9b43ddeb34b (diff)
downloadspark-e3441e3f68923224d5b576e6112917cf1fe1f89a.tar.gz
spark-e3441e3f68923224d5b576e6112917cf1fe1f89a.tar.bz2
spark-e3441e3f68923224d5b576e6112917cf1fe1f89a.zip
[SPARK-12363][MLLIB] Remove setRun and fix PowerIterationClustering failed test
JIRA: https://issues.apache.org/jira/browse/SPARK-12363 This issue is pointed by yanboliang. When `setRuns` is removed from PowerIterationClustering, one of the tests will be failed. I found that some `dstAttr`s of the normalized graph are not correct values but 0.0. By setting `TripletFields.All` in `mapTriplets` it can work. Author: Liang-Chi Hsieh <viirya@gmail.com> Author: Xiangrui Meng <meng@databricks.com> Closes #10539 from viirya/fix-poweriter.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala24
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala79
2 files changed, 56 insertions, 47 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
index 1ab7cb393b..feacafec79 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/PowerIterationClustering.scala
@@ -25,7 +25,6 @@ import org.apache.spark.{Logging, SparkContext, SparkException}
import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.graphx._
-import org.apache.spark.graphx.impl.GraphImpl
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.{Loader, MLUtils, Saveable}
import org.apache.spark.rdd.RDD
@@ -264,10 +263,12 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
- GraphImpl.fromExistingRDDs(vD, graph.edges)
+ Graph(vD, graph.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
- TripletFields.Src)
+ new TripletFields(/* useSrc */ true,
+ /* useDst */ false,
+ /* useEdge */ true))
}
/**
@@ -293,10 +294,12 @@ object PowerIterationClustering extends Logging {
},
mergeMsg = _ + _,
TripletFields.EdgeOnly)
- GraphImpl.fromExistingRDDs(vD, gA.edges)
+ Graph(vD, gA.edges)
.mapTriplets(
e => e.attr / math.max(e.srcAttr, MLUtils.EPSILON),
- TripletFields.Src)
+ new TripletFields(/* useSrc */ true,
+ /* useDst */ false,
+ /* useEdge */ true))
}
/**
@@ -317,7 +320,7 @@ object PowerIterationClustering extends Logging {
}, preservesPartitioning = true).cache()
val sum = r.values.map(math.abs).sum()
val v0 = r.mapValues(x => x / sum)
- GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+ Graph(VertexRDD(v0), g.edges)
}
/**
@@ -332,7 +335,7 @@ object PowerIterationClustering extends Logging {
def initDegreeVector(g: Graph[Double, Double]): Graph[Double, Double] = {
val sum = g.vertices.values.sum()
val v0 = g.vertices.mapValues(_ / sum)
- GraphImpl.fromExistingRDDs(VertexRDD(v0), g.edges)
+ Graph(VertexRDD(v0), g.edges)
}
/**
@@ -357,7 +360,9 @@ object PowerIterationClustering extends Logging {
val v = curG.aggregateMessages[Double](
sendMsg = ctx => ctx.sendToSrc(ctx.attr * ctx.dstAttr),
mergeMsg = _ + _,
- TripletFields.Dst).cache()
+ new TripletFields(/* useSrc */ false,
+ /* useDst */ true,
+ /* useEdge */ true)).cache()
// normalize v
val norm = v.values.map(math.abs).sum()
logInfo(s"$msgPrefix: norm(v) = $norm.")
@@ -370,7 +375,7 @@ object PowerIterationClustering extends Logging {
diffDelta = math.abs(delta - prevDelta)
logInfo(s"$msgPrefix: diff(delta) = $diffDelta.")
// update v
- curG = GraphImpl.fromExistingRDDs(VertexRDD(v1), g.edges)
+ curG = Graph(VertexRDD(v1), g.edges)
prevDelta = delta
}
curG.vertices
@@ -387,7 +392,6 @@ object PowerIterationClustering extends Logging {
val points = v.mapValues(x => Vectors.dense(x)).cache()
val model = new KMeans()
.setK(k)
- .setRuns(5)
.setSeed(0L)
.run(points.values)
points.mapValues(p => model.predict(p)).cache()
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
index 1890005121..3d81d375c7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/PowerIterationClusteringSuite.scala
@@ -30,62 +30,65 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
import org.apache.spark.mllib.clustering.PowerIterationClustering._
+ /** Generates a circle of points. */
+ private def genCircle(r: Double, n: Int): Array[(Double, Double)] = {
+ Array.tabulate(n) { i =>
+ val theta = 2.0 * math.Pi * i / n
+ (r * math.cos(theta), r * math.sin(theta))
+ }
+ }
+
+ /** Computes Gaussian similarity. */
+ private def sim(x: (Double, Double), y: (Double, Double)): Double = {
+ val dist2 = (x._1 - y._1) * (x._1 - y._1) + (x._2 - y._2) * (x._2 - y._2)
+ math.exp(-dist2 / 2.0)
+ }
+
test("power iteration clustering") {
- /*
- We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
- edge (3, 4).
-
- 15-14 -13 -12
- | |
- 4 . 3 - 2 11
- | | x | |
- 5 0 - 1 10
- | |
- 6 - 7 - 8 - 9
- */
+ // Generate two circles following the example in the PIC paper.
+ val r1 = 1.0
+ val n1 = 10
+ val r2 = 4.0
+ val n2 = 40
+ val n = n1 + n2
+ val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+ val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+ (i.toLong, j.toLong, sim(points(i), points(j)))
+ }
- val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
- (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
- (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
- (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
val model = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(40)
.run(sc.parallelize(similarities, 2))
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
- assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
val model2 = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
- assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}
test("power iteration clustering on graph") {
- /*
- We use the following graph to test PIC. All edges are assigned similarity 1.0 except 0.1 for
- edge (3, 4).
-
- 15-14 -13 -12
- | |
- 4 . 3 - 2 11
- | | x | |
- 5 0 - 1 10
- | |
- 6 - 7 - 8 - 9
- */
-
- val similarities = Seq[(Long, Long, Double)]((0, 1, 1.0), (0, 2, 1.0), (0, 3, 1.0), (1, 2, 1.0),
- (1, 3, 1.0), (2, 3, 1.0), (3, 4, 0.1), // (3, 4) is a weak edge
- (4, 5, 1.0), (4, 15, 1.0), (5, 6, 1.0), (6, 7, 1.0), (7, 8, 1.0), (8, 9, 1.0), (9, 10, 1.0),
- (10, 11, 1.0), (11, 12, 1.0), (12, 13, 1.0), (13, 14, 1.0), (14, 15, 1.0))
+ // Generate two circles following the example in the PIC paper.
+ val r1 = 1.0
+ val n1 = 10
+ val r2 = 4.0
+ val n2 = 40
+ val n = n1 + n2
+ val points = genCircle(r1, n1) ++ genCircle(r2, n2)
+ val similarities = for (i <- 1 until n; j <- 0 until i) yield {
+ (i.toLong, j.toLong, sim(points(i), points(j)))
+ }
val edges = similarities.flatMap { case (i, j, s) =>
if (i != j) {
@@ -98,22 +101,24 @@ class PowerIterationClusteringSuite extends SparkFunSuite with MLlibTestSparkCon
val model = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(40)
.run(graph)
val predictions = Array.fill(2)(mutable.Set.empty[Long])
model.assignments.collect().foreach { a =>
predictions(a.cluster) += a.id
}
- assert(predictions.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ assert(predictions.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
val model2 = new PowerIterationClustering()
.setK(2)
+ .setMaxIterations(10)
.setInitializationMode("degree")
.run(sc.parallelize(similarities, 2))
val predictions2 = Array.fill(2)(mutable.Set.empty[Long])
model2.assignments.collect().foreach { a =>
predictions2(a.cluster) += a.id
}
- assert(predictions2.toSet == Set((0 to 3).toSet, (4 to 15).toSet))
+ assert(predictions2.toSet == Set((0 until n1).toSet, (n1 until n).toSet))
}
test("normalize and powerIter") {