diff options
Diffstat (limited to 'mllib/src')
4 files changed, 26 insertions, 27 deletions
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala index 19cc942aba..6a85608706 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/BLAS.scala @@ -237,7 +237,7 @@ private[spark] object BLAS extends Serializable with Logging { } /** - * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR. + * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR. * * @param U the upper triangular part of the matrix in a [[DenseVector]](column major) */ @@ -246,7 +246,7 @@ private[spark] object BLAS extends Serializable with Logging { } /** - * Adds alpha * x * x.t to a matrix in-place. This is the same as BLAS's ?SPR. + * Adds alpha * v * v.t to a matrix in-place. This is the same as BLAS's ?SPR. * * @param U the upper triangular part of the matrix packed in an array (column major) */ @@ -267,7 +267,6 @@ private[spark] object BLAS extends Serializable with Logging { col = indices(j) // Skip empty columns. colStartIdx += (col - prevCol) * (col + prevCol + 1) / 2 - col = indices(j) av = alpha * values(j) i = 0 while (i <= j) { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index f6183a5eaa..4b8ed301eb 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -115,10 +115,10 @@ class RowMatrix @Since("1.0.0") ( checkNumColumns(n) // Computes n*(n+1)/2, avoiding overflow in the multiplication. // This succeeds when n <= 65535, which is checked above - val nt: Int = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2)) + val nt = if (n % 2 == 0) ((n / 2) * (n + 1)) else (n * ((n + 1) / 2)) // Compute the upper triangular part of the gram matrix. - val GU = rows.treeAggregate(new BDV[Double](new Array[Double](nt)))( + val GU = rows.treeAggregate(new BDV[Double](nt))( seqOp = (U, v) => { BLAS.spr(1.0, v, U.data) U @@ -328,25 +328,17 @@ class RowMatrix @Since("1.0.0") ( val n = numCols().toInt checkNumColumns(n) - val (m, mean) = rows.treeAggregate[(Long, BDV[Double])]((0L, BDV.zeros[Double](n)))( - seqOp = (s: (Long, BDV[Double]), v: Vector) => (s._1 + 1L, s._2 += v.toBreeze), - combOp = (s1: (Long, BDV[Double]), s2: (Long, BDV[Double])) => - (s1._1 + s2._1, s1._2 += s2._2) - ) - - if (m <= 1) { - sys.error(s"RowMatrix.computeCovariance called on matrix with only $m rows." + - " Cannot compute the covariance of a RowMatrix with <= 1 row.") - } - updateNumRows(m) - - mean :/= m.toDouble + val summary = computeColumnSummaryStatistics() + val m = summary.count + require(m > 1, s"RowMatrix.computeCovariance called on matrix with only $m rows." + + " Cannot compute the covariance of a RowMatrix with <= 1 row.") + val mean = summary.mean // We use the formula Cov(X, Y) = E[X * Y] - E[X] E[Y], which is not accurate if E[X * Y] is // large but Cov(X, Y) is small, but it is good for sparse computation. // TODO: find a fast and stable way for sparse data. - val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] + val G = computeGramianMatrix().toBreeze var i = 0 var j = 0 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala index f131f6948a..515be0b817 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/correlation/PearsonCorrelation.scala @@ -52,7 +52,7 @@ private[stat] object PearsonCorrelation extends Correlation with Logging { /** * Compute the Pearson correlation matrix from the covariance matrix. - * 0 covariance results in a correlation value of Double.NaN. + * 0 variance results in a correlation value of Double.NaN. */ def computeCorrelationMatrixFromCovariance(covarianceMatrix: Matrix): Matrix = { val cov = covarianceMatrix.toBreeze.asInstanceOf[BDM[Double]] diff --git a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala index eaa819c2e6..700f803490 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/stat/CorrelationSuite.scala @@ -22,6 +22,7 @@ import breeze.linalg.{DenseMatrix => BDM, Matrix => BM} import org.apache.spark.SparkFunSuite import org.apache.spark.internal.Logging import org.apache.spark.mllib.linalg.Vectors +import org.apache.spark.mllib.random.RandomRDDs import org.apache.spark.mllib.stat.correlation.{Correlations, PearsonCorrelation, SpearmanCorrelation} import org.apache.spark.mllib.util.MLlibTestSparkContext @@ -42,10 +43,10 @@ class CorrelationSuite extends SparkFunSuite with MLlibTestSparkContext with Log test("corr(x, y) pearson, 1 value in data") { val x = sc.parallelize(Array(1.0)) val y = sc.parallelize(Array(4.0)) - intercept[RuntimeException] { + intercept[IllegalArgumentException] { Statistics.corr(x, y, "pearson") } - intercept[RuntimeException] { + intercept[IllegalArgumentException] { Statistics.corr(x, y, "spearman") } } @@ -127,15 +128,22 @@ class CorrelationSuite extends SparkFunSuite with MLlibTestSparkContext with Log assert(Correlations.getCorrelationFromName("pearson") === pearson) assert(Correlations.getCorrelationFromName("spearman") === spearman) - // Should throw IllegalArgumentException - try { + intercept[IllegalArgumentException] { Correlations.getCorrelationFromName("kendall") - assert(false) - } catch { - case ie: IllegalArgumentException => } } + ignore("Pearson correlation of very large uncorrelated values (SPARK-14533)") { + // The two RDDs should have 0 correlation because they're random; + // this should stay the same after shifting them by any amount + // In practice a large shift produces very large values which can reveal + // round-off problems + val a = RandomRDDs.normalRDD(sc, 100000, 10).map(_ + 1000000000.0) + val b = RandomRDDs.normalRDD(sc, 100000, 10).map(_ + 1000000000.0) + val p = Statistics.corr(a, b, method = "pearson") + assert(approxEqual(p, 0.0, 0.01)) + } + def approxEqual(v1: Double, v2: Double, threshold: Double = 1e-6): Boolean = { if (v1.isNaN) { v2.isNaN |