author     Matei Zaharia <matei@eecs.berkeley.edu>    2013-01-20 12:47:55 -0800
committer  Matei Zaharia <matei@eecs.berkeley.edu>    2013-01-20 12:47:55 -0800
commit     86057ec7c868262763d1e31b3f3c94bd43eeafb3 (patch)
tree       cdde5e4264549bd10b67e4da73322391b922e14e /examples
parent     76ff962edcb7f41601c6c2d4fc6714bbc885faa7 (diff)
parent     9f54d7e1f5a5e6f80b3d710de67f800bef943d33 (diff)
Merge branch 'master' into streaming
Conflicts:
	core/src/main/scala/spark/api/python/PythonRDD.scala
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/spark/examples/SparkALS.scala  59
1 file changed, 20 insertions, 39 deletions
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index fb28e2c932..5e01885dbb 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -7,6 +7,7 @@ import cern.jet.math._
 import cern.colt.matrix._
 import cern.colt.matrix.linalg._
 import spark._
+import scala.Option
 
 object SparkALS {
   // Parameters set through command line arguments
@@ -42,7 +43,7 @@ object SparkALS {
     return sqrt(sumSqs / (M * U))
   }
 
-  def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+  def update(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
     R: DoubleMatrix2D) : DoubleMatrix1D =
   {
     val U = us.size
@@ -68,50 +69,30 @@ object SparkALS {
     return solved2D.viewColumn(0)
   }
 
-  def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
-    R: DoubleMatrix2D) : DoubleMatrix1D =
-  {
-    val M = ms.size
-    val F = ms(0).size
-    val XtX = factory2D.make(F, F)
-    val Xty = factory1D.make(F)
-    // For each movie that the user rated
-    for (i <- 0 until M) {
-      val m = ms(i)
-      // Add m * m^t to XtX
-      blas.dger(1, m, m, XtX)
-      // Add m * rating to Xty
-      blas.daxpy(R.get(i, j), m, Xty)
-    }
-    // Add regularization coefs to diagonal terms
-    for (d <- 0 until F) {
-      XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
-    }
-    // Solve it with Cholesky
-    val ch = new CholeskyDecomposition(XtX)
-    val Xty2D = factory2D.make(Xty.toArray, F)
-    val solved2D = ch.solve(Xty2D)
-    return solved2D.viewColumn(0)
-  }
-
   def main(args: Array[String]) {
     var host = ""
     var slices = 0
-    args match {
-      case Array(m, u, f, iters, slices_, host_) => {
-        M = m.toInt
-        U = u.toInt
-        F = f.toInt
-        ITERATIONS = iters.toInt
-        slices = slices_.toInt
-        host = host_
+
+    (0 to 5).map(i => {
+      i match {
+        case a if a < args.length => Some(args(a))
+        case _ => None
+      }
+    }).toArray match {
+      case Array(host_, m, u, f, iters, slices_) => {
+        host = host_ getOrElse "local"
+        M = (m getOrElse "100").toInt
+        U = (u getOrElse "500").toInt
+        F = (f getOrElse "10").toInt
+        ITERATIONS = (iters getOrElse "5").toInt
+        slices = (slices_ getOrElse "2").toInt
       }
       case _ => {
-        System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <master>")
+        System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]")
         System.exit(1)
       }
     }
-    printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+    printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
     val spark = new SparkContext(host, "SparkALS")
 
     val R = generateR()
@@ -127,11 +108,11 @@ object SparkALS {
     for (iter <- 1 to ITERATIONS) {
       println("Iteration " + iter + ":")
       ms = spark.parallelize(0 until M, slices)
-                .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
+                .map(i => update(i, msc.value(i), usc.value, Rc.value))
                 .toArray
       msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
       us = spark.parallelize(0 until U, slices)
-                .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
+                .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value)))
                 .toArray
      usc = spark.broadcast(us) // Re-broadcast us because it was updated
      println("RMSE = " + rmse(R, ms, us))