author     Tor Myklebust <tmyklebu@gmail.com>    2014-06-02 11:48:09 -0700
committer  Xiangrui Meng <meng@databricks.com>   2014-06-02 11:48:09 -0700
commit     9a5d482e090eaaea8491d3864667e0f513e7195c
tree       e82102ddb1bb6bfcd659330338172a59cb6f327e
parent     9535f4045daf46b084761d7f15f63dc6c2a543dd
[SPARK-1553] Alternating nonnegative least-squares
This pull request includes a nonnegative least-squares solver (NNLS) tailored to the kinds of small-scale problems that come up when training matrix factorisation models by alternating nonnegative least-squares (ANNLS). The method used for the NNLS subproblems is based on the classical method of projected gradients. There is a modification where, if the set of active constraints has not changed since the last iteration, a conjugate gradient step is considered and possibly rejected in favour of the gradient; this improves convergence once the optimal face has been located. The NNLS solver is in `org.apache.spark.mllib.optimization.NNLS`.

Author: Tor Myklebust <tmyklebu@gmail.com>

Closes #460 from tmyklebu/annls and squashes the following commits:

79bc4b5 [Tor Myklebust] Merge branch 'master' of https://github.com/apache/spark into annls
199b0bc [Tor Myklebust] Make the ctor private again and use the builder pattern.
7fbabf1 [Tor Myklebust] Cleanup matrix math in NNLSSuite.
65ef7f2 [Tor Myklebust] Make ALS's ctor public and remove a couple of "convenience" wrappers.
2d4f3cb [Tor Myklebust] Cleanup.
0cb4481 [Tor Myklebust] Drop the iteration limit from 40k to max(400,20n).
e2a01d1 [Tor Myklebust] Create a workspace object for NNLS to cut down on memory allocations.
b285106 [Tor Myklebust] Clean up NNLS test cases.
9c820b6 [Tor Myklebust] Tweak variable names.
8a1a436 [Tor Myklebust] Describe the problem and add a reference to Polyak's paper.
5345402 [Tor Myklebust] Style fixes that got eaten.
ac673bd [Tor Myklebust] More safeguards against numerical ridiculousness.
c288b6a [Tor Myklebust] Finish moving the NNLS solver.
9a82fa6 [Tor Myklebust] Fix scalastyle moanings.
33bf4f2 [Tor Myklebust] Fix missing space.
89ea0a8 [Tor Myklebust] Hack ALSSuite to support NNLS testing.
f5dbf4d [Tor Myklebust] Teach ALS how to use the NNLS solver.
6cb563c [Tor Myklebust] Tests for the nonnegative least squares solver.
a68ac10 [Tor Myklebust] A nonnegative least-squares solver.
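For orientation, a minimal sketch of how the solver added below is meant to be driven, using only the `NNLS.createWorkspace`/`NNLS.solve` entry points from this patch. The tiny jblas problem is made up for illustration, and the caller must live under `org.apache.spark.mllib` because the object is `private[mllib]`:

    package org.apache.spark.mllib

    import org.jblas.DoubleMatrix
    import org.apache.spark.mllib.optimization.NNLS

    object NNLSExample {
      def main(args: Array[String]) {
        // Illustrative problem: minimise ||Ax - b||_2 subject to x >= 0.
        // The solver takes the Gram matrix A^T A and the vector A^T b, not A and b.
        val a = new DoubleMatrix(4, 2, 1.0, 2.0, 3.0, 4.0, 2.0, 1.0, 0.0, 1.0)
        val b = new DoubleMatrix(4, 1, 3.0, 3.0, 3.0, 5.0)
        val ata = a.transpose.mmul(a)
        val atb = a.transpose.mmul(b)

        // One workspace per problem size n, reused across solves so that repeated
        // calls (as in ALS) do not reallocate the six n-vectors the iteration needs.
        val ws = NNLS.createWorkspace(2)
        val x: Array[Double] = NNLS.solve(ata, atb, ws) // every entry is >= 0
        println(x.mkString(", "))
      }
    }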
Diffstat (limited to 'mllib')
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala        169
-rw-r--r--  mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala        33
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala     80
-rw-r--r--  mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala    32
4 files changed, 300 insertions(+), 14 deletions(-)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
new file mode 100644
index 0000000000..e4b436b023
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/NNLS.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import org.jblas.{DoubleMatrix, SimpleBlas}
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Object used to solve nonnegative least squares problems using a modified
+ * projected gradient method.
+ */
+private[mllib] object NNLS {
+ class Workspace(val n: Int) {
+ val scratch = new DoubleMatrix(n, 1)
+ val grad = new DoubleMatrix(n, 1)
+ val x = new DoubleMatrix(n, 1)
+ val dir = new DoubleMatrix(n, 1)
+ val lastDir = new DoubleMatrix(n, 1)
+ val res = new DoubleMatrix(n, 1)
+
+ def wipe() {
+ scratch.fill(0.0)
+ grad.fill(0.0)
+ x.fill(0.0)
+ dir.fill(0.0)
+ lastDir.fill(0.0)
+ res.fill(0.0)
+ }
+ }
+
+ def createWorkspace(n: Int): Workspace = {
+ new Workspace(n)
+ }
+
+ /**
+ * Solve a least squares problem, possibly with nonnegativity constraints, by a modified
+ * projected gradient method. That is, find x minimising ||Ax - b||_2 given A^T A and A^T b.
+ *
+ * We solve the problem
+ * min_x 1/2 x^T ata x - x^T atb
+ * subject to x >= 0
+ *
+ * The method used is similar to one described by Polyak (B. T. Polyak, The conjugate gradient
+ * method in extremal problems, Zh. Vychisl. Mat. Mat. Fiz. 9(4)(1969), pp. 94-112) for bound-
+ * constrained nonlinear programming. Polyak unconditionally uses a conjugate gradient
+ * direction, however, while this method only uses a conjugate gradient direction if the last
+ * iteration did not cause a previously-inactive constraint to become active.
+ */
+ def solve(ata: DoubleMatrix, atb: DoubleMatrix, ws: Workspace): Array[Double] = {
+ ws.wipe()
+
+ val n = atb.rows
+ val scratch = ws.scratch
+
+ // find the optimal unconstrained step
+ def steplen(dir: DoubleMatrix, res: DoubleMatrix): Double = {
+ val top = SimpleBlas.dot(dir, res)
+ SimpleBlas.gemv(1.0, ata, dir, 0.0, scratch)
+ // Push the denominator upward very slightly to avoid infinities and silliness
+ top / (SimpleBlas.dot(scratch, dir) + 1e-20)
+ }
+
+ // stopping condition
+ def stop(step: Double, ndir: Double, nx: Double): Boolean = {
+ ((step.isNaN) // NaN
+ || (step < 1e-6) // too small or negative
+ || (step > 1e40) // too large; almost certainly numerical problems
+ || (ndir < 1e-12 * nx) // gradient relatively too small
+ || (ndir < 1e-32) // gradient absolutely too small; numerical issues may lurk
+ )
+ }
+
+ val grad = ws.grad
+ val x = ws.x
+ val dir = ws.dir
+ val lastDir = ws.lastDir
+ val res = ws.res
+ val iterMax = Math.max(400, 20 * n)
+ var lastNorm = 0.0
+ var iterno = 0
+ var lastWall = 0 // Last iteration when we hit a bound constraint.
+ var i = 0
+ while (iterno < iterMax) {
+ // find the residual
+ SimpleBlas.gemv(1.0, ata, x, 0.0, res)
+ SimpleBlas.axpy(-1.0, atb, res)
+ SimpleBlas.copy(res, grad)
+
+ // project the gradient
+ i = 0
+ while (i < n) {
+ if (grad.data(i) > 0.0 && x.data(i) == 0.0) {
+ grad.data(i) = 0.0
+ }
+ i = i + 1
+ }
+ val ngrad = SimpleBlas.dot(grad, grad)
+
+ SimpleBlas.copy(grad, dir)
+
+ // use a CG direction under certain conditions
+ var step = steplen(grad, res)
+ var ndir = 0.0
+ val nx = SimpleBlas.dot(x, x)
+ if (iterno > lastWall + 1) {
+ val alpha = ngrad / lastNorm
+ SimpleBlas.axpy(alpha, lastDir, dir)
+ val dstep = steplen(dir, res)
+ ndir = SimpleBlas.dot(dir, dir)
+ if (stop(dstep, ndir, nx)) {
+ // reject the CG step if it could lead to premature termination
+ SimpleBlas.copy(grad, dir)
+ ndir = SimpleBlas.dot(dir, dir)
+ } else {
+ step = dstep
+ }
+ } else {
+ ndir = SimpleBlas.dot(dir, dir)
+ }
+
+ // terminate?
+ if (stop(step, ndir, nx)) {
+ return x.data.clone
+ }
+
+ // don't run through the walls
+ i = 0
+ while (i < n) {
+ if (step * dir.data(i) > x.data(i)) {
+ step = x.data(i) / dir.data(i)
+ }
+ i = i + 1
+ }
+
+ // take the step
+ i = 0
+ while (i < n) {
+ if (step * dir.data(i) > x.data(i) * (1 - 1e-14)) {
+ x.data(i) = 0
+ lastWall = iterno
+ } else {
+ x.data(i) -= step * dir.data(i)
+ }
+ i = i + 1
+ }
+
+ iterno = iterno + 1
+ SimpleBlas.copy(dir, lastDir)
+ lastNorm = ngrad
+ }
+ x.data.clone
+ }
+}
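Restated in equation form (this is just what the code above computes, not an addition to the patch): `solve` minimises the quadratic below over the nonnegative orthant, and `steplen` returns the exact line-search minimiser along the current direction d, with the 1e-20 guard from the code in the denominator:

    \min_{x \ge 0} \; q(x) = \tfrac{1}{2}\, x^{\top} (A^{\top}A)\, x - (A^{\top}b)^{\top} x,
    \qquad
    \alpha^{*} = \frac{d^{\top} r}{d^{\top} (A^{\top}A)\, d + 10^{-20}},
    \qquad r = (A^{\top}A)\, x - A^{\top}b .

The update is then x <- x - alpha d, with alpha clipped so that no coordinate crosses zero; any coordinate that lands on the bound records `lastWall`, which is exactly the condition that suppresses the conjugate gradient direction on the next iteration.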
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 0cf9a7f909..cfc3b68606 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -32,6 +32,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
import org.apache.spark.util.Utils
+import org.apache.spark.mllib.optimization.NNLS
/**
* Out-link information for a user or product block. This includes the original user/product IDs
@@ -156,6 +157,18 @@ class ALS private (
this
}
+ /** If true, do alternating nonnegative least squares. */
+ private var nonnegative = false
+
+ /**
+ * Set whether the least-squares problems solved at each iteration should have
+ * nonnegativity constraints.
+ */
+ def setNonnegative(b: Boolean): ALS = {
+ this.nonnegative = b
+ this
+ }
+
/**
* Run ALS with the configured parameters on an input RDD of (user, product, rating) triples.
* Returns a MatrixFactorizationModel with feature vectors for each user and product.
@@ -505,6 +518,8 @@ class ALS private (
}
}
+ val ws = if (nonnegative) NNLS.createWorkspace(rank) else null
+
// Solve the least-squares problem for each user and return the new feature vectors
Array.range(0, numUsers).map { index =>
// Compute the full XtX matrix from the lower-triangular part we got above
@@ -517,14 +532,27 @@ class ALS private (
}
// Solve the resulting matrix, which is symmetric and positive-definite
if (implicitPrefs) {
- Solve.solvePositive(fullXtX.addi(YtY.get.value), userXy(index)).data
+ solveLeastSquares(fullXtX.addi(YtY.get.value), userXy(index), ws)
} else {
- Solve.solvePositive(fullXtX, userXy(index)).data
+ solveLeastSquares(fullXtX, userXy(index), ws)
}
}
}
/**
+ * Given A^T A and A^T b, find the x minimising ||Ax - b||_2, possibly subject
+ * to nonnegativity constraints if `nonnegative` is true.
+ */
+ def solveLeastSquares(ata: DoubleMatrix, atb: DoubleMatrix,
+ ws: NNLS.Workspace): Array[Double] = {
+ if (!nonnegative) {
+ Solve.solvePositive(ata, atb).data
+ } else {
+ NNLS.solve(ata, atb, ws)
+ }
+ }
+
+ /**
* Given a triangular matrix in the order of fillXtX above, compute the full symmetric square
* matrix that it represents, storing it into destMatrix.
*/
@@ -550,7 +578,6 @@ class ALS private (
* Top-level methods for calling Alternating Least Squares (ALS) matrix factorization.
*/
object ALS {
-
/**
* Train a matrix factorization model given an RDD of ratings given by users to some products,
* in the form of (userID, productID, rating) pairs. We approximate the ratings matrix as the
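With the new setter in place, enabling the constrained solve from user code is one extra call on the builder. A hedged sketch (the ratings RDD and the parameter values here are placeholders, not part of the patch):

    import org.apache.spark.mllib.recommendation.{ALS, Rating}
    import org.apache.spark.rdd.RDD

    // Train with nonnegative user/product factors; each per-user and per-product
    // least-squares subproblem is then routed through solveLeastSquares -> NNLS.solve.
    def trainNonnegative(ratings: RDD[Rating]) =
      new ALS()
        .setRank(10)
        .setIterations(10)
        .setLambda(0.01)
        .setNonnegative(true)
        .run(ratings)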
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
new file mode 100644
index 0000000000..bbf3852290
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/NNLSSuite.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.optimization
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.jblas.{DoubleMatrix, SimpleBlas, NativeBlas}
+
+class NNLSSuite extends FunSuite {
+ /** Generate an NNLS problem whose optimal solution is the all-ones vector. */
+ def genOnesData(n: Int, rand: Random): (DoubleMatrix, DoubleMatrix) = {
+ val A = new DoubleMatrix(n, n, Array.fill(n*n)(rand.nextDouble()): _*)
+ val b = A.mmul(DoubleMatrix.ones(n, 1))
+
+ val ata = A.transpose.mmul(A)
+ val atb = A.transpose.mmul(b)
+
+ (ata, atb)
+ }
+
+ test("NNLS: exact solution cases") {
+ val n = 20
+ val rand = new Random(12346)
+ val ws = NNLS.createWorkspace(n)
+ var numSolved = 0
+
+ // About 15% of random 20x20 [-1,1]-matrices have a singular value less than 1e-3. NNLS
+ // can legitimately fail to solve these anywhere close to exactly. So we grab a considerable
+ // sample of these matrices and make sure that we solved a substantial fraction of them.
+
+ for (k <- 0 until 100) {
+ val (ata, atb) = genOnesData(n, rand)
+ val x = new DoubleMatrix(NNLS.solve(ata, atb, ws))
+ assert(x.length === n)
+ val answer = DoubleMatrix.ones(n, 1)
+ SimpleBlas.axpy(-1.0, answer, x)
+ val solved = (x.norm2 < 1e-2) && (x.normmax < 1e-3)
+ if (solved) numSolved = numSolved + 1
+ }
+
+ assert(numSolved > 50)
+ }
+
+ test("NNLS: nonnegativity constraint active") {
+ val n = 5
+ val ata = new DoubleMatrix(Array(
+ Array( 4.377, -3.531, -1.306, -0.139, 3.418),
+ Array(-3.531, 4.344, 0.934, 0.305, -2.140),
+ Array(-1.306, 0.934, 2.644, -0.203, -0.170),
+ Array(-0.139, 0.305, -0.203, 5.883, 1.428),
+ Array( 3.418, -2.140, -0.170, 1.428, 4.684)))
+ val atb = new DoubleMatrix(Array(-1.632, 2.115, 1.094, -1.025, -0.636))
+
+ val goodx = Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628)
+
+ val ws = NNLS.createWorkspace(n)
+ val x = NNLS.solve(ata, atb, ws)
+ for (i <- 0 until n) {
+ assert(Math.abs(x(i) - goodx(i)) < 1e-3)
+ assert(x(i) >= 0)
+ }
+ }
+}
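As a sanity check on the second test above: at `goodx` the KKT conditions for this quadratic should hold, i.e. the gradient is numerically zero in the free coordinates and strictly positive in the coordinate pinned at zero. A throwaway verification in the same jblas style, reusing `ata` and `atb` from the test body:

    val x = new DoubleMatrix(Array(0.13025, 0.54506, 0.2874, 0.0, 0.028628))
    val grad = ata.mmul(x).sub(atb) // gradient of 1/2 x^T ata x - atb^T x
    // Free coordinates: |grad(i)| comes out around 1e-5 or smaller.
    // Bound coordinate: grad(3) is about 1.16 > 0, so x(3) = 0 is genuinely active.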
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
index 2d944f3eb7..37c9b9d085 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala
@@ -48,12 +48,18 @@ object ALSSuite {
features: Int,
samplingRate: Double,
implicitPrefs: Boolean = false,
- negativeWeights: Boolean = false): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
+ negativeWeights: Boolean = false,
+ negativeFactors: Boolean = true): (Seq[Rating], DoubleMatrix, DoubleMatrix) = {
val rand = new Random(42)
// Create a random matrix with uniform values from -1 to 1
- def randomMatrix(m: Int, n: Int) =
- new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+ def randomMatrix(m: Int, n: Int) = {
+ if (negativeFactors) {
+ new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble() * 2 - 1): _*)
+ } else {
+ new DoubleMatrix(m, n, Array.fill(m * n)(rand.nextDouble()): _*)
+ }
+ }
val userMatrix = randomMatrix(users, features)
val productMatrix = randomMatrix(features, products)
@@ -146,6 +152,10 @@ class ALSSuite extends FunSuite with LocalSparkContext {
}
}
+ test("NNALS, rank 2") {
+ testALS(100, 200, 2, 15, 0.7, 0.4, false, false, false, -1, false)
+ }
+
/**
* Test if we can correctly factorize R = U * P where U and P are of known rank.
*
@@ -159,19 +169,19 @@ class ALSSuite extends FunSuite with LocalSparkContext {
* @param bulkPredict flag to test bulk prediction
* @param negativeWeights whether the generated data can contain negative values
* @param numBlocks number of blocks to partition users and products into
+ * @param negativeFactors whether the generated user/product factors can have negative entries
*/
def testALS(users: Int, products: Int, features: Int, iterations: Int,
samplingRate: Double, matchThreshold: Double, implicitPrefs: Boolean = false,
- bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1)
+ bulkPredict: Boolean = false, negativeWeights: Boolean = false, numBlocks: Int = -1,
+ negativeFactors: Boolean = true)
{
val (sampledRatings, trueRatings, truePrefs) = ALSSuite.generateRatings(users, products,
- features, samplingRate, implicitPrefs, negativeWeights)
- val model = implicitPrefs match {
- case false => ALS.train(sc.parallelize(sampledRatings), features, iterations, 0.01,
- numBlocks, 0L)
- case true => ALS.trainImplicit(sc.parallelize(sampledRatings), features, iterations, 0.01,
- numBlocks, 1.0, 0L)
- }
+ features, samplingRate, implicitPrefs, negativeWeights, negativeFactors)
+
+ val model = (new ALS().setBlocks(numBlocks).setRank(features).setIterations(iterations)
+ .setAlpha(1.0).setImplicitPrefs(implicitPrefs).setLambda(0.01).setSeed(0L)
+ .setNonnegative(!negativeFactors).run(sc.parallelize(sampledRatings)))
val predictedU = new DoubleMatrix(users, features)
for ((u, vec) <- model.userFeatures.collect(); i <- 0 until features) {