path: root/examples
author     Mosharaf Chowdhury <mosharaf@mosharaf-mbp.(none)>   2011-02-09 10:40:23 -0800
committer  Mosharaf Chowdhury <mosharaf@mosharaf-mbp.(none)>   2011-02-09 10:40:23 -0800
commit     495b38658e99101240a9b028fd7c2fb8bf1034a4 (patch)
tree       89744230fa669231ace88bc5da59e83c142a9177 /examples
parent     a12c0b6c00eb0c73339beba12eb638c33d472b19 (diff)
parent     e8df4bbd40817262eeb9dd72f7f0b9a5d81db611 (diff)
download   spark-495b38658e99101240a9b028fd7c2fb8bf1034a4.tar.gz
           spark-495b38658e99101240a9b028fd7c2fb8bf1034a4.tar.bz2
           spark-495b38658e99101240a9b028fd7c2fb8bf1034a4.zip
Merge branch 'master' into mos-bt
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/scala/spark/examples/BroadcastTest.scala    25
-rw-r--r--  examples/src/main/scala/spark/examples/CpuHog.scala            26
-rw-r--r--  examples/src/main/scala/spark/examples/HdfsTest.scala          18
-rw-r--r--  examples/src/main/scala/spark/examples/LocalALS.scala         121
-rw-r--r--  examples/src/main/scala/spark/examples/LocalFileLR.scala       38
-rw-r--r--  examples/src/main/scala/spark/examples/LocalLR.scala           43
-rw-r--r--  examples/src/main/scala/spark/examples/LocalPi.scala           17
-rw-r--r--  examples/src/main/scala/spark/examples/SleepJob.scala          21
-rw-r--r--  examples/src/main/scala/spark/examples/SparkALS.scala         141
-rw-r--r--  examples/src/main/scala/spark/examples/SparkHdfsLR.scala       53
-rw-r--r--  examples/src/main/scala/spark/examples/SparkLR.scala           51
-rw-r--r--  examples/src/main/scala/spark/examples/SparkPi.scala           23
-rw-r--r--  examples/src/main/scala/spark/examples/Vector.scala            65
13 files changed, 642 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
new file mode 100644
index 0000000000..f3a173b183
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -0,0 +1,25 @@
+package spark.examples
+
+import spark.SparkContext
+
+object BroadcastTest {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: BroadcastTest <host> [<slices>] [numElem]")
+ System.exit(1)
+ }
+
+ val spark = new SparkContext(args(0), "Broadcast Test")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ val num = if (args.length > 2) args(2).toInt else 1000000
+
+ var arr1 = new Array[Int](num)
+ for (i <- 0 until arr1.length)
+ arr1(i) = i
+
+ val barr1 = spark.broadcast(arr1)
+ spark.parallelize(1 to 10, slices).foreach {
+ i => println(barr1.value.size)
+ }
+ }
+}
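
For context, BroadcastTest exercises spark.broadcast: the 1,000,000-element array is shipped to each node once and every task reads it through barr1.value, instead of being serialized into each of the 10 task closures. A minimal sketch of the contrast, assuming the same 2011-era SparkContext API used above (the object name BroadcastSketch is illustrative, not part of the commit):

    package spark.examples

    import spark.SparkContext

    object BroadcastSketch {
      def main(args: Array[String]) {
        val spark = new SparkContext(args(0), "Broadcast Sketch")
        val arr = (0 until 1000000).toArray
        // Captured directly: arr is serialized into every task closure.
        spark.parallelize(1 to 10, 2).foreach { i => println(arr.size) }
        // Broadcast: arr is sent to each node once; tasks read it via .value.
        val barr = spark.broadcast(arr)
        spark.parallelize(1 to 10, 2).foreach { i => println(barr.value.size) }
      }
    }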
diff --git a/examples/src/main/scala/spark/examples/CpuHog.scala b/examples/src/main/scala/spark/examples/CpuHog.scala
new file mode 100644
index 0000000000..94b3709850
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/CpuHog.scala
@@ -0,0 +1,26 @@
+package spark.examples
+
+import spark._
+
+object CpuHog {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: CpuHog <master> <tasks> <threads_per_task>");
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "CPU hog")
+ val tasks = args(1).toInt
+ val threads = args(2).toInt
+ def task {
+ for (i <- 0 until threads-1) { // start threads-1 extra spinning threads; the task's own thread spins below
+ new Thread() {
+ override def run {
+ while(true) {}
+ }
+ }.start()
+ }
+ while(true) {}
+ }
+ sc.runTasks(Array.make(tasks, () => task))
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
new file mode 100644
index 0000000000..072b4ce417
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -0,0 +1,18 @@
+package spark.examples
+
+import spark._
+
+object HdfsTest {
+ def main(args: Array[String]) {
+ val sc = new SparkContext(args(0), "HdfsTest")
+ val file = sc.textFile(args(1))
+ val mapped = file.map(s => s.length).cache()
+ for (iter <- 1 to 10) {
+ val start = System.currentTimeMillis()
+ for (x <- mapped) { x + 2 }
+ // println("Processing: " + x)
+ val end = System.currentTimeMillis()
+ println("Iteration " + iter + " took " + (end-start) + " ms")
+ }
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
new file mode 100644
index 0000000000..10360dab3d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -0,0 +1,121 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+
+object LocalALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+
+ val LAMBDA = 0.01 // Regularization coefficient
+
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+
+ def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each movie that the user rated
+ for (i <- 0 until M) {
+ val m = ms(i)
+ // Add m * m^t to XtX
+ blas.dger(1, m, m, XtX)
+ // Add m * rating to Xty
+ blas.daxpy(R.get(i, j), m, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def main(args: Array[String]) {
+ args match {
+ case Array(m, u, f, iters) => {
+ M = m.toInt
+ U = u.toInt
+ F = f.toInt
+ ITERATIONS = iters.toInt
+ }
+ case _ => {
+ System.err.println("Usage: LocalALS <M> <U> <F> <iters>")
+ System.exit(1)
+ }
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+
+ val R = generateR()
+
+ // Initialize m and u randomly
+ var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+ var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+ // Iteratively update movies then users
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = (0 until M).map(i => updateMovie(i, ms(i), us, R)).toArray
+ us = (0 until U).map(j => updateUser(j, us(j), ms, R)).toArray
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ }
+}
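
For reference on the math in updateMovie and updateUser (an editorial reading of the code above, not part of the commit): each call builds and solves the regularized least-squares normal equations for one factor vector,

    (U^T U + \lambda \, n_i I) \, m_i = U^T r_i

where the rows of U are the current user vectors u_j, r_i is row i of the rating matrix R, and n_i is the number of ratings for movie i (every user rates every movie here, so n_i = U). The loops accumulate XtX = sum_j u_j u_j^T (blas.dger) and Xty = sum_j R(i,j) u_j (blas.daxpy), add LAMBDA * U to the diagonal, and solve the F x F system with a Cholesky decomposition. updateUser is the same with movies and users exchanged.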
diff --git a/examples/src/main/scala/spark/examples/LocalFileLR.scala b/examples/src/main/scala/spark/examples/LocalFileLR.scala
new file mode 100644
index 0000000000..cc14aa7090
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalFileLR.scala
@@ -0,0 +1,38 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalFileLR {
+ val D = 10 // Number of dimensions
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def parsePoint(line: String): DataPoint = {
+ val nums = line.split(' ').map(_.toDouble)
+ return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ }
+
+ def main(args: Array[String]) {
+ val lines = scala.io.Source.fromFile(args(0)).getLines()
+ val points = lines.map(parsePoint _).toArray // materialize: getLines() is a one-pass iterator, but points is re-read every iteration
+ val ITERATIONS = args(1).toInt
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- points) {
+ val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+
+ println("Final w: " + w)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
new file mode 100644
index 0000000000..3fd3f88fa8
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -0,0 +1,43 @@
+package spark.examples
+
+import java.util.Random
+import Vector._
+
+object LocalLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Number of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.fromFunction(generatePoint _)(N)
+ }
+
+ def main(args: Array[String]) {
+ val data = generateData
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ var gradient = Vector.zeros(D)
+ for (p <- data) {
+ val scale = (1 / (1 + Math.exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient
+ }
+
+ println("Final w: " + w)
+ }
+}
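
The scale factor inside the loop is the per-point gradient of the logistic loss (an editorial note, not part of the commit). With labels y in {-1, +1} and loss

    \ell(w) = \log(1 + e^{-y \, w \cdot x}),

the gradient is

    \nabla \ell(w) = \left( \frac{1}{1 + e^{-y \, w \cdot x}} - 1 \right) y \, x,

which is exactly scale * p.x above. Summing over all N points and doing w -= gradient is one full-batch gradient-descent step with an implicit step size of 1; SparkLR and SparkHdfsLR below compute the same sum, but distribute the points and merge the partial gradients through an accumulator.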
diff --git a/examples/src/main/scala/spark/examples/LocalPi.scala b/examples/src/main/scala/spark/examples/LocalPi.scala
new file mode 100644
index 0000000000..9457472f2d
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/LocalPi.scala
@@ -0,0 +1,17 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object LocalPi {
+ def main(args: Array[String]) {
+ var count = 0
+ for (i <- 1 to 100000) {
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) count += 1
+ }
+ println("Pi is roughly " + 4 * count / 100000.0)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SleepJob.scala b/examples/src/main/scala/spark/examples/SleepJob.scala
new file mode 100644
index 0000000000..02673a5f88
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SleepJob.scala
@@ -0,0 +1,21 @@
+package spark.examples
+
+import spark._
+
+object SleepJob {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: SleepJob <master> <tasks> <task_duration>");
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "Sleep job")
+ val tasks = args(1).toInt
+ val duration = args(2).toInt
+ def task {
+ val start = System.currentTimeMillis
+ while (System.currentTimeMillis - start < duration * 1000L)
+ Thread.sleep(200)
+ }
+ sc.runTasks(Array.make(tasks, () => task))
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
new file mode 100644
index 0000000000..08e0420371
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -0,0 +1,141 @@
+package spark.examples
+
+import java.io.Serializable
+import java.util.Random
+import scala.math.sqrt
+import cern.jet.math._
+import cern.colt.matrix._
+import cern.colt.matrix.linalg._
+import spark._
+
+object SparkALS {
+ // Parameters set through command line arguments
+ var M = 0 // Number of movies
+ var U = 0 // Number of users
+ var F = 0 // Number of features
+ var ITERATIONS = 0
+
+ val LAMBDA = 0.01 // Regularization coefficient
+
+ // Some COLT objects
+ val factory2D = DoubleFactory2D.dense
+ val factory1D = DoubleFactory1D.dense
+ val algebra = Algebra.DEFAULT
+ val blas = SeqBlas.seqBlas
+
+ def generateR(): DoubleMatrix2D = {
+ val mh = factory2D.random(M, F)
+ val uh = factory2D.random(U, F)
+ return algebra.mult(mh, algebra.transpose(uh))
+ }
+
+ def rmse(targetR: DoubleMatrix2D, ms: Array[DoubleMatrix1D],
+ us: Array[DoubleMatrix1D]): Double =
+ {
+ val r = factory2D.make(M, U)
+ for (i <- 0 until M; j <- 0 until U) {
+ r.set(i, j, blas.ddot(ms(i), us(j)))
+ }
+ //println("R: " + r)
+ blas.daxpy(-1, targetR, r)
+ val sumSqs = r.aggregate(Functions.plus, Functions.square)
+ return sqrt(sumSqs / (M * U))
+ }
+
+ def updateMovie(i: Int, m: DoubleMatrix1D, us: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val U = us.size
+ val F = us(0).size
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each user that rated the movie
+ for (j <- 0 until U) {
+ val u = us(j)
+ // Add u * u^t to XtX
+ blas.dger(1, u, u, XtX)
+ // Add u * rating to Xty
+ blas.daxpy(R.get(i, j), u, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * U)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def updateUser(j: Int, u: DoubleMatrix1D, ms: Array[DoubleMatrix1D],
+ R: DoubleMatrix2D) : DoubleMatrix1D =
+ {
+ val M = ms.size
+ val F = ms(0).size
+ val XtX = factory2D.make(F, F)
+ val Xty = factory1D.make(F)
+ // For each movie that the user rated
+ for (i <- 0 until M) {
+ val m = ms(i)
+ // Add m * m^t to XtX
+ blas.dger(1, m, m, XtX)
+ // Add m * rating to Xty
+ blas.daxpy(R.get(i, j), m, Xty)
+ }
+ // Add regularization coefs to diagonal terms
+ for (d <- 0 until F) {
+ XtX.set(d, d, XtX.get(d, d) + LAMBDA * M)
+ }
+ // Solve it with Cholesky
+ val ch = new CholeskyDecomposition(XtX)
+ val Xty2D = factory2D.make(Xty.toArray, F)
+ val solved2D = ch.solve(Xty2D)
+ return solved2D.viewColumn(0)
+ }
+
+ def main(args: Array[String]) {
+ var host = ""
+ var slices = 0
+ args match {
+ case Array(m, u, f, iters, slices_, host_) => {
+ M = m.toInt
+ U = u.toInt
+ F = f.toInt
+ ITERATIONS = iters.toInt
+ slices = slices_.toInt
+ host = host_
+ }
+ case _ => {
+ System.err.println("Usage: SparkALS <M> <U> <F> <iters> <slices> <host>")
+ System.exit(1)
+ }
+ }
+ printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS);
+ val spark = new SparkContext(host, "SparkALS")
+
+ val R = generateR()
+
+ // Initialize m and u randomly
+ var ms = Array.fromFunction(_ => factory1D.random(F))(M)
+ var us = Array.fromFunction(_ => factory1D.random(F))(U)
+
+ // Iteratively update movies then users
+ val Rc = spark.broadcast(R)
+ var msc = spark.broadcast(ms)
+ var usc = spark.broadcast(us)
+ for (iter <- 1 to ITERATIONS) {
+ println("Iteration " + iter + ":")
+ ms = spark.parallelize(0 until M, slices)
+ .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
+ .toArray
+ msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
+ us = spark.parallelize(0 until U, slices)
+ .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
+ .toArray
+ usc = spark.broadcast(us) // Re-broadcast us because it was updated
+ println("RMSE = " + rmse(R, ms, us))
+ println()
+ }
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
new file mode 100644
index 0000000000..4c71fd0845
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -0,0 +1,53 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkHdfsLR {
+ val D = 10 // Number of dimensions
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def parsePoint(line: String): DataPoint = {
+ //val nums = line.split(' ').map(_.toDouble)
+ //return DataPoint(new Vector(nums.slice(1, D+1)), nums(0))
+ val tok = new java.util.StringTokenizer(line, " ")
+ var y = tok.nextToken.toDouble
+ var x = new Array[Double](D)
+ var i = 0
+ while (i < D) {
+ x(i) = tok.nextToken.toDouble; i += 1
+ }
+ return DataPoint(new Vector(x), y)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkHdfsLR")
+ val lines = sc.textFile(args(1))
+ val points = lines.map(parsePoint _).cache()
+ val ITERATIONS = args(2).toInt
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = sc.accumulator(Vector.zeros(D))
+ for (p <- points) {
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient.value
+ }
+
+ println("Final w: " + w)
+ }
+}
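
The hand-rolled StringTokenizer loop in parsePoint avoids allocating an intermediate String array plus a second array from map for every input line, which matters when parsing a large HDFS file. The commented-out lines above show the simpler equivalent; spelled out (an illustrative sketch, not part of the commit):

    def parsePoint(line: String): DataPoint = {
      // one String[] from split plus one Double[] from map, per line
      val nums = line.split(' ').map(_.toDouble)
      DataPoint(new Vector(nums.slice(1, D + 1)), nums(0))
    }

The gradient accumulator relies on the implicit VectorAccumParam defined in Vector.scala at the end of this diff, which tells Spark how to merge the per-task partial gradient vectors (addInPlace) and what the zero vector is.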
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
new file mode 100644
index 0000000000..d08f5d3f01
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -0,0 +1,51 @@
+package spark.examples
+
+import java.util.Random
+import scala.math.exp
+import Vector._
+import spark._
+
+object SparkLR {
+ val N = 10000 // Number of data points
+ val D = 10 // Number of dimensions
+ val R = 0.7 // Scaling factor
+ val ITERATIONS = 5
+ val rand = new Random(42)
+
+ case class DataPoint(x: Vector, y: Double)
+
+ def generateData = {
+ def generatePoint(i: Int) = {
+ val y = if(i % 2 == 0) -1 else 1
+ val x = Vector(D, _ => rand.nextGaussian + y * R)
+ DataPoint(x, y)
+ }
+ Array.fromFunction(generatePoint _)(N)
+ }
+
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkLR <host> [<slices>]")
+ System.exit(1)
+ }
+ val sc = new SparkContext(args(0), "SparkLR")
+ val numSlices = if (args.length > 1) args(1).toInt else 2
+ val data = generateData
+
+ // Initialize w to a random value
+ var w = Vector(D, _ => 2 * rand.nextDouble - 1)
+ println("Initial w: " + w)
+
+ for (i <- 1 to ITERATIONS) {
+ println("On iteration " + i)
+ val gradient = sc.accumulator(Vector.zeros(D))
+ for (p <- sc.parallelize(data, numSlices)) {
+ val scale = (1 / (1 + exp(-p.y * (w dot p.x))) - 1) * p.y
+ gradient += scale * p.x
+ }
+ w -= gradient.value
+ }
+
+ println("Final w: " + w)
+ }
+}
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
new file mode 100644
index 0000000000..31c6c5b9b1
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -0,0 +1,23 @@
+package spark.examples
+
+import scala.math.random
+import spark._
+import SparkContext._
+
+object SparkPi {
+ def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkPi <host> [<slices>]")
+ System.exit(1)
+ }
+ val spark = new SparkContext(args(0), "SparkPi")
+ val slices = if (args.length > 1) args(1).toInt else 2
+ var count = spark.accumulator(0)
+ for (i <- spark.parallelize(1 to 100000, slices)) {
+ val x = random * 2 - 1
+ val y = random * 2 - 1
+ if (x*x + y*y < 1) count += 1
+ }
+ println("Pi is roughly " + 4 * count.value / 100000.0)
+ }
+}
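
Both LocalPi and SparkPi use the same Monte Carlo estimate (an editorial note on the arithmetic, not part of the commit): (x, y) is drawn uniformly from the square [-1, 1] x [-1, 1], and the probability that it lands inside the unit circle is the area ratio

    P(inside) = area(circle) / area(square) = pi / 4,  so  pi ≈ 4 * count / N   (N = 100000 here)

SparkPi spreads the N samples over slices tasks and sums the hits with an Int accumulator, so the hit counts are combined on the driver before the final estimate is printed.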
diff --git a/examples/src/main/scala/spark/examples/Vector.scala b/examples/src/main/scala/spark/examples/Vector.scala
new file mode 100644
index 0000000000..ea70626e71
--- /dev/null
+++ b/examples/src/main/scala/spark/examples/Vector.scala
@@ -0,0 +1,65 @@
+package spark.examples
+
+@serializable class Vector(val elements: Array[Double]) {
+ def length = elements.length
+
+ def apply(index: Int) = elements(index)
+
+ def + (other: Vector): Vector = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ return Vector(length, i => this(i) + other(i))
+ }
+
+ def - (other: Vector): Vector = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ return Vector(length, i => this(i) - other(i))
+ }
+
+ def dot(other: Vector): Double = {
+ if (length != other.length)
+ throw new IllegalArgumentException("Vectors of different length")
+ var ans = 0.0
+ for (i <- 0 until length)
+ ans += this(i) * other(i)
+ return ans
+ }
+
+ def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
+
+ def unary_- = this * -1
+
+ def sum = elements.reduceLeft(_ + _)
+
+ override def toString = elements.mkString("(", ", ", ")")
+
+}
+
+object Vector {
+ def apply(elements: Array[Double]) = new Vector(elements)
+
+ def apply(elements: Double*) = new Vector(elements.toArray)
+
+ def apply(length: Int, initializer: Int => Double): Vector = {
+ val elements = new Array[Double](length)
+ for (i <- 0 until length)
+ elements(i) = initializer(i)
+ return new Vector(elements)
+ }
+
+ def zeros(length: Int) = new Vector(new Array[Double](length))
+
+ def ones(length: Int) = Vector(length, _ => 1)
+
+ class Multiplier(num: Double) {
+ def * (vec: Vector) = vec * num
+ }
+
+ implicit def doubleToMultiplier(num: Double) = new Multiplier(num)
+
+ implicit object VectorAccumParam extends spark.AccumulatorParam[Vector] {
+ def addInPlace(t1: Vector, t2: Vector) = t1 + t2
+ def zero(initialValue: Vector) = Vector.zeros(initialValue.length)
+ }
+}
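
Finally, a short usage sketch for the Vector helper above (illustrative, not part of the commit), showing the arithmetic operators and the implicit doubleToMultiplier that lets a scalar appear on the left of *:

    import spark.examples.Vector
    import spark.examples.Vector._

    val a = Vector(1.0, 2.0, 3.0)     // varargs constructor
    val b = Vector.ones(3)            // (1.0, 1.0, 1.0)
    val c = a + b                     // (2.0, 3.0, 4.0)
    val d = 2.0 * c - a               // scalar on the left via doubleToMultiplier
    val s = a dot b                   // 6.0
    println(d + ", sum = " + d.sum)   // (3.0, 4.0, 5.0), sum = 12.0

The implicit VectorAccumParam at the bottom is what lets SparkHdfsLR and SparkLR call sc.accumulator(Vector.zeros(D)) and add Vector values into it from distributed tasks.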