diff options
author | Xiangrui Meng <meng@databricks.com> | 2015-08-30 23:20:03 -0700 |
---|---|---|
committer | Xiangrui Meng <meng@databricks.com> | 2015-08-30 23:20:03 -0700 |
commit | f0f563a3c43fc9683e6920890cce44611c0c5f4b (patch) | |
tree | 5182439c10dabd3d46f0d0c30cbf41be63e21311 | |
parent | 8694c3ad7dcafca9563649e93b7a08076748d6f2 (diff) | |
download | spark-f0f563a3c43fc9683e6920890cce44611c0c5f4b.tar.gz spark-f0f563a3c43fc9683e6920890cce44611c0c5f4b.tar.bz2 spark-f0f563a3c43fc9683e6920890cce44611c0c5f4b.zip |
[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization
* do not cache first cost RDD
* change following cost RDD cache level to MEMORY_AND_DISK
* remove Vector wrapper to save an object per instance
Further improvements will be addressed in SPARK-10329
cc: yu-iskw HuJiayin
Author: Xiangrui Meng <meng@databricks.com>
Closes #8526 from mengxr/SPARK-10354.
-rw-r--r-- | mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala | 21 |
1 files changed, 14 insertions, 7 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 46920fffe6..7168aac32c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -369,7 +369,7 @@ class KMeans private ( : Array[Array[VectorWithNorm]] = { // Initialize empty centers and point costs. val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm]) - var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache() + var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity)) // Initialize each run's first center to a random point. val seed = new XORShiftRandom(this.seed).nextInt() @@ -394,21 +394,28 @@ class KMeans private ( val bcNewCenters = data.context.broadcast(newCenters) val preCosts = costs costs = data.zip(preCosts).map { case (point, cost) => - Vectors.dense( Array.tabulate(runs) { r => math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) - }) - }.cache() + } + }.persist(StorageLevel.MEMORY_AND_DISK) val sumCosts = costs - .aggregate(Vectors.zeros(runs))( + .aggregate(new Array[Double](runs))( seqOp = (s, v) => { // s += v - axpy(1.0, v, s) + var r = 0 + while (r < runs) { + s(r) += v(r) + r += 1 + } s }, combOp = (s0, s1) => { // s0 += s1 - axpy(1.0, s1, s0) + var r = 0 + while (r < runs) { + s0(r) += s1(r) + r += 1 + } s0 } ) |