path: root/mllib/src/main/scala
author DLucky <mouendless@gmail.com> 2016-05-18 12:05:21 +0100
committer Sean Owen <sowen@cloudera.com> 2016-05-18 12:05:21 +0100
commit 420b700695fe8bcdda406c34ad48230b9dfc07f1 (patch)
tree d4eb6bbbf7bae4c3f7a8f246967eaa280a80b502 /mllib/src/main/scala
parent c4a45fd855252f0f7130cdcc08486591fadab7df (diff)
[SPARK-15346][MLLIB] Reduce duplicate computation in picking initial points
mateiz srowen I state that the contribution is my original work and that I license the work to the project under the project's open source license. There were some formatting problems with my last PR; with HyukjinKwon's help I read the contribution guidance, re-checked my code and PR, ran the tests, and re-submitted the PR here. Although the related JIRA issue is marked as resolved, I think this change may still relate to it.

## Proposed Change

After picking each new initial center, it is unnecessary to recompute the distances between all the points and the previously chosen centers. Instead, this change keeps, for every point, the distance to its closest center so far, compares it with the distance to the newly picked center, and updates the cached value with the smaller of the two. A minimal sketch of this idea follows this description.

## Test result

An easy way to test this is described in https://issues.apache.org/jira/browse/SPARK-6706. I tested the KMeans++ method on a small dataset with 16k points, and the whole KMeans|| pipeline on a larger one with 240k points. The data has 4096 features, and K was tuned from 100 to 500. The tests ran on my 4-machine cluster; I also tested a 3M-point dataset on a larger cluster with 25 machines and got similar results, for which I do not show detailed curves. The results of the first two experiments are shown below.

### Local KMeans++ test

Dataset: 4m_ini_center, Data_size: 16234, Dimension: 4096, Lloyd's Iterations: 10. The y-axis is time in seconds; the x-axis is the value of K.

![image](https://cloud.githubusercontent.com/assets/10915169/15175831/d0c92b82-179a-11e6-8b68-4e165fc2fdff.png)

![local_total](https://cloud.githubusercontent.com/assets/10915169/15175957/6b21c3b0-179b-11e6-9741-66dfe4e23eb7.jpg)

### On a larger dataset

One further improvement is shown in the graphs but is not committed in this file: in this experiment I also optimized the calculation for normalized data (the distance is converted to the cosine distance). When the data is normalized into (0, 1), one optimization in the original version of util.MLUtils.fastSquaredDistance has no effect (the precision bound `2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)` is never less than the precision in this case). Therefore I designed an early-termination method for comparing two distances (used in findClosest). Since that improvement is not included in this patch, please compare results using only the curves without "normalize".

Dataset: 4k24, Data_size: 243960, Dimension: 4096

| Normalize | Enlarge | Initialize | Lloyd's Iteration |
|-----------|---------|------------|-------------------|
| NO | 1 | 3 | 5 |
| YES | 10000 | 3 | 5 |

Notice: the normalized data is enlarged to ensure precision.

The cost in time (x-axis: value of K, y-axis: time in seconds):

![4k24_total](https://cloud.githubusercontent.com/assets/10915169/15176635/9a54c0bc-179e-11e6-81c5-238e0c54bce2.jpg)

SE for unnormalized data between the two versions, to confirm correctness:

![4k24_unnorm_se](https://cloud.githubusercontent.com/assets/10915169/15176661/b85dabc8-179e-11e6-9269-fe7d2101dd48.jpg)

The SE for normalized data, just for reference; it is also correct:

![4k24_norm_se](https://cloud.githubusercontent.com/assets/10915169/15176742/1fbde940-179f-11e6-8290-d24b0dd4a4f7.jpg)

Author: DLucky <mouendless@gmail.com>

Closes #13133 from mouendless/patch-2.
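To make the proposed change concrete, here is a minimal, self-contained Scala sketch of k-means++ seeding with a cached cost array. This is not the Spark implementation; names such as `KMeansPlusPlusSketch`, `squaredDistance`, and `initCenters` are illustrative only, and the naive distance below stands in for `KMeans.fastSquaredDistance`.

```scala
import scala.util.Random

object KMeansPlusPlusSketch {

  // Plain squared Euclidean distance; Spark's fastSquaredDistance uses
  // precomputed norms and a precision bound, but the caching idea is the same.
  def squaredDistance(a: Array[Double], b: Array[Double]): Double = {
    var s = 0.0
    var i = 0
    while (i < a.length) { val d = a(i) - b(i); s += d * d; i += 1 }
    s
  }

  def initCenters(
      points: Array[Array[Double]],
      weights: Array[Double],
      k: Int,
      seed: Long): Array[Array[Double]] = {
    val rand = new Random(seed)
    val centers = new Array[Array[Double]](k)
    centers(0) = points(rand.nextInt(points.length))

    // costArray(p) = squared distance from point p to its closest chosen center.
    val costArray = points.map(squaredDistance(_, centers(0)))

    for (i <- 1 until k) {
      // Sample the next center with probability proportional to weight * cached
      // cost; no rescan over the previously chosen centers is needed.
      val sum = costArray.zip(weights).map { case (c, w) => c * w }.sum
      val r = rand.nextDouble() * sum
      var cumulative = 0.0
      var j = 0
      while (j < points.length && cumulative < r) {
        cumulative += weights(j) * costArray(j)
        j += 1
      }
      centers(i) = if (j == 0) points(0) else points(j - 1)

      // Refresh the cache: one new distance per point, then keep the minimum.
      for (p <- points.indices) {
        costArray(p) = math.min(squaredDistance(points(p), centers(i)), costArray(p))
      }
    }
    centers
  }
}
```

The effect on cost can be read off the diff below: the old loop evaluated pointCost against all i centers chosen so far for every point, roughly O(n·k²) distance computations over a full seeding run, while the cached array brings this down to O(n·k), since each new center requires only one new distance per point.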
Diffstat (limited to 'mllib/src/main/scala')
-rw-r--r-- mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala 16
1 file changed, 10 insertions(+), 6 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
index adf20dc4b8..53587670a5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
@@ -46,17 +46,15 @@ private[mllib] object LocalKMeans extends Logging {
// Initialize centers by sampling using the k-means++ procedure.
centers(0) = pickWeighted(rand, points, weights).toDense
+ val costArray = points.map(KMeans.fastSquaredDistance(_, centers(0)))
+
for (i <- 1 until k) {
- // Pick the next center with a probability proportional to cost under current centers
- val curCenters = centers.view.take(i)
- val sum = points.view.zip(weights).map { case (p, w) =>
- w * KMeans.pointCost(curCenters, p)
- }.sum
+ val sum = costArray.zip(weights).map(p => p._1 * p._2).sum
val r = rand.nextDouble() * sum
var cumulativeScore = 0.0
var j = 0
while (j < points.length && cumulativeScore < r) {
- cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
+ cumulativeScore += weights(j) * costArray(j)
j += 1
}
if (j == 0) {
@@ -66,6 +64,12 @@ private[mllib] object LocalKMeans extends Logging {
} else {
centers(i) = points(j - 1).toDense
}
+
+ // update costArray
+ for (p <- points.indices) {
+ costArray(p) = math.min(KMeans.fastSquaredDistance(points(p), centers(i)), costArray(p))
+ }
+
}
// Run up to maxIterations iterations of Lloyd's algorithm