author     Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 14:35:03 -0700
committer  Mosharaf Chowdhury <mosharaf@cs.berkeley.edu>  2011-04-27 14:35:03 -0700
commit     ac7e066383a6878beb0618597c2be6fa9eb1982e (patch)
tree       a14bca13f50705ad2bfeb1d7634ad1dfb55346bf /examples
parent     dfbc5af6ba7577f0af050b4f6fb294a956ff6452 (diff)
parent     9d2d53349353267db80cd4bce6db059736237575 (diff)
download   spark-ac7e066383a6878beb0618597c2be6fa9eb1982e.tar.gz
           spark-ac7e066383a6878beb0618597c2be6fa9eb1982e.tar.bz2
           spark-ac7e066383a6878beb0618597c2be6fa9eb1982e.zip
Merge branch 'master' into mos-shuffle-tracked
Conflicts:
    .gitignore
    core/src/main/scala/spark/LocalFileShuffle.scala
    src/scala/spark/BasicLocalFileShuffle.scala
    src/scala/spark/Broadcast.scala
    src/scala/spark/LocalFileShuffle.scala
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/spark/examples/BroadcastTest.scala | 30
-rw-r--r--  examples/src/main/scala/spark/examples/CpuHog.scala | 26
-rw-r--r--  examples/src/main/scala/spark/examples/GroupByTest.scala | 37
-rw-r--r--  examples/src/main/scala/spark/examples/HdfsTest.scala | 18
-rw-r--r--  examples/src/main/scala/spark/examples/LocalALS.scala | 121
-rw-r--r--  examples/src/main/scala/spark/examples/LocalFileLR.scala | 38
-rw-r--r--  examples/src/main/scala/spark/examples/LocalLR.scala | 43
-rw-r--r--  examples/src/main/scala/spark/examples/LocalPi.scala | 17
-rw-r--r--  examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 51
-rw-r--r--  examples/src/main/scala/spark/examples/SkewedGroupByTest.scala | 41
-rw-r--r--  examples/src/main/scala/spark/examples/SleepJob.scala | 21
-rw-r--r--  examples/src/main/scala/spark/examples/SparkALS.scala | 141
-rw-r--r--  examples/src/main/scala/spark/examples/SparkHdfsLR.scala | 53
-rw-r--r--  examples/src/main/scala/spark/examples/SparkLR.scala | 51
-rw-r--r--  examples/src/main/scala/spark/examples/SparkPi.scala | 23
-rw-r--r--  examples/src/main/scala/spark/examples/Vector.scala | 65
16 files changed, 776 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
new file mode 100644
index 0000000000..2506de5ae5
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -0,0 +1,30 @@
+package spark.examples
+
+import spark.SparkContext
+
+object BroadcastTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: BroadcastTest <host> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "Broadcast Test")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val num = if (args.length > 2) args(2).toInt else 1000000
+
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length)
+ arr1(i) = i
+
+// var arr2 = new Array[Int](num * 2)
+// for (i <- 0 until arr2.length)
+// arr2(i) = i
+
+ val barr1 = spark.broadcast(arr1)
+// val barr2 = spark.broadcast(arr2)
+ spark.parallelize(1 to 10, slices).foreach {
+// i => println(barr1.value.size + barr2.value.size)
+ i => println(barr1.value.size)
+ }
+ }
+}
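
BroadcastTest exercises broadcast variables: the array is serialized to each node once and every task reads it through .value instead of capturing it in its task closure. A minimal sketch of the same pattern, reusing only API calls that appear above; the "local[2]" master string and the squares table are illustrative assumptions, not taken from the commit.

    package spark.examples

    import spark.SparkContext

    object BroadcastSketch {
      def main(args: Array[String]) {
        // Assumed local-mode master string; any valid master would do
        val sc = new SparkContext("local[2]", "Broadcast Sketch")
        // Shipped to the workers once as a broadcast variable...
        val squares = sc.broadcast((1 to 10).map(i => i * i).toArray)
        // ...and read by every task through .value
        sc.parallelize(1 to 10, 2).foreach { i =>
          println(i + "^2 = " + squares.value(i - 1))
        }
      }
    }
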
diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala
new file mode 100644
index 0000000000..94b3709850
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/CpuHog.scala
@@ -0,0 +1,26 @@
+package spark.examples
+
+import spark._
+
+object CpuHog {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: CpuHog <master> <tasks> <threads_per_task>");
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "CPU hog")
+ val tasks = args(1).toInt
+ val threads = args(2).toInt
+ def task {
+ for (i <- 0 until threads-1) {
+ new Thread() {
+ override def run {
+ while(true) {}
+ }
+ }.start()
+ }
+ while(true) {}
+ }
+ sc.runTasks(Array.make(tasks, () => task))
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000000..48c02a52c6
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -0,0 +1,37 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object GroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+ val sc = new SparkContext(args(0), "GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println(pairs1.groupByKey(numReducers).count)
+ }
+}
+
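
GroupByTest measures a shuffle: groupByKey(numReducers) brings every pair that shares a key to a single reducer, so the final count is the number of distinct keys. A tiny sketch of that semantics under the same API; the "local[2]" master and the toy pairs are assumptions for illustration.

    package spark.examples

    import spark.SparkContext
    import spark.SparkContext._

    object GroupBySketch {
      def main(args: Array[String]) {
        val sc = new SparkContext("local[2]", "GroupBy Sketch")
        val pairs = sc.parallelize(Seq((1, "a"), (1, "b"), (2, "c")), 2)
        // Values that share a key end up grouped on the same reducer
        val grouped = pairs.groupByKey(2)
        println(grouped.toArray.mkString(", "))  // two distinct keys, so count would be 2
      }
    }
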
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
new file mode 100644
index 0000000000..072b4ce417
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark._
+
+object HdfsTest {
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0), "HdfsTest")
+ val file = sc.textFile(args(1))
+ val mapped = file.map(s => s.length).cache()
+ for (iter <- 1 to 10) {
+ val start = System.currentTimeMillis()
+ for (x <- mapped) { x + 2 }
+ // println("Processing: " + x)
+ val end = System.currentTimeMillis()
+ println("Iteration " + iter + " took " + (end-start) + " ms")
+ }
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
new file mode 100644
index 0000000000..10360dab3d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -0,0 +1,121 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+
+object LocalALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+
+ val LAMBDA = 0.01 // Regularization coefficient
+
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+
+ def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each movie that the user rated
+ for (i <- 0 until M) {
+ val m = ms(i)
+ // Add m * m^t to XtX
+ blas.dger(1, m, m, XtX)
+ // Add m * rating to Xty
+ blas.daxpy(R.get(i, j), m, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def main(args: Array[String]) {
+ args match {
+ case Array(m, u, f, iters) => {
+ M = m.toInt
+ U = u.toInt
+ F = f.toInt
+ ITERATIONS = iters.toInt
+ }
+ case _ => {
+ System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
+ System.exit(1)
+ }
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+
+ val R = generateR()
+
+ // Initialize m and u randomly
+ var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+ var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+ // Iteratively update movies then users
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
+ us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ }
+}
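
In updateMovie and updateUser above, XtX accumulates the Gram matrix of the fixed factors, Xty the rating-weighted right-hand side, and the Cholesky step solves the regularized normal equations; written out for movie i (and symmetrically for user j with the roles of ms and us swapped):

    \left( \sum_{j=1}^{U} u_j u_j^\top + \lambda U I \right) m_i = \sum_{j=1}^{U} R_{ij}\, u_j
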
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
new file mode 100644
index 0000000000..cc14aa7090
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -0,0 +1,38 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalFileLR {
+ val D = 10 // Number of dimensions
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def parsePoint(line: String): DataPoint = {
+ val nums = line.split(' ').map(_.toDouble)
+ return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ }
+
+ def main(args: Array[String]) {
+ val lines = scala.io.Source.fromFile(args(0)).getLines()
+ val points = lines.map(parsePoint _)
+ val ITERATIONS = args(1).toInt
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- points) {
+ val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+
+ println("Final w: " + w)
+ }
+}
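
The scale line above is the gradient of the logistic loss for labels y in {-1, +1}, and w -= gradient is one full-batch gradient-descent step with unit step size; the same expression reappears in LocalLR, SparkLR, and SparkHdfsLR:

    \nabla_w \log\left(1 + e^{-y\,(w \cdot x)}\right) = \left( \frac{1}{1 + e^{-y\,(w \cdot x)}} - 1 \right) y\, x
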
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
new file mode 100644
index 0000000000..3fd3f88fa8
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -0,0 +1,43 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Number of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.fromFunction(generatePoint _)(N)
+ }
+
+ def main(args: Array[String]) {
+ val data = generateData
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- data) {
+ val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+
+ println("Final w: " + w)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalPi.scala b/examples/src/main/scala/spark/examples/LocalPi.scala
new file mode 100644
index 0000000000..9457472f2d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalPi.scala
@@ -0,0 +1,17 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object LocalPi {
+ def main(args: Array[String]) {
+ var count = 0
+ for (i <- 1 to 100000) {
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) count += 1
+ }
+ println("Pi is roughly " + 4 * count / 100000.0)
+ }
+}
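
LocalPi (and SparkPi further below) uses the standard Monte Carlo estimate: points are drawn uniformly from the square [-1, 1]^2, and the fraction landing inside the unit circle approximates pi/4, so

    \pi \approx 4 \cdot \frac{\#\{(x, y) : x^2 + y^2 < 1\}}{n}, \qquad n = 100000
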
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
new file mode 100644
index 0000000000..c8edb7d8b4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SimpleSkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SimpleSkewedGroupByTest <host> " +
+ "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+ var ratio = if (args.length > 5) args(5).toDouble else 5.0
+
+ val sc = new SparkContext(args(0), "GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+ var result = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ val offset = ranGen.nextInt(1000) * numReducers
+ if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+ // give ratio times higher chance of generating key 0 (for reducer 0)
+ result(i) = (offset, byteArr)
+ } else {
+ // generate a key for one of the other reducers
+ val key = 1 + ranGen.nextInt(numReducers-1) + offset
+ result(i) = (key, byteArr)
+ }
+ }
+ result
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println("RESULT: " + pairs1.groupByKey(numReducers).count)
+ // Print how many keys each reducer got (for debugging)
+ //println("RESULT: " + pairs1.groupByKey(numReducers)
+ // .map{case (k,v) => (k, v.size)}
+ // .collectAsMap)
+ }
+}
+
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
new file mode 100644
index 0000000000..e6dec44bed
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -0,0 +1,41 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SkewedGroupByTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [KeySize] [numReducers]")
+ System.exit(1)
+ }
+
+ var numMappers = if (args.length > 1) args(1).toInt else 2
+ var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+ var valSize = if (args.length > 3) args(3).toInt else 1000
+ var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+ val sc = new SparkContext(args(0), "GroupBy Test")
+
+ val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+ val ranGen = new Random
+
+ // map output sizes linearly increase from the 1st to the last
+ numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt
+
+ var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+ for (i <- 0 until numKVPairs) {
+ val byteArr = new Array[Byte](valSize)
+ ranGen.nextBytes(byteArr)
+ arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+ }
+ arr1
+ }.cache
+ // Enforce that everything has been calculated and is in the cache
+ pairs1.count
+
+ println(pairs1.groupByKey(numReducers).count)
+ }
+}
+
diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala
new file mode 100644
index 0000000000..02673a5f88
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SleepJob.scala
@@ -0,0 +1,21 @@
+package spark.examples
+
+import spark._
+
+object SleepJob {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: SleepJob <master> <tasks> <task_duration>");
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Sleep job")
+ val tasks = args(1).toInt
+ val duration = args(2).toInt
+ def task {
+ val start = System.currentTimeMillis
+ while (System.currentTimeMillis - start < duration * 1000L)
+ Thread.sleep(200)
+ }
+ sc.runTasks(Array.make(tasks, () => task))
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
new file mode 100644
index 0000000000..08e0420371
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -0,0 +1,141 @@
+package spark.examples
+
+import java.io.Serializable
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+import spark._
+
+object SparkALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+
+ val LAMBDA = 0.01 // Regularization coefficient
+
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+
+ def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val U = us.size
+ val F = us(0).size
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val M = ms.size
+ val F = ms(0).size
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each movie that the user rated
+ for (i <- 0 until M) {
+ val m = ms(i)
+ // Add m * m^t to XtX
+ blas.dger(1, m, m, XtX)
+ // Add m * rating to Xty
+ blas.daxpy(R.get(i, j), m, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def main(args: Array[String]) {
+ var host = ""
+ var slices = 0
+ args match {
+ case Array(m, u, f, iters, slices_, host_) => {
+ M = m.toInt
+ U = u.toInt
+ F = f.toInt
+ ITERATIONS = iters.toInt
+ slices = slices_.toInt
+ host = host_
+ }
+ case _ => {
+ System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <host>")
+ System.exit(1)
+ }
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ val spark = new SparkContext(host, "SparkALS")
+
+ val R = generateR()
+
+ // Initialize m and u randomly
+ var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+ var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+ // Iteratively update movies then users
+ val Rc = spark.broadcast(R)
+ var msc = spark.broadcast(ms)
+ var usc = spark.broadcast(us)
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = spark.parallelize(0 until M, slices)
+ .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
+ .toArray
+ msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
+ us = spark.parallelize(0 until U, slices)
+ .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
+ .toArray
+ usc = spark.broadcast(us) // Re-broadcast us because it was updated
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
new file mode 100644
index 0000000000..4c71fd0845
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkHdfsLR {
+ val D = 10 // Number of dimensions
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def parsePoint(line: String): DataPoint = {
+ //val nums = line.split(' ').map(_.toDouble)
+ //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ val tok = new java.util.StringTokenizer(line, " ")
+ var y = tok.nextToken.toDouble
+ var x = new Array[Double](D)
+ var i = 0
+ while (i < D) {
+ x(i) = tok.nextToken.toDouble; i += 1
+ }
+ return DataPoint(new Vector(x), y)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkHdfsLR")
+ val lines = sc.textFile(args(1))
+ val points = lines.map(parsePoint _).cache()
+ val ITERATIONS = args(2).toInt
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = sc.accumulator(Vector.zeros(D))
+ for (p <- points) {
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient.value
+ }
+
+ println("Final w: " + w)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
new file mode 100644
index 0000000000..d08f5d3f01
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Number of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.fromFunction(generatePoint _)(N)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkLR <host> [<slices>]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkLR")
+ val numSlices = if (args.length > 1) args(1).toInt else 2
+ val data = generateData
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = sc.accumulator(Vector.zeros(D))
+ for (p <- sc.parallelize(data, numSlices)) {
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient.value
+ }
+
+ println("Final w: " + w)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
new file mode 100644
index 0000000000..31c6c5b9b1
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -0,0 +1,23 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object SparkPi {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkPi <host> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkPi")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ var count = spark.accumulator(0)
+ for (i <- spark.parallelize(1 to 100000, slices)) {
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) count += 1
+ }
+ println("Pi is roughly " + 4 * count.value / 100000.0)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala
new file mode 100644
index 0000000000..ea70626e71
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/Vector.scala
@@ -0,0 +1,65 @@
+package spark.examples
+
+@serializable class Vector(val elements: Array[Double]) {
+ def length = elements.length
+
+ def apply(index: Int) = elements(index)
+
+ def + (other: Vector): Vector = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ return Vector(length, i => this(i) + other(i))
+ }
+
+ def - (other: Vector): Vector = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ return Vector(length, i => this(i) - other(i))
+ }
+
+ def dot(other: Vector): Double = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ var ans = 0.0
+ for (i <- 0 until length)
+ ans += this(i) * other(i)
+ return ans
+ }
+
+ def * ( scale: Double): Vector = Vector(length, i => this(i) * scale)
+
+ def unary_- = this * -1
+
+ def sum = elements.reduceLeft(_ + _)
+
+ override def toString = elements.mkString("(", ", ", ")")
+
+}
+
+object Vector {
+ def apply(elements: Array[Double]) = new Vector(elements)
+
+ def apply(elements: Double*) = new Vector(elements.toArray)
+
+ def apply(length: Int, initializer: Int => Double): Vector = {
+ val elements = new Array[Double](length)
+ for (i <- 0 until length)
+ elements(i) = initializer(i)
+ return new Vector(elements)
+ }
+
+ def zeros(length: Int) = new Vector(new Array[Double](length))
+
+ def ones(length: Int) = Vector(length, _ => 1)
+
+ class Multiplier(num: Double) {
+ def * (vec: Vector) = vec * num
+ }
+
+ implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
+
+ implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
+ def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+ def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
+ }
+}
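
A minimal sketch of how the Vector helpers above compose, including the doubleToMultiplier implicit that makes scalar * vector work; VectorSketch and its values are illustrative, not taken from the examples.

    package spark.examples

    import Vector._

    object VectorSketch {
      def main(args: Array[String]) {
        val a = Vector(1.0, 2.0, 3.0)
        val b = Vector.ones(3)
        println(a + b)     // (2.0, 3.0, 4.0)
        println(a dot b)   // 6.0
        println(2.0 * a)   // via the implicit Multiplier: (2.0, 4.0, 6.0)
        println(-a)        // unary_- : (-1.0, -2.0, -3.0)
      }
    }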