aboutsummaryrefslogtreecommitdiff
path: root/mllib
diff options
context:
space:
mode:
authorReza Zadeh <rizlar@gmail.com>2014-09-29 11:15:09 -0700
committerXiangrui Meng <meng@databricks.com>2014-09-29 11:15:09 -0700
commit587a0cd7ed964ebfca2c97924c4f1e363f1fd3cb (patch)
treefa168a72a18a0174aea2ac8664bea714a901772d /mllib
parentaedd251c54fd130fe6e2f28d7587d39136e7ad1c (diff)
downloadspark-587a0cd7ed964ebfca2c97924c4f1e363f1fd3cb.tar.gz
spark-587a0cd7ed964ebfca2c97924c4f1e363f1fd3cb.tar.bz2
spark-587a0cd7ed964ebfca2c97924c4f1e363f1fd3cb.zip
[MLlib] [SPARK-2885] DIMSUM: All-pairs similarity
# All-pairs similarity via DIMSUM Compute all pairs of similar vectors using brute force approach, and also DIMSUM sampling approach. Laying down some notation: we are looking for all pairs of similar columns in an m x n RowMatrix whose entries are denoted a_ij, with the i’th row denoted r_i and the j’th column denoted c_j. There is an oversampling parameter labeled ɣ that should be set to 4 log(n)/s to get provably correct results (with high probability), where s is the similarity threshold. The algorithm is stated with a Map and Reduce, with proofs of correctness and efficiency in published papers [1] [2]. The reducer is simply the summation reducer. The mapper is more interesting, and is also the heart of the scheme. As an exercise, you should try to see why in expectation, the map-reduce below outputs cosine similarities. ![dimsumv2](https://cloud.githubusercontent.com/assets/3220351/3807272/d1d9514e-1c62-11e4-9f12-3cfdb1d78b3a.png) [1] Bosagh-Zadeh, Reza and Carlsson, Gunnar (2013), Dimension Independent Matrix Square using MapReduce, arXiv:1304.1467 http://arxiv.org/abs/1304.1467 [2] Bosagh-Zadeh, Reza and Goel, Ashish (2012), Dimension Independent Similarity Computation, arXiv:1206.2082 http://arxiv.org/abs/1206.2082 # Testing Tests for all invocations included. Added L1 and L2 norm computation to MultivariateStatisticalSummary since it was needed. Added tests for both of them. Author: Reza Zadeh <rizlar@gmail.com> Author: Xiangrui Meng <meng@databricks.com> Closes #1778 from rezazadeh/dimsumv2 and squashes the following commits: 404c64c [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2 4eb71c6 [Reza Zadeh] Add excludes for normL1 and normL2 ee8bd65 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2 976ddd4 [Reza Zadeh] Broadcast colMags. Avoid div by zero. 3467cff [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2 aea0247 [Reza Zadeh] Allow large thresholds to promote sparsity 9fe17c0 [Xiangrui Meng] organize imports 2196ba5 [Xiangrui Meng] Merge branch 'rezazadeh-dimsumv2' into dimsumv2 254ca08 [Reza Zadeh] Merge remote-tracking branch 'upstream/master' into dimsumv2 f2947e4 [Xiangrui Meng] some optimization 3c4cf41 [Xiangrui Meng] Merge branch 'master' into rezazadeh-dimsumv2 0e4eda4 [Reza Zadeh] Use partition index for RNG 251bb9c [Reza Zadeh] Documentation 25e9d0d [Reza Zadeh] Line length for style fb296f6 [Reza Zadeh] renamed to normL1 and normL2 3764983 [Reza Zadeh] Documentation e9c6791 [Reza Zadeh] New interface and documentation 613f261 [Reza Zadeh] Column magnitude summary 75a0b51 [Reza Zadeh] Use Ints instead of Longs in the shuffle 0f12ade [Reza Zadeh] Style changes eb1dc20 [Reza Zadeh] Use Double.PositiveInfinity instead of Double.Max f56a882 [Reza Zadeh] Remove changes to MultivariateOnlineSummarizer dbc55ba [Reza Zadeh] Make colMagnitudes a method in RowMatrix 41e8ece [Reza Zadeh] style changes 139c8e1 [Reza Zadeh] Syntax changes 029aa9c [Reza Zadeh] javadoc and new test 75edb25 [Reza Zadeh] All tests passing! 05e59b8 [Reza Zadeh] Add test 502ce52 [Reza Zadeh] new interface 654c4fb [Reza Zadeh] default methods 3726ca9 [Reza Zadeh] Remove MatrixAlgebra 6bebabb [Reza Zadeh] remove changes to MatrixSuite 5b8cd7d [Reza Zadeh] Initial files
Diffstat (limited to 'mllib')
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala171
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala38
-rw-r--r--mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala10
-rw-r--r--mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala37
4 files changed, 251 insertions, 5 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 4174f45d23..8380058cf9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -19,17 +19,21 @@ package org.apache.spark.mllib.linalg.distributed
import java.util.Arrays
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
-import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
+import scala.collection.mutable.ListBuffer
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy,
+ svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
import org.apache.spark.storage.StorageLevel
/**
@@ -411,6 +415,165 @@ class RowMatrix(
new RowMatrix(AB, nRows, B.numCols)
}
+ /**
+ * Compute all cosine similarities between columns of this matrix using the brute-force
+ * approach of computing normalized dot products.
+ *
+ * @return An n x n sparse upper-triangular matrix of cosine similarities between
+ * columns of this matrix.
+ */
+ def columnSimilarities(): CoordinateMatrix = {
+ columnSimilarities(0.0)
+ }
+
+ /**
+ * Compute similarities between columns of this matrix using a sampling approach.
+ *
+ * The threshold parameter is a trade-off knob between estimate quality and computational cost.
+ *
+ * Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly
+ * the same cost as the brute-force approach. Setting the threshold to positive values
+ * incurs strictly less computational cost than the brute-force approach, however the
+ * similarities computed will be estimates.
+ *
+ * The sampling guarantees relative-error correctness for those pairs of columns that have
+ * similarity greater than the given similarity threshold.
+ *
+ * To describe the guarantee, we set some notation:
+ * Let A be the smallest in magnitude non-zero element of this matrix.
+ * Let B be the largest in magnitude non-zero element of this matrix.
+ * Let L be the maximum number of non-zeros per row.
+ *
+ * For example, for {0,1} matrices: A=B=1.
+ * Another example, for the Netflix matrix: A=1, B=5
+ *
+ * For those column pairs that are above the threshold,
+ * the computed similarity is correct to within 20% relative error with probability
+ * at least 1 - (0.981)^10/B^
+ *
+ * The shuffle size is bounded by the *smaller* of the following two expressions:
+ *
+ * O(n log(n) L / (threshold * A))
+ * O(m L^2^)
+ *
+ * The latter is the cost of the brute-force approach, so for non-zero thresholds,
+ * the cost is always cheaper than the brute-force approach.
+ *
+ * @param threshold Set to 0 for deterministic guaranteed correctness.
+ * Similarities above this threshold are estimated
+ * with the cost vs estimate quality trade-off described above.
+ * @return An n x n sparse upper-triangular matrix of cosine similarities
+ * between columns of this matrix.
+ */
+ def columnSimilarities(threshold: Double): CoordinateMatrix = {
+ require(threshold >= 0, s"Threshold cannot be negative: $threshold")
+
+ if (threshold > 1) {
+ logWarning(s"Threshold is greater than 1: $threshold " +
+ "Computation will be more efficient with promoted sparsity, " +
+ " however there is no correctness guarantee.")
+ }
+
+ val gamma = if (threshold < 1e-6) {
+ Double.PositiveInfinity
+ } else {
+ 10 * math.log(numCols()) / threshold
+ }
+
+ columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
+ }
+
+ /**
+ * Find all similar columns using the DIMSUM sampling algorithm, described in two papers
+ *
+ * http://arxiv.org/abs/1206.2082
+ * http://arxiv.org/abs/1304.1467
+ *
+ * @param colMags A vector of column magnitudes
+ * @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s,
+ * where s is the smallest similarity score to be estimated,
+ * and n is the number of columns
+ * @return An n x n sparse upper-triangular matrix of cosine similarities
+ * between columns of this matrix.
+ */
+ private[mllib] def columnSimilaritiesDIMSUM(
+ colMags: Array[Double],
+ gamma: Double): CoordinateMatrix = {
+ require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
+ require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")
+ val sg = math.sqrt(gamma) // sqrt(gamma) used many times
+
+ // Don't divide by zero for those columns with zero magnitude
+ val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)
+
+ val sc = rows.context
+ val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c))
+ val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c)))
+
+ val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
+ val p = pBV.value
+ val q = qBV.value
+
+ val rand = new XORShiftRandom(indx)
+ val scaled = new Array[Double](p.size)
+ iter.flatMap { row =>
+ val buf = new ListBuffer[((Int, Int), Double)]()
+ row match {
+ case sv: SparseVector =>
+ val nnz = sv.indices.size
+ var k = 0
+ while (k < nnz) {
+ scaled(k) = sv.values(k) / q(sv.indices(k))
+ k += 1
+ }
+ k = 0
+ while (k < nnz) {
+ val i = sv.indices(k)
+ val iVal = scaled(k)
+ if (iVal != 0 && rand.nextDouble() < p(i)) {
+ var l = k + 1
+ while (l < nnz) {
+ val j = sv.indices(l)
+ val jVal = scaled(l)
+ if (jVal != 0 && rand.nextDouble() < p(j)) {
+ buf += (((i, j), iVal * jVal))
+ }
+ l += 1
+ }
+ }
+ k += 1
+ }
+ case dv: DenseVector =>
+ val n = dv.values.size
+ var i = 0
+ while (i < n) {
+ scaled(i) = dv.values(i) / q(i)
+ i += 1
+ }
+ i = 0
+ while (i < n) {
+ val iVal = scaled(i)
+ if (iVal != 0 && rand.nextDouble() < p(i)) {
+ var j = i + 1
+ while (j < n) {
+ val jVal = scaled(j)
+ if (jVal != 0 && rand.nextDouble() < p(j)) {
+ buf += (((i, j), iVal * jVal))
+ }
+ j += 1
+ }
+ }
+ i += 1
+ }
+ }
+ buf
+ }
+ }.reduceByKey(_ + _).map { case ((i, j), sim) =>
+ MatrixEntry(i.toLong, j.toLong, sim)
+ }
+ new CoordinateMatrix(sims, numCols(), numCols())
+ }
+
private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 7d845c4436..3025d4837c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -42,6 +42,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
private var n = 0
private var currMean: BDV[Double] = _
private var currM2n: BDV[Double] = _
+ private var currM2: BDV[Double] = _
+ private var currL1: BDV[Double] = _
private var totalCnt: Long = 0
private var nnz: BDV[Double] = _
private var currMax: BDV[Double] = _
@@ -60,6 +62,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currMean = BDV.zeros[Double](n)
currM2n = BDV.zeros[Double](n)
+ currM2 = BDV.zeros[Double](n)
+ currL1 = BDV.zeros[Double](n)
nnz = BDV.zeros[Double](n)
currMax = BDV.fill(n)(Double.MinValue)
currMin = BDV.fill(n)(Double.MaxValue)
@@ -81,6 +85,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
val tmpPrevMean = currMean(i)
currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
+ currM2(i) += value * value
+ currL1(i) += math.abs(value)
nnz(i) += 1.0
}
@@ -97,7 +103,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
* @return This MultivariateOnlineSummarizer object.
*/
def merge(other: MultivariateOnlineSummarizer): this.type = {
- if (this.totalCnt != 0 && other.totalCnt != 0) {
+ if (this.totalCnt != 0 && other.totalCnt != 0) {
require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $n but got ${other.n}.")
totalCnt += other.totalCnt
@@ -114,6 +120,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
(nnz(i) + other.nnz(i))
}
+ // merge m2 together
+ if (nnz(i) + other.nnz(i) != 0.0) {
+ currM2(i) += other.currM2(i)
+ }
+ // merge l1 together
+ if (nnz(i) + other.nnz(i) != 0.0) {
+ currL1(i) += other.currL1(i)
+ }
+
if (currMax(i) < other.currMax(i)) {
currMax(i) = other.currMax(i)
}
@@ -127,6 +142,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
this.n = other.n
this.currMean = other.currMean.copy
this.currM2n = other.currM2n.copy
+ this.currM2 = other.currM2.copy
+ this.currL1 = other.currL1.copy
this.totalCnt = other.totalCnt
this.nnz = other.nnz.copy
this.currMax = other.currMax.copy
@@ -198,4 +215,23 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
}
Vectors.fromBreeze(currMin)
}
+
+ override def normL2: Vector = {
+ require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+
+ val realMagnitude = BDV.zeros[Double](n)
+
+ var i = 0
+ while (i < currM2.size) {
+ realMagnitude(i) = math.sqrt(currM2(i))
+ i += 1
+ }
+
+ Vectors.fromBreeze(realMagnitude)
+ }
+
+ override def normL1: Vector = {
+ require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+ Vectors.fromBreeze(currL1)
+ }
}
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
index f9eb343da2..6a364c9328 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
@@ -53,4 +53,14 @@ trait MultivariateStatisticalSummary {
* Minimum value of each column.
*/
def min: Vector
+
+ /**
+ * Euclidean magnitude of each column
+ */
+ def normL2: Vector
+
+ /**
+ * L1 norm of each column
+ */
+ def normL1: Vector
}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 1d3a322136..63f3ed58c0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -95,6 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
}
}
+ test("similar columns") {
+ val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
+ val expected = BDM(
+ (0.0, 54.0, 72.0),
+ (0.0, 0.0, 78.0),
+ (0.0, 0.0, 0.0))
+
+ for (i <- 0 until n; j <- 0 until n) {
+ expected(i, j) /= (colMags(i) * colMags(j))
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilarities(0.11).toBreeze()
+ for (i <- 0 until n; j <- 0 until n) {
+ if (expected(i, j) > 0) {
+ val actual = expected(i, j)
+ val estimate = G(i, j)
+ assert(math.abs(actual - estimate) / actual < 0.2,
+ s"Similarities not close enough: $actual vs $estimate")
+ }
+ }
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilarities()
+ assert(closeToZero(G.toBreeze() - expected))
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0)
+ assert(closeToZero(G.toBreeze() - expected))
+ }
+ }
+
test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
@@ -190,6 +224,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch")
assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch")
assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "column mismatch.")
+ assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)),
+ "magnitude mismatch.")
+ assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch")
}
}
}