about | summary | refs | log | tree | commit | diff
path: root/examples
diff options
context:
space:
mode:
author: Sean Owen <sowen@cloudera.com>, 2014-10-27 10:53:15 -0700
committer: Xiangrui Meng <meng@databricks.com>, 2014-10-27 10:53:15 -0700
commit: bfa614b12795f1cfce4de0950f90cb8c4f2a7d53 (patch)
tree: e968afcba36be2d27db36bc17c26f38f1a491a48 /examples
parent: 1d7bcc88401d66c8d17a075355acfc25a8b7615c (diff)
download: spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.tar.gz
spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.tar.bz2
spark-bfa614b12795f1cfce4de0950f90cb8c4f2a7d53.zip
SPARK-4022 [CORE] [MLLIB] Replace colt dependency (LGPL) with commons-math
This change replaces usages of colt with commons-math3 equivalents, and makes some minor necessary adjustments to related code and tests to match.

Author: Sean Owen <sowen@cloudera.com>

Closes #2928 from srowen/SPARK-4022 and squashes the following commits:

61a232f [Sean Owen] Fix failure due to different sampling in JavaAPISuite.sample()
16d66b8 [Sean Owen] Simplify seeding with call to reseedRandomGenerator
a1a78e0 [Sean Owen] Use Well19937c
31c7641 [Sean Owen] Fix Python Poisson test by choosing a different seed; about 88% of seeds should work but 1 didn't, it seems
5c9c67f [Sean Owen] Additional test fixes from review
d8f88e0 [Sean Owen] Replace colt with commons-math3. Some tests do not pass yet.
Diffstat (limited to 'examples')
-rw-r--r-- examples/pom.xml | 8
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/LocalALS.scala | 97
-rw-r--r-- examples/src/main/scala/org/apache/spark/examples/SparkALS.scala | 78
3 files changed, 87 insertions, 96 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index eb49a0e5af..bc3291803c 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -157,6 +157,10 @@
<version>0.1.11</version>
</dependency>
<dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math3</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_${scala.binary.version}</artifactId>
<scope>test</scope>
@@ -268,6 +272,10 @@
<exclude>com.google.common.base.Optional**</exclude>
</excludes>
</relocation>
+ <relocation>
+ <pattern>org.apache.commons.math3</pattern>
+ <shadedPattern>org.spark-project.commons.math3</shadedPattern>
+ </relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
index 1f576319b3..3d52594630 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalALS.scala
@@ -17,11 +17,7 @@
package org.apache.spark.examples
-import scala.math.sqrt
-
-import cern.colt.matrix._
-import cern.colt.matrix.linalg._
-import cern.jet.math._
+import org.apache.commons.math3.linear._
/**
* Alternating least squares matrix factorization.
@@ -30,84 +26,70 @@ import cern.jet.math._
* please refer to org.apache.spark.mllib.recommendation.ALS
*/
object LocalALS {
+
// Parameters set through command line arguments
var M = 0 // Number of movies
var U = 0 // Number of users
var F = 0 // Number of features
var ITERATIONS = 0
-
val LAMBDA = 0.01 // Regularization coefficient
- // Some COLT objects
- val factory2D = DoubleFactory2D.dense
- val factory1D = DoubleFactory1D.dense
- val algebra = Algebra.DEFAULT
- val blas = SeqBlas.seqBlas
-
- def generateR(): DoubleMatrix2D = {
- val mh = factory2D.random(M, F)
- val uh = factory2D.random(U, F)
- algebra.mult(mh, algebra.transpose(uh))
+ def generateR(): RealMatrix = {
+ val mh = randomMatrix(M, F)
+ val uh = randomMatrix(U, F)
+ mh.multiply(uh.transpose())
}
- def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
- us: Array[DoubleMatrix1D]): Double =
- {
- val r = factory2D.make(M, U)
+ def rmse(targetR: RealMatrix, ms: Array[RealVector], us: Array[RealVector]): Double = {
+ val r = new Array2DRowRealMatrix(M, U)
for (i <- 0 until M; j <- 0 until U) {
- r.set(i, j, blas.ddot(ms(i), us(j)))
+ r.setEntry(i, j, ms(i).dotProduct(us(j)))
}
- blas.daxpy(-1, targetR, r)
- val sumSqs = r.aggregate(Functions.plus, Functions.square)
- sqrt(sumSqs / (M * U))
+ val diffs = r.subtract(targetR)
+ var sumSqs = 0.0
+ for (i <- 0 until M; j <- 0 until U) {
+ val diff = diffs.getEntry(i, j)
+ sumSqs += diff * diff
+ }
+ math.sqrt(sumSqs / (M.toDouble * U.toDouble))
}
- def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
- R: DoubleMatrix2D) : DoubleMatrix1D =
- {
- val XtX = factory2D.make(F, F)
- val Xty = factory1D.make(F)
+ def updateMovie(i: Int, m: RealVector, us: Array[RealVector], R: RealMatrix) : RealVector = {
+ var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
+ var Xty: RealVector = new ArrayRealVector(F)
// For each user that rated the movie
for (j <- 0 until U) {
val u = us(j)
// Add u * u^t to XtX
- blas.dger(1, u, u, XtX)
+ XtX = XtX.add(u.outerProduct(u))
// Add u * rating to Xty
- blas.daxpy(R.get(i, j), u, Xty)
+ Xty = Xty.add(u.mapMultiply(R.getEntry(i, j)))
}
- // Add regularization coefs to diagonal terms
+ // Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
- XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ XtX.addToEntry(d, d, LAMBDA * U)
}
// Solve it with Cholesky
- val ch = new CholeskyDecomposition(XtX)
- val Xty2D = factory2D.make(Xty.toArray, F)
- val solved2D = ch.solve(Xty2D)
- solved2D.viewColumn(0)
+ new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}
- def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
- R: DoubleMatrix2D) : DoubleMatrix1D =
- {
- val XtX = factory2D.make(F, F)
- val Xty = factory1D.make(F)
+ def updateUser(j: Int, u: RealVector, ms: Array[RealVector], R: RealMatrix) : RealVector = {
+ var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
+ var Xty: RealVector = new ArrayRealVector(F)
// For each movie that the user rated
for (i <- 0 until M) {
val m = ms(i)
// Add m * m^t to XtX
- blas.dger(1, m, m, XtX)
+ XtX = XtX.add(m.outerProduct(m))
// Add m * rating to Xty
- blas.daxpy(R.get(i, j), m, Xty)
+ Xty = Xty.add(m.mapMultiply(R.getEntry(i, j)))
}
- // Add regularization coefs to diagonal terms
+ // Add regularization coefficients to diagonal terms
for (d <- 0 until F) {
- XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ XtX.addToEntry(d, d, LAMBDA * M)
}
// Solve it with Cholesky
- val ch = new CholeskyDecomposition(XtX)
- val Xty2D = factory2D.make(Xty.toArray, F)
- val solved2D = ch.solve(Xty2D)
- solved2D.viewColumn(0)
+ new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}
def showWarning() {
@@ -135,21 +117,28 @@ object LocalALS {
showWarning()
- printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
+ println(s"Running with M=$M, U=$U, F=$F, iters=$ITERATIONS")
val R = generateR()
// Initialize m and u randomly
- var ms = Array.fill(M)(factory1D.random(F))
- var us = Array.fill(U)(factory1D.random(F))
+ var ms = Array.fill(M)(randomVector(F))
+ var us = Array.fill(U)(randomVector(F))
// Iteratively update movies then users
for (iter <- 1 to ITERATIONS) {
- println("Iteration " + iter + ":")
+ println(s"Iteration $iter:")
ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
println("RMSE = " + rmse(R, ms, us))
println()
}
}
+
+ private def randomVector(n: Int): RealVector =
+ new ArrayRealVector(Array.fill(n)(math.random))
+
+ private def randomMatrix(rows: Int, cols: Int): RealMatrix =
+ new Array2DRowRealMatrix(Array.fill(rows, cols)(math.random))
+
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
index fde8ffeedf..6c0ac8013c 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkALS.scala
@@ -17,11 +17,7 @@
package org.apache.spark.examples
-import scala.math.sqrt
-
-import cern.colt.matrix._
-import cern.colt.matrix.linalg._
-import cern.jet.math._
+import org.apache.commons.math3.linear._
import org.apache.spark._
@@ -32,62 +28,53 @@ import org.apache.spark._
* please refer to org.apache.spark.mllib.recommendation.ALS
*/
object SparkALS {
+
// Parameters set through command line arguments
var M = 0 // Number of movies
var U = 0 // Number of users
var F = 0 // Number of features
var ITERATIONS = 0
-
val LAMBDA = 0.01 // Regularization coefficient
- // Some COLT objects
- val factory2D = DoubleFactory2D.dense
- val factory1D = DoubleFactory1D.dense
- val algebra = Algebra.DEFAULT
- val blas = SeqBlas.seqBlas
-
- def generateR(): DoubleMatrix2D = {
- val mh = factory2D.random(M, F)
- val uh = factory2D.random(U, F)
- algebra.mult(mh, algebra.transpose(uh))
+ def generateR(): RealMatrix = {
+ val mh = randomMatrix(M, F)
+ val uh = randomMatrix(U, F)
+ mh.multiply(uh.transpose())
}
- def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
- us: Array[DoubleMatrix1D]): Double =
- {
- val r = factory2D.make(M, U)
+ def rmse(targetR: RealMatrix, ms: Array[RealVector], us: Array[RealVector]): Double = {
+ val r = new Array2DRowRealMatrix(M, U)
for (i <- 0 until M; j <- 0 until U) {
- r.set(i, j, blas.ddot(ms(i), us(j)))
+ r.setEntry(i, j, ms(i).dotProduct(us(j)))
}
- blas.daxpy(-1, targetR, r)
- val sumSqs = r.aggregate(Functions.plus, Functions.square)
- sqrt(sumSqs / (M * U))
+ val diffs = r.subtract(targetR)
+ var sumSqs = 0.0
+ for (i <- 0 until M; j <- 0 until U) {
+ val diff = diffs.getEntry(i, j)
+ sumSqs += diff * diff
+ }
+ math.sqrt(sumSqs / (M.toDouble * U.toDouble))
}
- def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
- R: DoubleMatrix2D) : DoubleMatrix1D =
- {
+ def update(i: Int, m: RealVector, us: Array[RealVector], R: RealMatrix) : RealVector = {
val U = us.size
- val F = us(0).size
- val XtX = factory2D.make(F, F)
- val Xty = factory1D.make(F)
+ val F = us(0).getDimension
+ var XtX: RealMatrix = new Array2DRowRealMatrix(F, F)
+ var Xty: RealVector = new ArrayRealVector(F)
// For each user that rated the movie
for (j <- 0 until U) {
val u = us(j)
// Add u * u^t to XtX
- blas.dger(1, u, u, XtX)
+ XtX = XtX.add(u.outerProduct(u))
// Add u * rating to Xty
- blas.daxpy(R.get(i, j), u, Xty)
+ Xty = Xty.add(u.mapMultiply(R.getEntry(i, j)))
}
// Add regularization coefs to diagonal terms
for (d <- 0 until F) {
- XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ XtX.addToEntry(d, d, LAMBDA * U)
}
// Solve it with Cholesky
- val ch = new CholeskyDecomposition(XtX)
- val Xty2D = factory2D.make(Xty.toArray, F)
- val solved2D = ch.solve(Xty2D)
- solved2D.viewColumn(0)
+ new CholeskyDecomposition(XtX).getSolver.solve(Xty)
}
def showWarning() {
@@ -118,7 +105,7 @@ object SparkALS {
showWarning()
- printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
+ println(s"Running with M=$M, U=$U, F=$F, iters=$ITERATIONS")
val sparkConf = new SparkConf().setAppName("SparkALS")
val sc = new SparkContext(sparkConf)
@@ -126,21 +113,21 @@ object SparkALS {
val R = generateR()
// Initialize m and u randomly
- var ms = Array.fill(M)(factory1D.random(F))
- var us = Array.fill(U)(factory1D.random(F))
+ var ms = Array.fill(M)(randomVector(F))
+ var us = Array.fill(U)(randomVector(F))
// Iteratively update movies then users
val Rc = sc.broadcast(R)
var msb = sc.broadcast(ms)
var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
- println("Iteration " + iter + ":")
+ println(s"Iteration $iter:")
ms = sc.parallelize(0 until M, slices)
.map(i => update(i, msb.value(i), usb.value, Rc.value))
.collect()
msb = sc.broadcast(ms) // Re-broadcast ms because it was updated
us = sc.parallelize(0 until U, slices)
- .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value)))
+ .map(i => update(i, usb.value(i), msb.value, Rc.value.transpose()))
.collect()
usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
@@ -149,4 +136,11 @@ object SparkALS {
sc.stop()
}
+
+ private def randomVector(n: Int): RealVector =
+ new ArrayRealVector(Array.fill(n)(math.random))
+
+ private def randomMatrix(rows: Int, cols: Int): RealMatrix =
+ new Array2DRowRealMatrix(Array.fill(rows, cols)(math.random))
+
}