author    Xiangrui Meng <meng@databricks.com>    2014-08-15 21:07:55 -0700
committer Xiangrui Meng <meng@databricks.com>    2014-08-15 21:07:55 -0700
commit    2e069ca6560bf7ab07bd019f9530b42f4fe45014
tree      0f72496809af7d78489ab7ba21f392ab556127e9
parent    5d25c0b74f6397d78164b96afb8b8cbb1b15cfbd
[SPARK-3001][MLLIB] Improve Spearman's correlation
The current implementation sorts each column individually to compute its ranks; the same ranks can instead be obtained from a single global sort keyed by (columnIndex, value), which is what this patch does.

Result on a 32-node cluster:

m        | n   | prev | this
---------|-----|------|-----
1000000  | 50  | 55s  | 9s
10000000 | 50  | 97s  | 76s
1000000  | 100 | 119s | 15s

Author: Xiangrui Meng <meng@databricks.com>

Closes #1917 from mengxr/spearman and squashes the following commits:

4d5d262 [Xiangrui Meng] remove unused import
85c48de [Xiangrui Meng] minor updates
a048d0c [Xiangrui Meng] remove cache and set a limit to cachedIds
b98bb18 [Xiangrui Meng] add comments
0846e07 [Xiangrui Meng] first version
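For context, the change is internal to SpearmanCorrelation and does not alter the public API: callers still go through Statistics.corr with the "spearman" method. A minimal usage sketch under that assumption (the SparkContext setup and the sample rows are illustrative only):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    object SpearmanExample {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("SpearmanExample"))
        // Each Vector is one observation (row); the columns are the variables to correlate.
        val rows: RDD[Vector] = sc.parallelize(Seq(
          Vectors.dense(1.0, 10.0, 100.0),
          Vectors.dense(2.0, 20.0, 200.0),
          Vectors.dense(5.0, 33.0, 366.0)
        ))
        // Dispatches to SpearmanCorrelation.computeCorrelationMatrix internally.
        val corr: Matrix = Statistics.corr(rows, "spearman")
        println(corr)
        sc.stop()
      }
    }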
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala | 120
1 file changed, 42 insertions(+), 78 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
index 9bd0c2cd05..4a6c677f06 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/SpearmanCorrelation.scala
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.stat.correlation
import scala.collection.mutable.ArrayBuffer
-import org.apache.spark.{Logging, HashPartitioner}
+import org.apache.spark.Logging
import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.linalg.{DenseVector, Matrix, Vector}
-import org.apache.spark.rdd.{CoGroupedRDD, RDD}
+import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
+import org.apache.spark.rdd.RDD
/**
* Compute Spearman's correlation for two RDDs of the type RDD[Double] or the correlation matrix
@@ -43,87 +43,51 @@ private[stat] object SpearmanCorrelation extends Correlation with Logging {
/**
* Compute Spearman's correlation matrix S, for the input matrix, where S(i, j) is the
* correlation between column i and j.
- *
- * Input RDD[Vector] should be cached or checkpointed if possible since it would be split into
- * numCol RDD[Double]s, each of which sorted, and the joined back into a single RDD[Vector].
*/
override def computeCorrelationMatrix(X: RDD[Vector]): Matrix = {
- val indexed = X.zipWithUniqueId()
-
- val numCols = X.first.size
- if (numCols > 50) {
- logWarning("Computing the Spearman correlation matrix can be slow for large RDDs with more"
- + " than 50 columns.")
- }
- val ranks = new Array[RDD[(Long, Double)]](numCols)
-
- // Note: we use a for loop here instead of a while loop with a single index variable
- // to avoid race condition caused by closure serialization
- for (k <- 0 until numCols) {
- val column = indexed.map { case (vector, index) => (vector(k), index) }
- ranks(k) = getRanks(column)
+ // ((columnIndex, value), rowUid)
+ val colBased = X.zipWithUniqueId().flatMap { case (vec, uid) =>
+ vec.toArray.view.zipWithIndex.map { case (v, j) =>
+ ((j, v), uid)
+ }
}
-
- val ranksMat: RDD[Vector] = makeRankMatrix(ranks, X)
- PearsonCorrelation.computeCorrelationMatrix(ranksMat)
- }
-
- /**
- * Compute the ranks for elements in the input RDD, using the average method for ties.
- *
- * With the average method, elements with the same value receive the same rank that's computed
- * by taking the average of their positions in the sorted list.
- * e.g. ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
- * Note that positions here are 0-indexed, instead of the 1-indexed as in the definition for
- * ranks in the standard definition for Spearman's correlation. This does not affect the final
- * results and is slightly more performant.
- *
- * @param indexed RDD[(Double, Long)] containing pairs of the format (originalValue, uniqueId)
- * @return RDD[(Long, Double)] containing pairs of the format (uniqueId, rank), where uniqueId is
- * copied from the input RDD.
- */
- private def getRanks(indexed: RDD[(Double, Long)]): RDD[(Long, Double)] = {
- // Get elements' positions in the sorted list for computing average rank for duplicate values
- val sorted = indexed.sortByKey().zipWithIndex()
-
- val ranks: RDD[(Long, Double)] = sorted.mapPartitions { iter =>
- // add an extra element to signify the end of the list so that flatMap can flush the last
- // batch of duplicates
- val end = -1L
- val padded = iter ++ Iterator[((Double, Long), Long)](((Double.NaN, end), end))
- val firstEntry = padded.next()
- var lastVal = firstEntry._1._1
- var firstRank = firstEntry._2.toDouble
- val idBuffer = ArrayBuffer(firstEntry._1._2)
- padded.flatMap { case ((v, id), rank) =>
- if (v == lastVal && id != end) {
- idBuffer += id
- Iterator.empty
- } else {
- val entries = if (idBuffer.size == 1) {
- Iterator((idBuffer(0), firstRank))
- } else {
- val averageRank = firstRank + (idBuffer.size - 1.0) / 2.0
- idBuffer.map(id => (id, averageRank))
- }
- lastVal = v
- firstRank = rank
- idBuffer.clear()
- idBuffer += id
- entries
+ // global sort by (columnIndex, value)
+ val sorted = colBased.sortByKey()
+ // assign global ranks (using average ranks for tied values)
+ val globalRanks = sorted.zipWithIndex().mapPartitions { iter =>
+ var preCol = -1
+ var preVal = Double.NaN
+ var startRank = -1.0
+ var cachedUids = ArrayBuffer.empty[Long]
+ val flush: () => Iterable[(Long, (Int, Double))] = () => {
+ val averageRank = startRank + (cachedUids.size - 1) / 2.0
+ val output = cachedUids.map { uid =>
+ (uid, (preCol, averageRank))
}
+ cachedUids.clear()
+ output
}
+ iter.flatMap { case (((j, v), uid), rank) =>
+ // If we see a new value or cachedUids is too big, we flush ids with their average rank.
+ if (j != preCol || v != preVal || cachedUids.size >= 10000000) {
+ val output = flush()
+ preCol = j
+ preVal = v
+ startRank = rank
+ cachedUids += uid
+ output
+ } else {
+ cachedUids += uid
+ Iterator.empty
+ }
+ } ++ flush()
}
- ranks
- }
-
- private def makeRankMatrix(ranks: Array[RDD[(Long, Double)]], input: RDD[Vector]): RDD[Vector] = {
- val partitioner = new HashPartitioner(input.partitions.size)
- val cogrouped = new CoGroupedRDD[Long](ranks, partitioner)
- cogrouped.map {
- case (_, values: Array[Iterable[_]]) =>
- val doubles = values.asInstanceOf[Array[Iterable[Double]]]
- new DenseVector(doubles.flatten.toArray)
+ // Replace values in the input matrix by their ranks compared with values in the same column.
+ // Note that shifting all ranks in a column by a constant value doesn't affect result.
+ val groupedRanks = globalRanks.groupByKey().map { case (uid, iter) =>
+ // sort by column index and then convert values to a vector
+ Vectors.dense(iter.toSeq.sortBy(_._1).map(_._2).toArray)
}
+ PearsonCorrelation.computeCorrelationMatrix(groupedRanks)
}
}
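For readers following the new mapPartitions block above, here is a hedged, non-distributed sketch of the same average-rank-for-ties idea on a plain Scala collection. Names such as averageRanks are illustrative and not part of the patch; the buffer-and-flush structure mirrors the flush() closure in SpearmanCorrelation.

    import scala.collection.mutable.{ArrayBuffer, Map => MutableMap}

    object AverageRanks {
      // Assign 0-based ranks to one column's values, averaging the ranks of ties,
      // mirroring the flush() logic in SpearmanCorrelation on a local Seq.
      def averageRanks(values: Seq[Double]): Map[Long, Double] = {
        // (value, uid) pairs sorted by value play the role of the global sort.
        val sorted = values.zipWithIndex.map { case (v, i) => (v, i.toLong) }.sortBy(_._1)
        val ranks = MutableMap.empty[Long, Double]
        var preVal = Double.NaN
        var startRank = -1.0
        val cachedUids = ArrayBuffer.empty[Long]
        def flush(): Unit = {
          val averageRank = startRank + (cachedUids.size - 1) / 2.0
          cachedUids.foreach(uid => ranks(uid) = averageRank)
          cachedUids.clear()
        }
        sorted.zipWithIndex.foreach { case ((v, uid), rank) =>
          if (v != preVal) { // a new value ends the current run of ties
            flush()
            preVal = v
            startRank = rank.toDouble
          }
          cachedUids += uid
        }
        flush() // flush the last run of ties
        ranks.toMap
      }

      def main(args: Array[String]): Unit = {
        // Matches the example in the old doc comment: ranks([2, 1, 0, 2]) = [2.5, 1.0, 0.0, 2.5]
        println(averageRanks(Seq(2.0, 1.0, 0.0, 2.0)))
      }
    }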