about | summary | refs | log | tree | commit | diff
path: root/mllib
diff options
context:
space:
mode:
authorLiang-Chi Hsieh <viirya@gmail.com>2015-01-22 08:16:35 -0800
committerXiangrui Meng <meng@databricks.com>2015-01-22 08:16:35 -0800
commit246111d179a2f3f6b97a5c2b121d8ddbfd1c9aad (patch)
tree824c4d141c4428d3d50ed4825d5fd98eb3c5b3af /mllib
parent3027f06b4127ab23a43c5ce8cebf721e3b6766e5 (diff)
downloadspark-246111d179a2f3f6b97a5c2b121d8ddbfd1c9aad.tar.gz
spark-246111d179a2f3f6b97a5c2b121d8ddbfd1c9aad.tar.bz2
spark-246111d179a2f3f6b97a5c2b121d8ddbfd1c9aad.zip
[SPARK-5365][MLlib] Refactor KMeans to reduce redundant data
If a point is selected as new centers for many runs, it would collect many redundant data. This pr refactors it. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #4159 from viirya/small_refactor_kmeans and squashes the following commits: 25487e6 [Liang-Chi Hsieh] Refactor codes to reduce redundant data.
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala9
1 files changed, 5 insertions, 4 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index fc46da3a93..11633e8242 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -328,14 +328,15 @@ class KMeans private (
val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) =>
val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
pointsWithCosts.flatMap { case (p, c) =>
- (0 until runs).filter { r =>
+ val rs = (0 until runs).filter { r =>
rand.nextDouble() < 2.0 * c(r) * k / sumCosts(r)
- }.map((_, p))
+ }
+ if (rs.length > 0) Some(p, rs) else None
}
}.collect()
mergeNewCenters()
- chosen.foreach { case (r, p) =>
- newCenters(r) += p.toDense
+ chosen.foreach { case (p, rs) =>
+ rs.foreach(newCenters(_) += p.toDense)
}
step += 1
}