aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorReza Zadeh <reza@databricks.com>2015-04-06 13:15:01 -0700
committerXiangrui Meng <meng@databricks.com>2015-04-06 13:15:01 -0700
commit30363ede8635f2548e444697dbcf60a795b61a84 (patch)
treeb3ee41a5b9dd3dcceec93c89f5db3897cab62d39
parent9fe41252198df71f4629843d363db8c83f36440c (diff)
downloadspark-30363ede8635f2548e444697dbcf60a795b61a84.tar.gz
spark-30363ede8635f2548e444697dbcf60a795b61a84.tar.bz2
spark-30363ede8635f2548e444697dbcf60a795b61a84.zip
[MLlib] [SPARK-6713] Iterators in columnSimilarities for mapPartitionsWithIndex
Use Iterators in columnSimilarities to allow mapPartitionsWithIndex to spill to disk. This could happen in a dense and large column - this way Spark can spill the pairs onto disk instead of building all the pairs before handing them to Spark. Another PR coming to update documentation. Author: Reza Zadeh <reza@databricks.com> Closes #5364 from rezazadeh/optmemsim and squashes the following commits: 47c90ba [Reza Zadeh] Iterators in columnSimilarities for flatMap
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala19
1 files changed, 9 insertions, 10 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 961111507f..9a89a6f3a5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -531,7 +531,6 @@ class RowMatrix(
val rand = new XORShiftRandom(indx)
val scaled = new Array[Double](p.size)
iter.flatMap { row =>
- val buf = new ListBuffer[((Int, Int), Double)]()
row match {
case SparseVector(size, indices, values) =>
val nnz = indices.size
@@ -540,8 +539,9 @@ class RowMatrix(
scaled(k) = values(k) / q(indices(k))
k += 1
}
- k = 0
- while (k < nnz) {
+
+ Iterator.tabulate (nnz) { k =>
+ val buf = new ListBuffer[((Int, Int), Double)]()
val i = indices(k)
val iVal = scaled(k)
if (iVal != 0 && rand.nextDouble() < p(i)) {
@@ -555,8 +555,8 @@ class RowMatrix(
l += 1
}
}
- k += 1
- }
+ buf
+ }.flatten
case DenseVector(values) =>
val n = values.size
var i = 0
@@ -564,8 +564,8 @@ class RowMatrix(
scaled(i) = values(i) / q(i)
i += 1
}
- i = 0
- while (i < n) {
+ Iterator.tabulate (n) { i =>
+ val buf = new ListBuffer[((Int, Int), Double)]()
val iVal = scaled(i)
if (iVal != 0 && rand.nextDouble() < p(i)) {
var j = i + 1
@@ -577,10 +577,9 @@ class RowMatrix(
j += 1
}
}
- i += 1
- }
+ buf
+ }.flatten
}
- buf
}
}.reduceByKey(_ + _).map { case ((i, j), sim) =>
MatrixEntry(i.toLong, j.toLong, sim)