Diffstat (limited to 'examples')
16 files changed, 776 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
new file mode 100644
index 0000000000..2506de5ae5
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -0,0 +1,30 @@
+package spark.examples
+
+import spark.SparkContext
+
+object BroadcastTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: BroadcastTest <host> [<slices>] [numElem]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "Broadcast Test")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val num = if (args.length > 2) args(2).toInt else 1000000
+
+    var arr1 = new Array[Int](num)
+    for (i <- 0 until arr1.length)
+      arr1(i) = i
+
+//    var arr2 = new Array[Int](num * 2)
+//    for (i <- 0 until arr2.length)
+//      arr2(i) = i
+
+    val barr1 = spark.broadcast(arr1)
+//    val barr2 = spark.broadcast(arr2)
+    spark.parallelize(1 to 10, slices).foreach {
+//      i => println(barr1.value.size + barr2.value.size)
+      i => println(barr1.value.size)
+    }
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala
new file mode 100644
index 0000000000..94b3709850
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/CpuHog.scala
@@ -0,0 +1,26 @@
+package spark.examples
+
+import spark._
+
+object CpuHog {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: CpuHog <master> <tasks> <threads_per_task>");
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "CPU hog")
+    val tasks = args(1).toInt
+    val threads = args(2).toInt
+    def task {
+      for (i <- 0 until threads-1) {
+        new Thread() {
+          override def run {
+            while(true) {}
+          }
+        }.start()
+      }
+      while(true) {}
+    }
+    sc.runTasks(Array.make(tasks, () => task))
+  }
+}
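Note: the examples in this commit use Scala 2.7-era array constructors (Array.make in CpuHog and SleepJob, Array.fromFunction in the LR and ALS examples). For readers on Scala 2.8 or later, a minimal sketch of the equivalents; the values shown are illustrative only:

    // Scala 2.7 spellings used in this commit:
    //   Array.make(n, x)          -- an array of n copies of the value x
    //   Array.fromFunction(f)(n)  -- an array whose element i is f(i)
    // Scala 2.8+ equivalents:
    val copies  = Array.fill(4)(() => println("work"))   // replaces Array.make
    val squares = Array.tabulate(4)(i => i * i)          // replaces Array.fromFunction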
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
new file mode 100644
index 0000000000..48c02a52c6
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -0,0 +1,37 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object GroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: GroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Force evaluation so everything is computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
new file mode 100644
index 0000000000..072b4ce417
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark._
+
+object HdfsTest {
+  def main(args: Array[String]) {
+    val sc = new SparkContext(args(0), "HdfsTest")
+    val file = sc.textFile(args(1))
+    val mapped = file.map(s => s.length).cache()
+    for (iter <- 1 to 10) {
+      val start = System.currentTimeMillis()
+      for (x <- mapped) { x + 2 }
+      // println("Processing: " + x)
+      val end = System.currentTimeMillis()
+      println("Iteration " + iter + " took " + (end-start) + " ms")
+    }
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
new file mode 100644
index 0000000000..10360dab3d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -0,0 +1,121 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+
+object LocalALS {
+  // Parameters set through command line arguments
+  var M = 0 // Number of movies
+  var U = 0 // Number of users
+  var F = 0 // Number of features
+  var ITERATIONS = 0
+
+  val LAMBDA = 0.01 // Regularization coefficient
+
+  // Some COLT objects
+  val factory2D = DoubleFactory2D.dense
+  val factory1D = DoubleFactory1D.dense
+  val algebra = Algebra.DEFAULT
+  val blas = SeqBlas.seqBlas
+
+  def generateR(): DoubleMatrix2D = {
+    val mh = factory2D.random(M, F)
+    val uh = factory2D.random(U, F)
+    return algebra.mult(mh, algebra.transpose(uh))
+  }
+
+  def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+           us: Array[DoubleMatrix1D]): Double =
+  {
+    val r = factory2D.make(M, U)
+    for (i <- 0 until M; j <- 0 until U) {
+      r.set(i, j, blas.ddot(ms(i), us(j)))
+    }
+    //println("R: " + r)
+    blas.daxpy(-1, targetR, r)
+    val sumSqs = r.aggregate(Functions.plus, Functions.square)
+    return sqrt(sumSqs / (M * U))
+  }
+
+  def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+                  R: DoubleMatrix2D) : DoubleMatrix1D =
+  {
+    val XtX = factory2D.make(F, F)
+    val Xty = factory1D.make(F)
+    // For each user that rated the movie
+    for (j <- 0 until U) {
+      val u = us(j)
+      // Add u * u^t to XtX
+      blas.dger(1, u, u, XtX)
+      // Add u * rating to Xty
+      blas.daxpy(R.get(i, j), u, Xty)
+    }
+    // Add regularization coefs to diagonal terms
+    for (d <- 0 until F) {
+      XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+    }
+    // Solve it with Cholesky
+    val ch = new CholeskyDecomposition(XtX)
+    val Xty2D = factory2D.make(Xty.toArray, F)
+    val solved2D = ch.solve(Xty2D)
+    return solved2D.viewColumn(0)
+  }
+
+  def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+                 R: DoubleMatrix2D) : DoubleMatrix1D =
+  {
+    val XtX = factory2D.make(F, F)
+    val Xty = factory1D.make(F)
+    // For each movie that the user rated
+    for (i <- 0 until M) {
+      val m = ms(i)
+      // Add m * m^t to XtX
+      blas.dger(1, m, m, XtX)
+      // Add m * rating to Xty
+      blas.daxpy(R.get(i, j), m, Xty)
+    }
+    // Add regularization coefs to diagonal terms
+    for (d <- 0 until F) {
+      XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+    }
+    // Solve it with Cholesky
+    val ch = new CholeskyDecomposition(XtX)
+    val Xty2D = factory2D.make(Xty.toArray, F)
+    val solved2D = ch.solve(Xty2D)
+    return solved2D.viewColumn(0)
+  }
+
+  def main(args: Array[String]) {
+    args match {
+      case Array(m, u, f, iters) => {
+        M = m.toInt
+        U = u.toInt
+        F = f.toInt
+        ITERATIONS = iters.toInt
+      }
+      case _ => {
+        System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
+        System.exit(1)
+      }
+    }
+    printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+
+    val R = generateR()
+
+    // Initialize m and u randomly
+    var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+    var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+    // Iteratively update movies then users
+    for (iter <- 1 to ITERATIONS) {
+      println("Iteration " + iter + ":")
+      ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
+      us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
+      println("RMSE = " + rmse(R, ms, us))
+      println()
+    }
+  }
+}
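Note: each updateMovie/updateUser call above is a regularized least-squares solve with the other factor matrix held fixed. Writing X for the matrix whose rows are the fixed factor vectors, y for the corresponding ratings, and n for the number of ratings (U or M here), the accumulated XtX and Xty are exactly the terms of the normal equations that the Cholesky step solves; a sketch of the algebra:

    (X^T X + \lambda n I) \, w = X^T y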
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
new file mode 100644
index 0000000000..cc14aa7090
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -0,0 +1,38 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalFileLR {
+  val D = 10   // Number of dimensions
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def parsePoint(line: String): DataPoint = {
+    val nums = line.split(' ').map(_.toDouble)
+    return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+  }
+
+  def main(args: Array[String]) {
+    val lines = scala.io.Source.fromFile(args(0)).getLines()
+    val points = lines.map(parsePoint _)
+    val ITERATIONS = args(1).toInt
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      var gradient = Vector.zeros(D)
+      for (p <- points) {
+        val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+        gradient += scale * p.x
+      }
+      w -= gradient
+    }
+
+    println("Final w: " + w)
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
new file mode 100644
index 0000000000..3fd3f88fa8
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -0,0 +1,43 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalLR {
+  val N = 10000  // Number of data points
+  val D = 10   // Number of dimensions
+  val R = 0.7  // Scaling factor
+  val ITERATIONS = 5
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def generateData = {
+    def generatePoint(i: Int) = {
+      val y = if(i % 2 == 0) -1 else 1
+      val x = Vector(D, _ => rand.nextGaussian + y * R)
+      DataPoint(x, y)
+    }
+    Array.fromFunction(generatePoint _)(N)
+  }
+
+  def main(args: Array[String]) {
+    val data = generateData
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      var gradient = Vector.zeros(D)
+      for (p <- data) {
+        val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+        gradient += scale * p.x
+      }
+      w -= gradient
+    }
+
+    println("Final w: " + w)
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalPi.scala b/examples/src/main/scala/spark/examples/LocalPi.scala
new file mode 100644
index 0000000000..9457472f2d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalPi.scala
@@ -0,0 +1,17 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object LocalPi {
+  def main(args: Array[String]) {
+    var count = 0
+    for (i <- 1 to 100000) {
+      val x = random * 2 - 1
+      val y = random * 2 - 1
+      if (x*x + y*y < 1) count += 1
+    }
+    println("Pi is roughly " + 4 * count / 100000.0)
+  }
+}
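Note: LocalPi above (and SparkPi later in this commit) rely on the standard Monte Carlo argument: a point drawn uniformly from the square [-1,1] x [-1,1] lands inside the unit circle with probability (circle area)/(square area) = \pi/4, hence

    \pi \approx 4 \cdot count / n        (with n = 100000 here)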
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
new file mode 100644
index 0000000000..c8edb7d8b4
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SimpleSkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SimpleSkewedGroupByTest <host> " +
+        "[numMappers] [numKVPairs] [valSize] [numReducers] [ratio]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+    var ratio = if (args.length > 5) args(5).toDouble else 5.0
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+      var result = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        val offset = ranGen.nextInt(1000) * numReducers
+        if (ranGen.nextDouble < ratio / (numReducers + ratio - 1)) {
+          // give reducer 0 a ratio-times-higher chance: offset is a multiple of numReducers
+          result(i) = (offset, byteArr)
+        } else {
+          // generate a key for one of the other reducers
+          val key = 1 + ranGen.nextInt(numReducers-1) + offset
+          result(i) = (key, byteArr)
+        }
+      }
+      result
+    }.cache
+    // Force evaluation so everything is computed and cached
+    pairs1.count
+
+    println("RESULT: " + pairs1.groupByKey(numReducers).count)
+    // Print how many keys each reducer got (for debugging)
+    //println("RESULT: " + pairs1.groupByKey(numReducers)
+    //                           .map{case (k,v) => (k, v.size)}
+    //                           .collectAsMap)
+  }
+}
+
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
new file mode 100644
index 0000000000..e6dec44bed
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -0,0 +1,41 @@
+package spark.examples
+
+import spark.SparkContext
+import spark.SparkContext._
+import java.util.Random
+
+object SkewedGroupByTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SkewedGroupByTest <host> [numMappers] [numKVPairs] [valSize] [numReducers]")
+      System.exit(1)
+    }
+
+    var numMappers = if (args.length > 1) args(1).toInt else 2
+    var numKVPairs = if (args.length > 2) args(2).toInt else 1000
+    var valSize = if (args.length > 3) args(3).toInt else 1000
+    var numReducers = if (args.length > 4) args(4).toInt else numMappers
+
+    val sc = new SparkContext(args(0), "GroupBy Test")
+
+    val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
+      val ranGen = new Random
+
+      // map output sizes increase linearly from the first mapper to the last
+      numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
+
+      var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
+      for (i <- 0 until numKVPairs) {
+        val byteArr = new Array[Byte](valSize)
+        ranGen.nextBytes(byteArr)
+        arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
+      }
+      arr1
+    }.cache
+    // Force evaluation so everything is computed and cached
+    pairs1.count
+
+    println(pairs1.groupByKey(numReducers).count)
+  }
+}
+
diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala
new file mode 100644
index 0000000000..02673a5f88
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SleepJob.scala
@@ -0,0 +1,21 @@
+package spark.examples
+
+import spark._
+
+object SleepJob {
+  def main(args: Array[String]) {
+    if (args.length != 3) {
+      System.err.println("Usage: SleepJob <master> <tasks> <task_duration>");
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "Sleep job")
+    val tasks = args(1).toInt
+    val duration = args(2).toInt
+    def task {
+      val start = System.currentTimeMillis
+      while (System.currentTimeMillis - start < duration * 1000L)
+        Thread.sleep(200)
+    }
+    sc.runTasks(Array.make(tasks, () => task))
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
new file mode 100644
index 0000000000..08e0420371
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -0,0 +1,141 @@
+package spark.examples
+
+import java.io.Serializable
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+import spark._
+
+object SparkALS {
+  // Parameters set through command line arguments
+  var M = 0 // Number of movies
+  var U = 0 // Number of users
+  var F = 0 // Number of features
+  var ITERATIONS = 0
+
+  val LAMBDA = 0.01 // Regularization coefficient
+
+  // Some COLT objects
+  val factory2D = DoubleFactory2D.dense
+  val factory1D = DoubleFactory1D.dense
+  val algebra = Algebra.DEFAULT
+  val blas = SeqBlas.seqBlas
+
+  def generateR(): DoubleMatrix2D = {
+    val mh = factory2D.random(M, F)
+    val uh = factory2D.random(U, F)
+    return algebra.mult(mh, algebra.transpose(uh))
+  }
+
+  def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+           us: Array[DoubleMatrix1D]): Double =
+  {
+    val r = factory2D.make(M, U)
+    for (i <- 0 until M; j <- 0 until U) {
+      r.set(i, j, blas.ddot(ms(i), us(j)))
+    }
+    //println("R: " + r)
+    blas.daxpy(-1, targetR, r)
+    val sumSqs = r.aggregate(Functions.plus, Functions.square)
+    return sqrt(sumSqs / (M * U))
+  }
+
+  def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+                  R: DoubleMatrix2D) : DoubleMatrix1D =
+  {
+    val U = us.size
+    val F = us(0).size
+    val XtX = factory2D.make(F, F)
+    val Xty = factory1D.make(F)
+    // For each user that rated the movie
+    for (j <- 0 until U) {
+      val u = us(j)
+      // Add u * u^t to XtX
+      blas.dger(1, u, u, XtX)
+      // Add u * rating to Xty
+      blas.daxpy(R.get(i, j), u, Xty)
+    }
+    // Add regularization coefs to diagonal terms
+    for (d <- 0 until F) {
+      XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+    }
+    // Solve it with Cholesky
+    val ch = new CholeskyDecomposition(XtX)
+    val Xty2D = factory2D.make(Xty.toArray, F)
+    val solved2D = ch.solve(Xty2D)
+    return solved2D.viewColumn(0)
+  }
+
+  def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+                 R: DoubleMatrix2D) : DoubleMatrix1D =
+  {
+    val M = ms.size
+    val F = ms(0).size
+    val XtX = factory2D.make(F, F)
+    val Xty = factory1D.make(F)
+    // For each movie that the user rated
+    for (i <- 0 until M) {
+      val m = ms(i)
+      // Add m * m^t to XtX
+      blas.dger(1, m, m, XtX)
+      // Add m * rating to Xty
+      blas.daxpy(R.get(i, j), m, Xty)
+    }
+    // Add regularization coefs to diagonal terms
+    for (d <- 0 until F) {
+      XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+    }
+    // Solve it with Cholesky
+    val ch = new CholeskyDecomposition(XtX)
+    val Xty2D = factory2D.make(Xty.toArray, F)
+    val solved2D = ch.solve(Xty2D)
+    return solved2D.viewColumn(0)
+  }
+
+  def main(args: Array[String]) {
+    var host = ""
+    var slices = 0
+    args match {
+      case Array(m, u, f, iters, slices_, host_) => {
+        M = m.toInt
+        U = u.toInt
+        F = f.toInt
+        ITERATIONS = iters.toInt
+        slices = slices_.toInt
+        host = host_
+      }
+      case _ => {
+        System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <host>")
+        System.exit(1)
+      }
+    }
+    printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+    val spark = new SparkContext(host, "SparkALS")
+
+    val R = generateR()
+
+    // Initialize m and u randomly
+    var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+    var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+    // Iteratively update movies then users
+    val Rc = spark.broadcast(R)
+    var msc = spark.broadcast(ms)
+    var usc = spark.broadcast(us)
+    for (iter <- 1 to ITERATIONS) {
+      println("Iteration " + iter + ":")
+      ms = spark.parallelize(0 until M, slices)
+                .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
+                .toArray
+      msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
+      us = spark.parallelize(0 until U, slices)
+               .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
+               .toArray
+      usc = spark.broadcast(us) // Re-broadcast us because it was updated
+      println("RMSE = " + rmse(R, ms, us))
+      println()
+    }
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
new file mode 100644
index 0000000000..4c71fd0845
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkHdfsLR {
+  val D = 10   // Number of dimensions
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def parsePoint(line: String): DataPoint = {
+    //val nums = line.split(' ').map(_.toDouble)
+    //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+    val tok = new java.util.StringTokenizer(line, " ")
+    var y = tok.nextToken.toDouble
+    var x = new Array[Double](D)
+    var i = 0
+    while (i < D) {
+      x(i) = tok.nextToken.toDouble; i += 1
+    }
+    return DataPoint(new Vector(x), y)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length < 3) {
+      System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "SparkHdfsLR")
+    val lines = sc.textFile(args(1))
+    val points = lines.map(parsePoint _).cache()
+    val ITERATIONS = args(2).toInt
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      val gradient = sc.accumulator(Vector.zeros(D))
+      for (p <- points) {
+        val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+        gradient += scale * p.x
+      }
+      w -= gradient.value
+    }
+
+    println("Final w: " + w)
+  }
+}
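Note: SparkHdfsLR above and SparkLR below sum per-point gradients through a Spark accumulator. The `gradient += scale * p.x` inside the distributed loop type-checks because Vector.scala (at the end of this commit) defines an implicit AccumulatorParam; restating that hook from the code in this commit:

    // from the Vector companion object added by this commit:
    implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
      def addInPlace(t1: Vector, t2: Vector) = t1 + t2
      def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
    }
    // sc.accumulator(Vector.zeros(D)) picks up this implicit, so each
    // worker-side "gradient += v" is combined via addInPlace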
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
new file mode 100644
index 0000000000..d08f5d3f01
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkLR {
+  val N = 10000  // Number of data points
+  val D = 10   // Number of dimensions
+  val R = 0.7  // Scaling factor
+  val ITERATIONS = 5
+  val rand = new Random(42)
+
+  case class DataPoint(x: Vector, y: Double)
+
+  def generateData = {
+    def generatePoint(i: Int) = {
+      val y = if(i % 2 == 0) -1 else 1
+      val x = Vector(D, _ => rand.nextGaussian + y * R)
+      DataPoint(x, y)
+    }
+    Array.fromFunction(generatePoint _)(N)
+  }
+
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkLR <host> [<slices>]")
+      System.exit(1)
+    }
+    val sc = new SparkContext(args(0), "SparkLR")
+    val numSlices = if (args.length > 1) args(1).toInt else 2
+    val data = generateData
+
+    // Initialize w to a random value
+    var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+    println("Initial w: " + w)
+
+    for (i <- 1 to ITERATIONS) {
+      println("On iteration " + i)
+      val gradient = sc.accumulator(Vector.zeros(D))
+      for (p <- sc.parallelize(data, numSlices)) {
+        val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+        gradient += scale * p.x
+      }
+      w -= gradient.value
+    }
+
+    println("Final w: " + w)
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
new file mode 100644
index 0000000000..31c6c5b9b1
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -0,0 +1,23 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object SparkPi {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: SparkPi <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "SparkPi")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    var count = spark.accumulator(0)
+    for (i <- spark.parallelize(1 to 100000, slices)) {
+      val x = random * 2 - 1
+      val y = random * 2 - 1
+      if (x*x + y*y < 1) count += 1
+    }
+    println("Pi is roughly " + 4 * count.value / 100000.0)
+  }
+}
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala
new file mode 100644
index 0000000000..ea70626e71
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/Vector.scala
@@ -0,0 +1,65 @@
+package spark.examples
+
+@serializable class Vector(val elements: Array[Double]) {
+  def length = elements.length
+
+  def apply(index: Int) = elements(index)
+
+  def + (other: Vector): Vector = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    return Vector(length, i => this(i) + other(i))
+  }
+
+  def - (other: Vector): Vector = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    return Vector(length, i => this(i) - other(i))
+  }
+
+  def dot(other: Vector): Double = {
+    if (length != other.length)
+      throw new IllegalArgumentException("Vectors of different length")
+    var ans = 0.0
+    for (i <- 0 until length)
+      ans += this(i) * other(i)
+    return ans
+  }
+
+  def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
+
+  def unary_- = this * -1
+
+  def sum = elements.reduceLeft(_ + _)
+
+  override def toString = elements.mkString("(", ", ", ")")
+}
+
+object Vector {
+  def apply(elements: Array[Double]) = new Vector(elements)
+
+  def apply(elements: Double*) = new Vector(elements.toArray)
+
+  def apply(length: Int, initializer: Int => Double): Vector = {
+    val elements = new Array[Double](length)
+    for (i <- 0 until length)
+      elements(i) = initializer(i)
+    return new Vector(elements)
+  }
+
+  def zeros(length: Int) = new Vector(new Array[Double](length))
+
+  def ones(length: Int) = Vector(length, _ => 1)
+
+  class Multiplier(num: Double) {
+    def * (vec: Vector) = vec * num
+  }
+
+  implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
+
+  implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
+    def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+    def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
+  }
+}
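Note: a quick local sanity check of the Vector helper above; a minimal sketch, with the expected output in comments derived from the definitions:

    val v = Vector(1.0, 2.0, 3.0)
    val w = Vector.ones(3)
    println(v + w)      // (2.0, 3.0, 4.0)
    println(v dot w)    // 6.0
    println(2.0 * v)    // (2.0, 4.0, 6.0), via the implicit Multiplier
    println(-v)         // (-1.0, -2.0, -3.0), via unary_-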