-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala        171
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala     38
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala   10
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala     37
-rw-r--r--  project/MimaExcludes.scala                                                               9
5 files changed, 259 insertions(+), 6 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 4174f45d23..8380058cf9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -19,17 +19,21 @@ package org.apache.spark.mllib.linalg.distributed
import java.util.Arrays
-import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
-import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
+import scala.collection.mutable.ListBuffer
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV, axpy => brzAxpy,
+ svd => brzSvd}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
+import org.apache.spark.Logging
+import org.apache.spark.SparkContext._
import org.apache.spark.annotation.Experimental
import org.apache.spark.mllib.linalg._
-import org.apache.spark.rdd.RDD
-import org.apache.spark.Logging
import org.apache.spark.mllib.rdd.RDDFunctions._
import org.apache.spark.mllib.stat.{MultivariateOnlineSummarizer, MultivariateStatisticalSummary}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
import org.apache.spark.storage.StorageLevel
/**
@@ -411,6 +415,165 @@ class RowMatrix(
new RowMatrix(AB, nRows, B.numCols)
}
+ /**
+ * Compute all cosine similarities between columns of this matrix using the brute-force
+ * approach of computing normalized dot products.
+ *
+ * @return An n x n sparse upper-triangular matrix of cosine similarities between
+ * columns of this matrix.
+ */
+ def columnSimilarities(): CoordinateMatrix = {
+ columnSimilarities(0.0)
+ }
+
+ /**
+ * Compute similarities between columns of this matrix using a sampling approach.
+ *
+ * The threshold parameter is a trade-off knob between estimate quality and computational cost.
+ *
+ * Setting a threshold of 0 guarantees deterministic correct results, but comes at exactly
+ * the same cost as the brute-force approach. Setting the threshold to a positive value
+ * incurs strictly less computational cost than the brute-force approach; however, the
+ * similarities computed will only be estimates.
+ *
+ * The sampling guarantees relative-error correctness for those pairs of columns that have
+ * similarity greater than the given similarity threshold.
+ *
+ * To describe the guarantee, we set some notation:
+ * Let A be the smallest in magnitude non-zero element of this matrix.
+ * Let B be the largest in magnitude non-zero element of this matrix.
+ * Let L be the maximum number of non-zeros per row.
+ *
+ * For example, for {0,1} matrices: A=B=1.
+ * Another example, for the Netflix matrix: A=1, B=5.
+ *
+ * For those column pairs that are above the threshold,
+ * the computed similarity is correct to within 20% relative error with probability
+ * at least 1 - (0.981)^10/B^
+ *
+ * The shuffle size is bounded by the *smaller* of the following two expressions:
+ *
+ * O(n log(n) L / (threshold * A))
+ * O(m L^2^)
+ *
+ * The latter is the cost of the brute-force approach, so for non-zero thresholds,
+ * the cost is never more than that of the brute-force approach.
+ *
+ * @param threshold Set to 0 for deterministic guaranteed correctness.
+ * Similarities above this threshold are estimated
+ * with the cost vs estimate quality trade-off described above.
+ * @return An n x n sparse upper-triangular matrix of cosine similarities
+ * between columns of this matrix.
+ */
+ def columnSimilarities(threshold: Double): CoordinateMatrix = {
+ require(threshold >= 0, s"Threshold cannot be negative: $threshold")
+
+ if (threshold > 1) {
+ logWarning(s"Threshold is greater than 1: $threshold " +
+ "Computation will be more efficient with promoted sparsity, " +
+ " however there is no correctness guarantee.")
+ }
+
+ val gamma = if (threshold < 1e-6) {
+ Double.PositiveInfinity
+ } else {
+ 10 * math.log(numCols()) / threshold
+ }
+
+ columnSimilaritiesDIMSUM(computeColumnSummaryStatistics().normL2.toArray, gamma)
+ }
+
+ /**
+ * Find all similar columns using the DIMSUM sampling algorithm, described in two papers:
+ *
+ * http://arxiv.org/abs/1206.2082
+ * http://arxiv.org/abs/1304.1467
+ *
+ * @param colMags A vector of column magnitudes
+ * @param gamma The oversampling parameter. For provable results, set to 10 * log(n) / s,
+ * where s is the smallest similarity score to be estimated,
+ * and n is the number of columns.
+ * @return An n x n sparse upper-triangular matrix of cosine similarities
+ * between columns of this matrix.
+ */
+ private[mllib] def columnSimilaritiesDIMSUM(
+ colMags: Array[Double],
+ gamma: Double): CoordinateMatrix = {
+ require(gamma > 1.0, s"Oversampling should be greater than 1: $gamma")
+ require(colMags.size == this.numCols(), "Number of magnitudes didn't match column dimension")
+ val sg = math.sqrt(gamma) // sqrt(gamma) used many times
+
+ // Don't divide by zero for those columns with zero magnitude
+ val colMagsCorrected = colMags.map(x => if (x == 0) 1.0 else x)
+
+ val sc = rows.context
+ val pBV = sc.broadcast(colMagsCorrected.map(c => sg / c)) // sampling probability sqrt(gamma) / ||c_i||
+ val qBV = sc.broadcast(colMagsCorrected.map(c => math.min(sg, c))) // scaling factor min(sqrt(gamma), ||c_i||)
+
+ val sims = rows.mapPartitionsWithIndex { (indx, iter) =>
+ val p = pBV.value
+ val q = qBV.value
+
+ val rand = new XORShiftRandom(indx)
+ val scaled = new Array[Double](p.size)
+ iter.flatMap { row =>
+ val buf = new ListBuffer[((Int, Int), Double)]()
+ row match {
+ case sv: SparseVector =>
+ val nnz = sv.indices.size
+ var k = 0
+ while (k < nnz) {
+ scaled(k) = sv.values(k) / q(sv.indices(k))
+ k += 1
+ }
+ k = 0
+ while (k < nnz) {
+ val i = sv.indices(k)
+ val iVal = scaled(k)
+ if (iVal != 0 && rand.nextDouble() < p(i)) {
+ var l = k + 1
+ while (l < nnz) {
+ val j = sv.indices(l)
+ val jVal = scaled(l)
+ if (jVal != 0 && rand.nextDouble() < p(j)) {
+ buf += (((i, j), iVal * jVal))
+ }
+ l += 1
+ }
+ }
+ k += 1
+ }
+ case dv: DenseVector =>
+ val n = dv.values.size
+ var i = 0
+ while (i < n) {
+ scaled(i) = dv.values(i) / q(i)
+ i += 1
+ }
+ i = 0
+ while (i < n) {
+ val iVal = scaled(i)
+ if (iVal != 0 && rand.nextDouble() < p(i)) {
+ var j = i + 1
+ while (j < n) {
+ val jVal = scaled(j)
+ if (jVal != 0 && rand.nextDouble() < p(j)) {
+ buf += (((i, j), iVal * jVal))
+ }
+ j += 1
+ }
+ }
+ i += 1
+ }
+ }
+ buf
+ }
+ }.reduceByKey(_ + _).map { case ((i, j), sim) =>
+ MatrixEntry(i.toLong, j.toLong, sim)
+ }
+ new CoordinateMatrix(sims, numCols(), numCols())
+ }
+
private[mllib] override def toBreeze(): BDM[Double] = {
val m = numRows().toInt
val n = numCols().toInt
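
A minimal usage sketch of the new API (the names, data, and the 0.1 threshold here are illustrative, not part of the patch; assumes a SparkContext `sc` is in scope):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // Each Vector is one row; this matrix has 3 columns.
    val rows = sc.parallelize(Seq(
      Vectors.dense(0.0, 1.0, 2.0),
      Vectors.dense(3.0, 4.0, 5.0),
      Vectors.dense(6.0, 7.0, 8.0)))
    val mat = new RowMatrix(rows)

    // Exact cosine similarities between columns (brute force).
    val exact = mat.columnSimilarities()

    // Sampled similarities: pairs with similarity above 0.1 are estimated
    // to within roughly 20% relative error, at lower shuffle cost.
    val approx = mat.columnSimilarities(0.1)

    // Results are the entries of a sparse upper-triangular CoordinateMatrix.
    approx.entries.collect().foreach { e =>
      println(s"(${e.i}, ${e.j}) -> ${e.value}")
    }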
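The heart of columnSimilaritiesDIMSUM is the per-row sampling rule from the papers above: column i of a row survives a coin flip with probability p(i) = sqrt(gamma) / ||c_i||, and surviving values are pre-scaled by 1 / min(sqrt(gamma), ||c_i||), so that summing the emitted products over all rows (the reduceByKey step) estimates each cosine similarity. A self-contained sketch of that rule for one dense row, without Spark (function and variable names are illustrative):

    import scala.util.Random

    // Emit sampled, scaled products for every column pair (i, j), i < j,
    // of a single dense row. colMags(k) is the column magnitude ||c_k||.
    def sampleRow(
        row: Array[Double],
        colMags: Array[Double],
        gamma: Double,
        rand: Random): Seq[((Int, Int), Double)] = {
      val sg = math.sqrt(gamma)
      val p = colMags.map(m => sg / m) // sampling probability per column
      val scaled = row.indices.map(k => row(k) / math.min(sg, colMags(k)))
      for {
        i <- 0 until row.length
        if scaled(i) != 0 && rand.nextDouble() < p(i)
        j <- i + 1 until row.length
        if scaled(j) != 0 && rand.nextDouble() < p(j)
      } yield ((i, j), scaled(i) * scaled(j))
    }

Note that columns whose magnitude is at most sqrt(gamma) are always kept (p(i) >= 1) and exactly normalized; only heavier columns are downsampled, which is what bounds the shuffle size for non-zero thresholds.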
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
index 7d845c4436..3025d4837c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateOnlineSummarizer.scala
@@ -42,6 +42,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
private var n = 0
private var currMean: BDV[Double] = _
private var currM2n: BDV[Double] = _
+ private var currM2: BDV[Double] = _
+ private var currL1: BDV[Double] = _
private var totalCnt: Long = 0
private var nnz: BDV[Double] = _
private var currMax: BDV[Double] = _
@@ -60,6 +62,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currMean = BDV.zeros[Double](n)
currM2n = BDV.zeros[Double](n)
+ currM2 = BDV.zeros[Double](n)
+ currL1 = BDV.zeros[Double](n)
nnz = BDV.zeros[Double](n)
currMax = BDV.fill(n)(Double.MinValue)
currMin = BDV.fill(n)(Double.MaxValue)
@@ -81,6 +85,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
val tmpPrevMean = currMean(i)
currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
+ currM2(i) += value * value
+ currL1(i) += math.abs(value)
nnz(i) += 1.0
}
@@ -97,7 +103,7 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
* @return This MultivariateOnlineSummarizer object.
*/
def merge(other: MultivariateOnlineSummarizer): this.type = {
- if (this.totalCnt != 0 && other.totalCnt != 0) {
+ if (this.totalCnt != 0 && other.totalCnt != 0) {
require(n == other.n, s"Dimensions mismatch when merging with another summarizer. " +
s"Expecting $n but got ${other.n}.")
totalCnt += other.totalCnt
@@ -114,6 +120,15 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
(nnz(i) + other.nnz(i))
}
+ // merge m2 together
+ if (nnz(i) + other.nnz(i) != 0.0) {
+ currM2(i) += other.currM2(i)
+ }
+ // merge l1 together
+ if (nnz(i) + other.nnz(i) != 0.0) {
+ currL1(i) += other.currL1(i)
+ }
+
if (currMax(i) < other.currMax(i)) {
currMax(i) = other.currMax(i)
}
@@ -127,6 +142,8 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
this.n = other.n
this.currMean = other.currMean.copy
this.currM2n = other.currM2n.copy
+ this.currM2 = other.currM2.copy
+ this.currL1 = other.currL1.copy
this.totalCnt = other.totalCnt
this.nnz = other.nnz.copy
this.currMax = other.currMax.copy
@@ -198,4 +215,23 @@ class MultivariateOnlineSummarizer extends MultivariateStatisticalSummary with S
}
Vectors.fromBreeze(currMin)
}
+
+ override def normL2: Vector = {
+ require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+
+ val realMagnitude = BDV.zeros[Double](n)
+
+ var i = 0
+ while (i < currM2.size) {
+ realMagnitude(i) = math.sqrt(currM2(i))
+ i += 1
+ }
+
+ Vectors.fromBreeze(realMagnitude)
+ }
+
+ override def normL1: Vector = {
+ require(totalCnt > 0, s"Nothing has been added to this summarizer.")
+ Vectors.fromBreeze(currL1)
+ }
}
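
The summarizer changes simply thread two more running sums through add() and merge(): a sum of squares (currM2) and a sum of absolute values (currL1), both of which are associative and therefore merge by plain addition. A standalone sketch of the same bookkeeping for a single column (the class name is illustrative, not part of the patch):

    // Accumulates sum(x^2) and sum(|x|) online, mirroring currM2/currL1.
    class NormAccumulator {
      private var m2 = 0.0 // running sum of squares
      private var l1 = 0.0 // running sum of absolute values

      def add(value: Double): this.type = {
        m2 += value * value
        l1 += math.abs(value)
        this
      }

      // Partial sums from different partitions combine by addition.
      def merge(other: NormAccumulator): this.type = {
        m2 += other.m2
        l1 += other.l1
        this
      }

      def normL2: Double = math.sqrt(m2) // Euclidean magnitude
      def normL1: Double = l1
    }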
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
index f9eb343da2..6a364c9328 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/MultivariateStatisticalSummary.scala
@@ -53,4 +53,14 @@ trait MultivariateStatisticalSummary {
* Minimum value of each column.
*/
def min: Vector
+
+ /**
+ * Euclidean magnitude of each column.
+ */
+ def normL2: Vector
+
+ /**
+ * L1 norm of each column.
+ */
+ def normL1: Vector
}
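
Callers normally obtain these statistics through the summary interface rather than by driving a summarizer directly. A hedged sketch using the existing Statistics.colStats entry point (assumes a SparkContext `sc`; the data is illustrative):

    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.stat.Statistics

    val data = sc.parallelize(Seq(
      Vectors.dense(0.0, 1.0, 2.0),
      Vectors.dense(3.0, 4.0, 5.0)))

    val summary = Statistics.colStats(data)
    println(summary.normL1) // per-column sum of absolute values
    println(summary.normL2) // per-column Euclidean magnitude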
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index 1d3a322136..63f3ed58c0 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -95,6 +95,40 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
}
}
+ test("similar columns") {
+ val colMags = Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94))
+ val expected = BDM(
+ (0.0, 54.0, 72.0),
+ (0.0, 0.0, 78.0),
+ (0.0, 0.0, 0.0))
+
+ for (i <- 0 until n; j <- 0 until n) {
+ expected(i, j) /= (colMags(i) * colMags(j))
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilarities(0.11).toBreeze()
+ for (i <- 0 until n; j <- 0 until n) {
+ if (expected(i, j) > 0) {
+ val actual = expected(i, j)
+ val estimate = G(i, j)
+ assert(math.abs(actual - estimate) / actual < 0.2,
+ s"Similarities not close enough: $actual vs $estimate")
+ }
+ }
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilarities()
+ assert(closeToZero(G.toBreeze() - expected))
+ }
+
+ for (mat <- Seq(denseMat, sparseMat)) {
+ val G = mat.columnSimilaritiesDIMSUM(colMags.toArray, 150.0)
+ assert(closeToZero(G.toBreeze() - expected))
+ }
+ }
+
test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
@@ -190,6 +224,9 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
assert(summary.numNonzeros === Vectors.dense(3.0, 3.0, 4.0), "nnz mismatch")
assert(summary.max === Vectors.dense(9.0, 7.0, 8.0), "max mismatch")
assert(summary.min === Vectors.dense(0.0, 0.0, 1.0), "min mismatch.")
+ assert(summary.normL2 === Vectors.dense(Math.sqrt(126), Math.sqrt(66), Math.sqrt(94)),
+ "magnitude mismatch.")
+ assert(summary.normL1 === Vectors.dense(18.0, 12.0, 16.0), "L1 norm mismatch")
}
}
}
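
The expected values in these tests follow directly from the suite's shared 4 x 3 fixture (assuming rows (0,1,2), (3,4,5), (6,7,8), (9,0,1), which reproduce every asserted quantity): the column magnitudes are sqrt(0+9+36+81) = sqrt(126), sqrt(1+16+49+0) = sqrt(66), and sqrt(4+25+64+1) = sqrt(94); the pairwise column dot products are 54, 72, and 78; and the column L1 norms are 18, 12, and 16. A quick local check of that arithmetic (plain Scala, no Spark):

    val cols = Seq(
      Seq(0.0, 3.0, 6.0, 9.0),
      Seq(1.0, 4.0, 7.0, 0.0),
      Seq(2.0, 5.0, 8.0, 1.0))
    val mags = cols.map(c => math.sqrt(c.map(x => x * x).sum)) // sqrt(126), sqrt(66), sqrt(94)
    val dot01 = (cols(0), cols(1)).zipped.map(_ * _).sum       // 54.0
    val cos01 = dot01 / (mags(0) * mags(1))                    // expected(0, 1)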
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 3280e662fa..1adfaa18c6 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -39,7 +39,14 @@ object MimaExcludes {
MimaBuild.excludeSparkPackage("graphx")
) ++
MimaBuild.excludeSparkClass("mllib.linalg.Matrix") ++
- MimaBuild.excludeSparkClass("mllib.linalg.Vector")
+ MimaBuild.excludeSparkClass("mllib.linalg.Vector") ++
+ Seq(
+ // Added normL1 and normL2 to trait MultivariateStatisticalSummary
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL1"),
+ ProblemFilters.exclude[MissingMethodProblem](
+ "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2")
+ )
case v if v.startsWith("1.1") =>
Seq(