From a11fe23017b3e1efb18befbd1b6abe27abf9a396 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Wed, 2 Feb 2011 16:28:57 -0800 Subject: Moved examples to spark.examples package --- README | 4 +- examples/src/main/scala/BroadcastTest.scala | 28 ---- examples/src/main/scala/CpuHog.scala | 24 ---- examples/src/main/scala/HdfsTest.scala | 16 --- examples/src/main/scala/LocalALS.scala | 119 ----------------- examples/src/main/scala/LocalFileLR.scala | 36 ------ examples/src/main/scala/LocalLR.scala | 41 ------ examples/src/main/scala/LocalPi.scala | 15 --- examples/src/main/scala/SleepJob.scala | 19 --- examples/src/main/scala/SparkALS.scala | 139 -------------------- examples/src/main/scala/SparkHdfsLR.scala | 51 -------- examples/src/main/scala/SparkLR.scala | 49 ------- examples/src/main/scala/SparkPi.scala | 21 --- examples/src/main/scala/Vector.scala | 63 --------- .../main/scala/spark/examples/BroadcastTest.scala | 30 +++++ .../src/main/scala/spark/examples/CpuHog.scala | 26 ++++ .../src/main/scala/spark/examples/HdfsTest.scala | 18 +++ .../src/main/scala/spark/examples/LocalALS.scala | 121 ++++++++++++++++++ .../main/scala/spark/examples/LocalFileLR.scala | 38 ++++++ .../src/main/scala/spark/examples/LocalLR.scala | 43 +++++++ .../src/main/scala/spark/examples/LocalPi.scala | 17 +++ .../src/main/scala/spark/examples/SleepJob.scala | 21 +++ .../src/main/scala/spark/examples/SparkALS.scala | 141 +++++++++++++++++++++ .../main/scala/spark/examples/SparkHdfsLR.scala | 53 ++++++++ .../src/main/scala/spark/examples/SparkLR.scala | 51 ++++++++ .../src/main/scala/spark/examples/SparkPi.scala | 23 ++++ .../src/main/scala/spark/examples/Vector.scala | 65 ++++++++++ 27 files changed, 649 insertions(+), 623 deletions(-) delete mode 100644 examples/src/main/scala/BroadcastTest.scala delete mode 100644 examples/src/main/scala/CpuHog.scala delete mode 100644 examples/src/main/scala/HdfsTest.scala delete mode 100644 examples/src/main/scala/LocalALS.scala delete mode 100644 examples/src/main/scala/LocalFileLR.scala delete mode 100644 examples/src/main/scala/LocalLR.scala delete mode 100644 examples/src/main/scala/LocalPi.scala delete mode 100644 examples/src/main/scala/SleepJob.scala delete mode 100644 examples/src/main/scala/SparkALS.scala delete mode 100644 examples/src/main/scala/SparkHdfsLR.scala delete mode 100644 examples/src/main/scala/SparkLR.scala delete mode 100644 examples/src/main/scala/SparkPi.scala delete mode 100644 examples/src/main/scala/Vector.scala create mode 100644 examples/src/main/scala/spark/examples/BroadcastTest.scala create mode 100644 examples/src/main/scala/spark/examples/CpuHog.scala create mode 100644 examples/src/main/scala/spark/examples/HdfsTest.scala create mode 100644 examples/src/main/scala/spark/examples/LocalALS.scala create mode 100644 examples/src/main/scala/spark/examples/LocalFileLR.scala create mode 100644 examples/src/main/scala/spark/examples/LocalLR.scala create mode 100644 examples/src/main/scala/spark/examples/LocalPi.scala create mode 100644 examples/src/main/scala/spark/examples/SleepJob.scala create mode 100644 examples/src/main/scala/spark/examples/SparkALS.scala create mode 100644 examples/src/main/scala/spark/examples/SparkHdfsLR.scala create mode 100644 examples/src/main/scala/spark/examples/SparkLR.scala create mode 100644 examples/src/main/scala/spark/examples/SparkPi.scala create mode 100644 examples/src/main/scala/spark/examples/Vector.scala diff --git a/README b/README index f084f22a1f..d60b143085 100644 --- a/README +++ b/README @@ -10,8 
+10,8 @@ of these methods on Mesos slave nodes as well as on the master. To build Spark and the example programs, run make. To run one of the examples, use ./run . For example, -./run SparkLR will run the Logistic Regression example. Each of the -example programs prints usage help if no params are given. +./run spark.examples.SparkLR will run the Logistic Regression example. +Each of the example programs prints usage help if no params are given. All of the Spark samples take a parameter that is the Mesos master to connect to. This can be a Mesos URL, or "local" to run locally with one diff --git a/examples/src/main/scala/BroadcastTest.scala b/examples/src/main/scala/BroadcastTest.scala deleted file mode 100644 index 40c2be8f6d..0000000000 --- a/examples/src/main/scala/BroadcastTest.scala +++ /dev/null @@ -1,28 +0,0 @@ -import spark.SparkContext - -object BroadcastTest { - def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: BroadcastTest []") - System.exit(1) - } - val spark = new SparkContext(args(0), "Broadcast Test") - val slices = if (args.length > 1) args(1).toInt else 2 - val num = if (args.length > 2) args(2).toInt else 1000000 - - var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) - arr1(i) = i - -// var arr2 = new Array[Int](num * 2) -// for (i <- 0 until arr2.length) -// arr2(i) = i - - val barr1 = spark.broadcast(arr1) -// val barr2 = spark.broadcast(arr2) - spark.parallelize(1 to 10, slices).foreach { -// i => println(barr1.value.size + barr2.value.size) - i => println(barr1.value.size) - } - } -} diff --git a/examples/src/main/scala/CpuHog.scala b/examples/src/main/scala/CpuHog.scala deleted file mode 100644 index f37c6f7824..0000000000 --- a/examples/src/main/scala/CpuHog.scala +++ /dev/null @@ -1,24 +0,0 @@ -import spark._ - -object CpuHog { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: CpuHog "); - System.exit(1) - } - val sc = new SparkContext(args(0), "CPU hog") - val tasks = args(1).toInt - val threads = args(2).toInt - def task { - for (i <- 0 until threads-1) { - new Thread() { - override def run { - while(true) {} - } - }.start() - } - while(true) {} - } - sc.runTasks(Array.make(tasks, () => task)) - } -} diff --git a/examples/src/main/scala/HdfsTest.scala b/examples/src/main/scala/HdfsTest.scala deleted file mode 100644 index e678154aab..0000000000 --- a/examples/src/main/scala/HdfsTest.scala +++ /dev/null @@ -1,16 +0,0 @@ -import spark._ - -object HdfsTest { - def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HdfsTest") - val file = sc.textFile(args(1)) - val mapped = file.map(s => s.length).cache() - for (iter <- 1 to 10) { - val start = System.currentTimeMillis() - for (x <- mapped) { x + 2 } - // println("Processing: " + x) - val end = System.currentTimeMillis() - println("Iteration " + iter + " took " + (end-start) + " ms") - } - } -} diff --git a/examples/src/main/scala/LocalALS.scala b/examples/src/main/scala/LocalALS.scala deleted file mode 100644 index a976a5e1c5..0000000000 --- a/examples/src/main/scala/LocalALS.scala +++ /dev/null @@ -1,119 +0,0 @@ -import java.util.Random -import scala.math.sqrt -import cern.jet.math._ -import cern.colt.matrix._ -import cern.colt.matrix.linalg._ - -object LocalALS { - // Parameters set through command line arguments - var M = 0 // Number of movies - var U = 0 // Number of users - var F = 0 // Number of features - var ITERATIONS = 0 - - val LAMBDA = 0.01 // Regularization coefficient - - // Some COLT objects - val 
factory2D = DoubleFactory2D.dense - val factory1D = DoubleFactory1D.dense - val algebra = Algebra.DEFAULT - val blas = SeqBlas.seqBlas - - def generateR(): DoubleMatrix2D = { - val mh = factory2D.random(M, F) - val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) - } - - def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { - val r = factory2D.make(M, U) - for (i <- 0 until M; j <- 0 until U) { - r.set(i, j, blas.ddot(ms(i), us(j))) - } - //println("R: " + r) - blas.daxpy(-1, targetR, r) - val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) - } - - def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) - // For each user that rated the movie - for (j <- 0 until U) { - val u = us(j) - // Add u * u^t to XtX - blas.dger(1, u, u, XtX) - // Add u * rating to Xty - blas.daxpy(R.get(i, j), u, Xty) - } - // Add regularization coefs to diagonal terms - for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) - } - // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) - } - - def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) - // For each movie that the user rated - for (i <- 0 until M) { - val m = ms(i) - // Add m * m^t to XtX - blas.dger(1, m, m, XtX) - // Add m * rating to Xty - blas.daxpy(R.get(i, j), m, Xty) - } - // Add regularization coefs to diagonal terms - for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) - } - // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) - } - - def main(args: Array[String]) { - args match { - case Array(m, u, f, iters) => { - M = m.toInt - U = u.toInt - F = f.toInt - ITERATIONS = iters.toInt - } - case _ => { - System.err.println("Usage: LocalALS ") - System.exit(1) - } - } - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); - - val R = generateR() - - // Initialize m and u randomly - var ms = Array.fromFunction(_ => factory1D.random(F))(M) - var us = Array.fromFunction(_ => factory1D.random(F))(U) - - // Iteratively update movies then users - for (iter <- 1 to ITERATIONS) { - println("Iteration " + iter + ":") - ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray - us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray - println("RMSE = " + rmse(R, ms, us)) - println() - } - } -} diff --git a/examples/src/main/scala/LocalFileLR.scala b/examples/src/main/scala/LocalFileLR.scala deleted file mode 100644 index 3d3bb60677..0000000000 --- a/examples/src/main/scala/LocalFileLR.scala +++ /dev/null @@ -1,36 +0,0 @@ -import java.util.Random -import Vector._ - -object LocalFileLR { - val D = 10 // Numer of dimensions - val rand = new Random(42) - - case class DataPoint(x: Vector, y: Double) - - def parsePoint(line: String): DataPoint = { - val nums = line.split(' ').map(_.toDouble) - return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) - } - - def main(args: Array[String]) { - val lines = scala.io.Source.fromFile(args(0)).getLines() - val points = lines.map(parsePoint _) - val ITERATIONS = 
args(1).toInt - - // Initialize w to a random value - var w = Vector(D, _ => 2 * rand.nextDouble - 1) - println("Initial w: " + w) - - for (i <- 1 to ITERATIONS) { - println("On iteration " + i) - var gradient = Vector.zeros(D) - for (p <- points) { - val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y - gradient += scale * p.x - } - w -= gradient - } - - println("Final w: " + w) - } -} diff --git a/examples/src/main/scala/LocalLR.scala b/examples/src/main/scala/LocalLR.scala deleted file mode 100644 index 175907e551..0000000000 --- a/examples/src/main/scala/LocalLR.scala +++ /dev/null @@ -1,41 +0,0 @@ -import java.util.Random -import Vector._ - -object LocalLR { - val N = 10000 // Number of data points - val D = 10 // Numer of dimensions - val R = 0.7 // Scaling factor - val ITERATIONS = 5 - val rand = new Random(42) - - case class DataPoint(x: Vector, y: Double) - - def generateData = { - def generatePoint(i: Int) = { - val y = if(i % 2 == 0) -1 else 1 - val x = Vector(D, _ => rand.nextGaussian + y * R) - DataPoint(x, y) - } - Array.fromFunction(generatePoint _)(N) - } - - def main(args: Array[String]) { - val data = generateData - - // Initialize w to a random value - var w = Vector(D, _ => 2 * rand.nextDouble - 1) - println("Initial w: " + w) - - for (i <- 1 to ITERATIONS) { - println("On iteration " + i) - var gradient = Vector.zeros(D) - for (p <- data) { - val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y - gradient += scale * p.x - } - w -= gradient - } - - println("Final w: " + w) - } -} diff --git a/examples/src/main/scala/LocalPi.scala b/examples/src/main/scala/LocalPi.scala deleted file mode 100644 index c61b3e53d4..0000000000 --- a/examples/src/main/scala/LocalPi.scala +++ /dev/null @@ -1,15 +0,0 @@ -import scala.math.random -import spark._ -import SparkContext._ - -object LocalPi { - def main(args: Array[String]) { - var count = 0 - for (i <- 1 to 100000) { - val x = random * 2 - 1 - val y = random * 2 - 1 - if (x*x + y*y < 1) count += 1 - } - println("Pi is roughly " + 4 * count / 100000.0) - } -} diff --git a/examples/src/main/scala/SleepJob.scala b/examples/src/main/scala/SleepJob.scala deleted file mode 100644 index a5e0ea0dc2..0000000000 --- a/examples/src/main/scala/SleepJob.scala +++ /dev/null @@ -1,19 +0,0 @@ -import spark._ - -object SleepJob { - def main(args: Array[String]) { - if (args.length != 3) { - System.err.println("Usage: SleepJob "); - System.exit(1) - } - val sc = new SparkContext(args(0), "Sleep job") - val tasks = args(1).toInt - val duration = args(2).toInt - def task { - val start = System.currentTimeMillis - while (System.currentTimeMillis - start < duration * 1000L) - Thread.sleep(200) - } - sc.runTasks(Array.make(tasks, () => task)) - } -} diff --git a/examples/src/main/scala/SparkALS.scala b/examples/src/main/scala/SparkALS.scala deleted file mode 100644 index 6fae3c0940..0000000000 --- a/examples/src/main/scala/SparkALS.scala +++ /dev/null @@ -1,139 +0,0 @@ -import java.io.Serializable -import java.util.Random -import scala.math.sqrt -import cern.jet.math._ -import cern.colt.matrix._ -import cern.colt.matrix.linalg._ -import spark._ - -object SparkALS { - // Parameters set through command line arguments - var M = 0 // Number of movies - var U = 0 // Number of users - var F = 0 // Number of features - var ITERATIONS = 0 - - val LAMBDA = 0.01 // Regularization coefficient - - // Some COLT objects - val factory2D = DoubleFactory2D.dense - val factory1D = DoubleFactory1D.dense - val algebra = Algebra.DEFAULT - val blas = 
SeqBlas.seqBlas - - def generateR(): DoubleMatrix2D = { - val mh = factory2D.random(M, F) - val uh = factory2D.random(U, F) - return algebra.mult(mh, algebra.transpose(uh)) - } - - def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], - us: Array[DoubleMatrix1D]): Double = - { - val r = factory2D.make(M, U) - for (i <- 0 until M; j <- 0 until U) { - r.set(i, j, blas.ddot(ms(i), us(j))) - } - //println("R: " + r) - blas.daxpy(-1, targetR, r) - val sumSqs = r.aggregate(Functions.plus, Functions.square) - return sqrt(sumSqs / (M * U)) - } - - def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val U = us.size - val F = us(0).size - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) - // For each user that rated the movie - for (j <- 0 until U) { - val u = us(j) - // Add u * u^t to XtX - blas.dger(1, u, u, XtX) - // Add u * rating to Xty - blas.daxpy(R.get(i, j), u, Xty) - } - // Add regularization coefs to diagonal terms - for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) - } - // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) - } - - def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], - R: DoubleMatrix2D) : DoubleMatrix1D = - { - val M = ms.size - val F = ms(0).size - val XtX = factory2D.make(F, F) - val Xty = factory1D.make(F) - // For each movie that the user rated - for (i <- 0 until M) { - val m = ms(i) - // Add m * m^t to XtX - blas.dger(1, m, m, XtX) - // Add m * rating to Xty - blas.daxpy(R.get(i, j), m, Xty) - } - // Add regularization coefs to diagonal terms - for (d <- 0 until F) { - XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) - } - // Solve it with Cholesky - val ch = new CholeskyDecomposition(XtX) - val Xty2D = factory2D.make(Xty.toArray, F) - val solved2D = ch.solve(Xty2D) - return solved2D.viewColumn(0) - } - - def main(args: Array[String]) { - var host = "" - var slices = 0 - args match { - case Array(m, u, f, iters, slices_, host_) => { - M = m.toInt - U = u.toInt - F = f.toInt - ITERATIONS = iters.toInt - slices = slices_.toInt - host = host_ - } - case _ => { - System.err.println("Usage: SparkALS ") - System.exit(1) - } - } - printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); - val spark = new SparkContext(host, "SparkALS") - - val R = generateR() - - // Initialize m and u randomly - var ms = Array.fromFunction(_ => factory1D.random(F))(M) - var us = Array.fromFunction(_ => factory1D.random(F))(U) - - // Iteratively update movies then users - val Rc = spark.broadcast(R) - var msc = spark.broadcast(ms) - var usc = spark.broadcast(us) - for (iter <- 1 to ITERATIONS) { - println("Iteration " + iter + ":") - ms = spark.parallelize(0 until M, slices) - .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value)) - .toArray - msc = spark.broadcast(ms) // Re-broadcast ms because it was updated - us = spark.parallelize(0 until U, slices) - .map(i => updateUser(i, usc.value(i), msc.value, Rc.value)) - .toArray - usc = spark.broadcast(us) // Re-broadcast us because it was updated - println("RMSE = " + rmse(R, ms, us)) - println() - } - } -} diff --git a/examples/src/main/scala/SparkHdfsLR.scala b/examples/src/main/scala/SparkHdfsLR.scala deleted file mode 100644 index f14d48b17c..0000000000 --- a/examples/src/main/scala/SparkHdfsLR.scala +++ /dev/null @@ -1,51 +0,0 @@ -import java.util.Random -import scala.math.exp -import 
Vector._ -import spark._ - -object SparkHdfsLR { - val D = 10 // Numer of dimensions - val rand = new Random(42) - - case class DataPoint(x: Vector, y: Double) - - def parsePoint(line: String): DataPoint = { - //val nums = line.split(' ').map(_.toDouble) - //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) - val tok = new java.util.StringTokenizer(line, " ") - var y = tok.nextToken.toDouble - var x = new Array[Double](D) - var i = 0 - while (i < D) { - x(i) = tok.nextToken.toDouble; i += 1 - } - return DataPoint(new Vector(x), y) - } - - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: SparkHdfsLR ") - System.exit(1) - } - val sc = new SparkContext(args(0), "SparkHdfsLR") - val lines = sc.textFile(args(1)) - val points = lines.map(parsePoint _).cache() - val ITERATIONS = args(2).toInt - - // Initialize w to a random value - var w = Vector(D, _ => 2 * rand.nextDouble - 1) - println("Initial w: " + w) - - for (i <- 1 to ITERATIONS) { - println("On iteration " + i) - val gradient = sc.accumulator(Vector.zeros(D)) - for (p <- points) { - val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y - gradient += scale * p.x - } - w -= gradient.value - } - - println("Final w: " + w) - } -} diff --git a/examples/src/main/scala/SparkLR.scala b/examples/src/main/scala/SparkLR.scala deleted file mode 100644 index 71f9aea624..0000000000 --- a/examples/src/main/scala/SparkLR.scala +++ /dev/null @@ -1,49 +0,0 @@ -import java.util.Random -import scala.math.exp -import Vector._ -import spark._ - -object SparkLR { - val N = 10000 // Number of data points - val D = 10 // Numer of dimensions - val R = 0.7 // Scaling factor - val ITERATIONS = 5 - val rand = new Random(42) - - case class DataPoint(x: Vector, y: Double) - - def generateData = { - def generatePoint(i: Int) = { - val y = if(i % 2 == 0) -1 else 1 - val x = Vector(D, _ => rand.nextGaussian + y * R) - DataPoint(x, y) - } - Array.fromFunction(generatePoint _)(N) - } - - def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkLR []") - System.exit(1) - } - val sc = new SparkContext(args(0), "SparkLR") - val numSlices = if (args.length > 1) args(1).toInt else 2 - val data = generateData - - // Initialize w to a random value - var w = Vector(D, _ => 2 * rand.nextDouble - 1) - println("Initial w: " + w) - - for (i <- 1 to ITERATIONS) { - println("On iteration " + i) - val gradient = sc.accumulator(Vector.zeros(D)) - for (p <- sc.parallelize(data, numSlices)) { - val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y - gradient += scale * p.x - } - w -= gradient.value - } - - println("Final w: " + w) - } -} diff --git a/examples/src/main/scala/SparkPi.scala b/examples/src/main/scala/SparkPi.scala deleted file mode 100644 index f055614125..0000000000 --- a/examples/src/main/scala/SparkPi.scala +++ /dev/null @@ -1,21 +0,0 @@ -import scala.math.random -import spark._ -import SparkContext._ - -object SparkPi { - def main(args: Array[String]) { - if (args.length == 0) { - System.err.println("Usage: SparkPi []") - System.exit(1) - } - val spark = new SparkContext(args(0), "SparkPi") - val slices = if (args.length > 1) args(1).toInt else 2 - var count = spark.accumulator(0) - for (i <- spark.parallelize(1 to 100000, slices)) { - val x = random * 2 - 1 - val y = random * 2 - 1 - if (x*x + y*y < 1) count += 1 - } - println("Pi is roughly " + 4 * count.value / 100000.0) - } -} diff --git a/examples/src/main/scala/Vector.scala b/examples/src/main/scala/Vector.scala deleted file mode 
100644 index e9fbdca752..0000000000 --- a/examples/src/main/scala/Vector.scala +++ /dev/null @@ -1,63 +0,0 @@ -@serializable class Vector(val elements: Array[Double]) { - def length = elements.length - - def apply(index: Int) = elements(index) - - def + (other: Vector): Vector = { - if (length != other.length) - throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) + other(i)) - } - - def - (other: Vector): Vector = { - if (length != other.length) - throw new IllegalArgumentException("Vectors of different length") - return Vector(length, i => this(i) - other(i)) - } - - def dot(other: Vector): Double = { - if (length != other.length) - throw new IllegalArgumentException("Vectors of different length") - var ans = 0.0 - for (i <- 0 until length) - ans += this(i) * other(i) - return ans - } - - def * ( scale: Double): Vector = Vector(length, i => this(i) * scale) - - def unary_- = this * -1 - - def sum = elements.reduceLeft(_ + _) - - override def toString = elements.mkString("(", ", ", ")") - -} - -object Vector { - def apply(elements: Array[Double]) = new Vector(elements) - - def apply(elements: Double*) = new Vector(elements.toArray) - - def apply(length: Int, initializer: Int => Double): Vector = { - val elements = new Array[Double](length) - for (i <- 0 until length) - elements(i) = initializer(i) - return new Vector(elements) - } - - def zeros(length: Int) = new Vector(new Array[Double](length)) - - def ones(length: Int) = Vector(length, _ => 1) - - class Multiplier(num: Double) { - def * (vec: Vector) = vec * num - } - - implicit def doubleToMultiplier(num: Double) = new Multiplier(num) - - implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] { - def addInPlace(t1: Vector, t2: Vector) = t1 + t2 - def zero(initialValue: Vector) = Vector.zeros(initialValue.length) - } -} diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala new file mode 100644 index 0000000000..2506de5ae5 --- /dev/null +++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala @@ -0,0 +1,30 @@ +package spark.examples + +import spark.SparkContext + +object BroadcastTest { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: BroadcastTest []") + System.exit(1) + } + val spark = new SparkContext(args(0), "Broadcast Test") + val slices = if (args.length > 1) args(1).toInt else 2 + val num = if (args.length > 2) args(2).toInt else 1000000 + + var arr1 = new Array[Int](num) + for (i <- 0 until arr1.length) + arr1(i) = i + +// var arr2 = new Array[Int](num * 2) +// for (i <- 0 until arr2.length) +// arr2(i) = i + + val barr1 = spark.broadcast(arr1) +// val barr2 = spark.broadcast(arr2) + spark.parallelize(1 to 10, slices).foreach { +// i => println(barr1.value.size + barr2.value.size) + i => println(barr1.value.size) + } + } +} diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala new file mode 100644 index 0000000000..94b3709850 --- /dev/null +++ b/examples/src/main/scala/spark/examples/CpuHog.scala @@ -0,0 +1,26 @@ +package spark.examples + +import spark._ + +object CpuHog { + def main(args: Array[String]) { + if (args.length != 3) { + System.err.println("Usage: CpuHog "); + System.exit(1) + } + val sc = new SparkContext(args(0), "CPU hog") + val tasks = args(1).toInt + val threads = args(2).toInt + def task { + for (i <- 0 until threads-1) { + new Thread() { + override def 
run { + while(true) {} + } + }.start() + } + while(true) {} + } + sc.runTasks(Array.make(tasks, () => task)) + } +} diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala new file mode 100644 index 0000000000..072b4ce417 --- /dev/null +++ b/examples/src/main/scala/spark/examples/HdfsTest.scala @@ -0,0 +1,18 @@ +package spark.examples + +import spark._ + +object HdfsTest { + def main(args: Array[String]) { + val sc = new SparkContext(args(0), "HdfsTest") + val file = sc.textFile(args(1)) + val mapped = file.map(s => s.length).cache() + for (iter <- 1 to 10) { + val start = System.currentTimeMillis() + for (x <- mapped) { x + 2 } + // println("Processing: " + x) + val end = System.currentTimeMillis() + println("Iteration " + iter + " took " + (end-start) + " ms") + } + } +} diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala new file mode 100644 index 0000000000..10360dab3d --- /dev/null +++ b/examples/src/main/scala/spark/examples/LocalALS.scala @@ -0,0 +1,121 @@ +package spark.examples + +import java.util.Random +import scala.math.sqrt +import cern.jet.math._ +import cern.colt.matrix._ +import cern.colt.matrix.linalg._ + +object LocalALS { + // Parameters set through command line arguments + var M = 0 // Number of movies + var U = 0 // Number of users + var F = 0 // Number of features + var ITERATIONS = 0 + + val LAMBDA = 0.01 // Regularization coefficient + + // Some COLT objects + val factory2D = DoubleFactory2D.dense + val factory1D = DoubleFactory1D.dense + val algebra = Algebra.DEFAULT + val blas = SeqBlas.seqBlas + + def generateR(): DoubleMatrix2D = { + val mh = factory2D.random(M, F) + val uh = factory2D.random(U, F) + return algebra.mult(mh, algebra.transpose(uh)) + } + + def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], + us: Array[DoubleMatrix1D]): Double = + { + val r = factory2D.make(M, U) + for (i <- 0 until M; j <- 0 until U) { + r.set(i, j, blas.ddot(ms(i), us(j))) + } + //println("R: " + r) + blas.daxpy(-1, targetR, r) + val sumSqs = r.aggregate(Functions.plus, Functions.square) + return sqrt(sumSqs / (M * U)) + } + + def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], + R: DoubleMatrix2D) : DoubleMatrix1D = + { + val XtX = factory2D.make(F, F) + val Xty = factory1D.make(F) + // For each user that rated the movie + for (j <- 0 until U) { + val u = us(j) + // Add u * u^t to XtX + blas.dger(1, u, u, XtX) + // Add u * rating to Xty + blas.daxpy(R.get(i, j), u, Xty) + } + // Add regularization coefs to diagonal terms + for (d <- 0 until F) { + XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) + } + // Solve it with Cholesky + val ch = new CholeskyDecomposition(XtX) + val Xty2D = factory2D.make(Xty.toArray, F) + val solved2D = ch.solve(Xty2D) + return solved2D.viewColumn(0) + } + + def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], + R: DoubleMatrix2D) : DoubleMatrix1D = + { + val XtX = factory2D.make(F, F) + val Xty = factory1D.make(F) + // For each movie that the user rated + for (i <- 0 until M) { + val m = ms(i) + // Add m * m^t to XtX + blas.dger(1, m, m, XtX) + // Add m * rating to Xty + blas.daxpy(R.get(i, j), m, Xty) + } + // Add regularization coefs to diagonal terms + for (d <- 0 until F) { + XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) + } + // Solve it with Cholesky + val ch = new CholeskyDecomposition(XtX) + val Xty2D = factory2D.make(Xty.toArray, F) + val solved2D = ch.solve(Xty2D) + return 
solved2D.viewColumn(0) + } + + def main(args: Array[String]) { + args match { + case Array(m, u, f, iters) => { + M = m.toInt + U = u.toInt + F = f.toInt + ITERATIONS = iters.toInt + } + case _ => { + System.err.println("Usage: LocalALS ") + System.exit(1) + } + } + printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); + + val R = generateR() + + // Initialize m and u randomly + var ms = Array.fromFunction(_ => factory1D.random(F))(M) + var us = Array.fromFunction(_ => factory1D.random(F))(U) + + // Iteratively update movies then users + for (iter <- 1 to ITERATIONS) { + println("Iteration " + iter + ":") + ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray + us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray + println("RMSE = " + rmse(R, ms, us)) + println() + } + } +} diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala new file mode 100644 index 0000000000..cc14aa7090 --- /dev/null +++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala @@ -0,0 +1,38 @@ +package spark.examples + +import java.util.Random +import Vector._ + +object LocalFileLR { + val D = 10 // Numer of dimensions + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def parsePoint(line: String): DataPoint = { + val nums = line.split(' ').map(_.toDouble) + return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + } + + def main(args: Array[String]) { + val lines = scala.io.Source.fromFile(args(0)).getLines() + val points = lines.map(parsePoint _) + val ITERATIONS = args(1).toInt + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + var gradient = Vector.zeros(D) + for (p <- points) { + val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y + gradient += scale * p.x + } + w -= gradient + } + + println("Final w: " + w) + } +} diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala new file mode 100644 index 0000000000..3fd3f88fa8 --- /dev/null +++ b/examples/src/main/scala/spark/examples/LocalLR.scala @@ -0,0 +1,43 @@ +package spark.examples + +import java.util.Random +import Vector._ + +object LocalLR { + val N = 10000 // Number of data points + val D = 10 // Numer of dimensions + val R = 0.7 // Scaling factor + val ITERATIONS = 5 + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def generateData = { + def generatePoint(i: Int) = { + val y = if(i % 2 == 0) -1 else 1 + val x = Vector(D, _ => rand.nextGaussian + y * R) + DataPoint(x, y) + } + Array.fromFunction(generatePoint _)(N) + } + + def main(args: Array[String]) { + val data = generateData + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + var gradient = Vector.zeros(D) + for (p <- data) { + val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y + gradient += scale * p.x + } + w -= gradient + } + + println("Final w: " + w) + } +} diff --git a/examples/src/main/scala/spark/examples/LocalPi.scala b/examples/src/main/scala/spark/examples/LocalPi.scala new file mode 100644 index 0000000000..9457472f2d --- /dev/null +++ b/examples/src/main/scala/spark/examples/LocalPi.scala @@ -0,0 +1,17 @@ +package spark.examples + +import scala.math.random +import spark._ 
+import SparkContext._ + +object LocalPi { + def main(args: Array[String]) { + var count = 0 + for (i <- 1 to 100000) { + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x*x + y*y < 1) count += 1 + } + println("Pi is roughly " + 4 * count / 100000.0) + } +} diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala new file mode 100644 index 0000000000..02673a5f88 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SleepJob.scala @@ -0,0 +1,21 @@ +package spark.examples + +import spark._ + +object SleepJob { + def main(args: Array[String]) { + if (args.length != 3) { + System.err.println("Usage: SleepJob "); + System.exit(1) + } + val sc = new SparkContext(args(0), "Sleep job") + val tasks = args(1).toInt + val duration = args(2).toInt + def task { + val start = System.currentTimeMillis + while (System.currentTimeMillis - start < duration * 1000L) + Thread.sleep(200) + } + sc.runTasks(Array.make(tasks, () => task)) + } +} diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala new file mode 100644 index 0000000000..08e0420371 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkALS.scala @@ -0,0 +1,141 @@ +package spark.examples + +import java.io.Serializable +import java.util.Random +import scala.math.sqrt +import cern.jet.math._ +import cern.colt.matrix._ +import cern.colt.matrix.linalg._ +import spark._ + +object SparkALS { + // Parameters set through command line arguments + var M = 0 // Number of movies + var U = 0 // Number of users + var F = 0 // Number of features + var ITERATIONS = 0 + + val LAMBDA = 0.01 // Regularization coefficient + + // Some COLT objects + val factory2D = DoubleFactory2D.dense + val factory1D = DoubleFactory1D.dense + val algebra = Algebra.DEFAULT + val blas = SeqBlas.seqBlas + + def generateR(): DoubleMatrix2D = { + val mh = factory2D.random(M, F) + val uh = factory2D.random(U, F) + return algebra.mult(mh, algebra.transpose(uh)) + } + + def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D], + us: Array[DoubleMatrix1D]): Double = + { + val r = factory2D.make(M, U) + for (i <- 0 until M; j <- 0 until U) { + r.set(i, j, blas.ddot(ms(i), us(j))) + } + //println("R: " + r) + blas.daxpy(-1, targetR, r) + val sumSqs = r.aggregate(Functions.plus, Functions.square) + return sqrt(sumSqs / (M * U)) + } + + def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D], + R: DoubleMatrix2D) : DoubleMatrix1D = + { + val U = us.size + val F = us(0).size + val XtX = factory2D.make(F, F) + val Xty = factory1D.make(F) + // For each user that rated the movie + for (j <- 0 until U) { + val u = us(j) + // Add u * u^t to XtX + blas.dger(1, u, u, XtX) + // Add u * rating to Xty + blas.daxpy(R.get(i, j), u, Xty) + } + // Add regularization coefs to diagonal terms + for (d <- 0 until F) { + XtX.set(d, d, XtX.get(d, d) + LAMBDA * U) + } + // Solve it with Cholesky + val ch = new CholeskyDecomposition(XtX) + val Xty2D = factory2D.make(Xty.toArray, F) + val solved2D = ch.solve(Xty2D) + return solved2D.viewColumn(0) + } + + def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D], + R: DoubleMatrix2D) : DoubleMatrix1D = + { + val M = ms.size + val F = ms(0).size + val XtX = factory2D.make(F, F) + val Xty = factory1D.make(F) + // For each movie that the user rated + for (i <- 0 until M) { + val m = ms(i) + // Add m * m^t to XtX + blas.dger(1, m, m, XtX) + // Add m * rating to Xty + blas.daxpy(R.get(i, j), m, 
Xty) + } + // Add regularization coefs to diagonal terms + for (d <- 0 until F) { + XtX.set(d, d, XtX.get(d, d) + LAMBDA * M) + } + // Solve it with Cholesky + val ch = new CholeskyDecomposition(XtX) + val Xty2D = factory2D.make(Xty.toArray, F) + val solved2D = ch.solve(Xty2D) + return solved2D.viewColumn(0) + } + + def main(args: Array[String]) { + var host = "" + var slices = 0 + args match { + case Array(m, u, f, iters, slices_, host_) => { + M = m.toInt + U = u.toInt + F = f.toInt + ITERATIONS = iters.toInt + slices = slices_.toInt + host = host_ + } + case _ => { + System.err.println("Usage: SparkALS ") + System.exit(1) + } + } + printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS); + val spark = new SparkContext(host, "SparkALS") + + val R = generateR() + + // Initialize m and u randomly + var ms = Array.fromFunction(_ => factory1D.random(F))(M) + var us = Array.fromFunction(_ => factory1D.random(F))(U) + + // Iteratively update movies then users + val Rc = spark.broadcast(R) + var msc = spark.broadcast(ms) + var usc = spark.broadcast(us) + for (iter <- 1 to ITERATIONS) { + println("Iteration " + iter + ":") + ms = spark.parallelize(0 until M, slices) + .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value)) + .toArray + msc = spark.broadcast(ms) // Re-broadcast ms because it was updated + us = spark.parallelize(0 until U, slices) + .map(i => updateUser(i, usc.value(i), msc.value, Rc.value)) + .toArray + usc = spark.broadcast(us) // Re-broadcast us because it was updated + println("RMSE = " + rmse(R, ms, us)) + println() + } + } +} diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala new file mode 100644 index 0000000000..4c71fd0845 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -0,0 +1,53 @@ +package spark.examples + +import java.util.Random +import scala.math.exp +import Vector._ +import spark._ + +object SparkHdfsLR { + val D = 10 // Numer of dimensions + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def parsePoint(line: String): DataPoint = { + //val nums = line.split(' ').map(_.toDouble) + //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0)) + val tok = new java.util.StringTokenizer(line, " ") + var y = tok.nextToken.toDouble + var x = new Array[Double](D) + var i = 0 + while (i < D) { + x(i) = tok.nextToken.toDouble; i += 1 + } + return DataPoint(new Vector(x), y) + } + + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: SparkHdfsLR ") + System.exit(1) + } + val sc = new SparkContext(args(0), "SparkHdfsLR") + val lines = sc.textFile(args(1)) + val points = lines.map(parsePoint _).cache() + val ITERATIONS = args(2).toInt + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + val gradient = sc.accumulator(Vector.zeros(D)) + for (p <- points) { + val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y + gradient += scale * p.x + } + w -= gradient.value + } + + println("Final w: " + w) + } +} diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala new file mode 100644 index 0000000000..d08f5d3f01 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkLR.scala @@ -0,0 +1,51 @@ +package spark.examples + +import java.util.Random +import scala.math.exp +import Vector._ +import 
spark._ + +object SparkLR { + val N = 10000 // Number of data points + val D = 10 // Numer of dimensions + val R = 0.7 // Scaling factor + val ITERATIONS = 5 + val rand = new Random(42) + + case class DataPoint(x: Vector, y: Double) + + def generateData = { + def generatePoint(i: Int) = { + val y = if(i % 2 == 0) -1 else 1 + val x = Vector(D, _ => rand.nextGaussian + y * R) + DataPoint(x, y) + } + Array.fromFunction(generatePoint _)(N) + } + + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkLR []") + System.exit(1) + } + val sc = new SparkContext(args(0), "SparkLR") + val numSlices = if (args.length > 1) args(1).toInt else 2 + val data = generateData + + // Initialize w to a random value + var w = Vector(D, _ => 2 * rand.nextDouble - 1) + println("Initial w: " + w) + + for (i <- 1 to ITERATIONS) { + println("On iteration " + i) + val gradient = sc.accumulator(Vector.zeros(D)) + for (p <- sc.parallelize(data, numSlices)) { + val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y + gradient += scale * p.x + } + w -= gradient.value + } + + println("Final w: " + w) + } +} diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala new file mode 100644 index 0000000000..31c6c5b9b1 --- /dev/null +++ b/examples/src/main/scala/spark/examples/SparkPi.scala @@ -0,0 +1,23 @@ +package spark.examples + +import scala.math.random +import spark._ +import SparkContext._ + +object SparkPi { + def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkPi []") + System.exit(1) + } + val spark = new SparkContext(args(0), "SparkPi") + val slices = if (args.length > 1) args(1).toInt else 2 + var count = spark.accumulator(0) + for (i <- spark.parallelize(1 to 100000, slices)) { + val x = random * 2 - 1 + val y = random * 2 - 1 + if (x*x + y*y < 1) count += 1 + } + println("Pi is roughly " + 4 * count.value / 100000.0) + } +} diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala new file mode 100644 index 0000000000..ea70626e71 --- /dev/null +++ b/examples/src/main/scala/spark/examples/Vector.scala @@ -0,0 +1,65 @@ +package spark.examples + +@serializable class Vector(val elements: Array[Double]) { + def length = elements.length + + def apply(index: Int) = elements(index) + + def + (other: Vector): Vector = { + if (length != other.length) + throw new IllegalArgumentException("Vectors of different length") + return Vector(length, i => this(i) + other(i)) + } + + def - (other: Vector): Vector = { + if (length != other.length) + throw new IllegalArgumentException("Vectors of different length") + return Vector(length, i => this(i) - other(i)) + } + + def dot(other: Vector): Double = { + if (length != other.length) + throw new IllegalArgumentException("Vectors of different length") + var ans = 0.0 + for (i <- 0 until length) + ans += this(i) * other(i) + return ans + } + + def * ( scale: Double): Vector = Vector(length, i => this(i) * scale) + + def unary_- = this * -1 + + def sum = elements.reduceLeft(_ + _) + + override def toString = elements.mkString("(", ", ", ")") + +} + +object Vector { + def apply(elements: Array[Double]) = new Vector(elements) + + def apply(elements: Double*) = new Vector(elements.toArray) + + def apply(length: Int, initializer: Int => Double): Vector = { + val elements = new Array[Double](length) + for (i <- 0 until length) + elements(i) = initializer(i) + return new Vector(elements) + } + 
+ def zeros(length: Int) = new Vector(new Array[Double](length)) + + def ones(length: Int) = Vector(length, _ => 1) + + class Multiplier(num: Double) { + def * (vec: Vector) = vec * num + } + + implicit def doubleToMultiplier(num: Double) = new Multiplier(num) + + implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] { + def addInPlace(t1: Vector, t2: Vector) = t1 + t2 + def zero(initialValue: Vector) = Vector.zeros(initialValue.length) + } +} -- cgit v1.2.3
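Editorial illustration (not part of the patch above): aside from the README, each example file in this commit changes only by moving under examples/src/main/scala/spark/examples/ and gaining a "package spark.examples" declaration; the program bodies are unchanged. For quick reference, here is SparkPi as it stands after this commit, copied from the diff above, with a few explanatory comments added by the editor rather than taken from the commit:

    package spark.examples

    import scala.math.random
    import spark._
    import SparkContext._

    object SparkPi {
      def main(args: Array[String]) {
        if (args.length == 0) {
          System.err.println("Usage: SparkPi <host> [<slices>]")
          System.exit(1)
        }
        // args(0) is the Mesos master URL, or "local" to run locally (see README)
        val spark = new SparkContext(args(0), "SparkPi")
        val slices = if (args.length > 1) args(1).toInt else 2
        // Accumulator sums hits contributed by all parallel tasks
        var count = spark.accumulator(0)
        for (i <- spark.parallelize(1 to 100000, slices)) {
          // Sample a random point in the square [-1, 1] x [-1, 1]
          val x = random * 2 - 1
          val y = random * 2 - 1
          // Count points that fall inside the unit circle
          if (x*x + y*y < 1) count += 1
        }
        // Ratio of hits to samples, times 4, approximates Pi
        println("Pi is roughly " + 4 * count.value / 100000.0)
      }
    }

Per the updated README, examples are now launched with the fully qualified class name, e.g. ./run spark.examples.SparkPi local 2 (here assuming a "local" master and the optional slices argument of 2).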