diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-04-03 23:44:55 -0700 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2010-04-03 23:44:55 -0700 |
commit | 06aac8a88902f10182830259197d83adbafea516 (patch) | |
tree | d0cce1c32352412c116f53a0c1b6672e8068eb42 /src/examples | |
parent | df29d0ea4c8b7137fdd1844219c7d489e3b0d9c9 (diff) | |
download | spark-06aac8a88902f10182830259197d83adbafea516.tar.gz spark-06aac8a88902f10182830259197d83adbafea516.tar.bz2 spark-06aac8a88902f10182830259197d83adbafea516.zip |
Imported changes from old repository (mostly Mosharaf's work,
plus some fault tolerance code).
Diffstat (limited to 'src/examples')
-rw-r--r-- | src/examples/BroadcastTest.scala | 24 | ||||
-rw-r--r-- | src/examples/SparkALS.scala | 12 |
2 files changed, 30 insertions, 6 deletions
diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala
new file mode 100644
index 0000000000..7764013413
--- /dev/null
+++ b/src/examples/BroadcastTest.scala
@@ -0,0 +1,24 @@
+import spark.SparkContext
+
+object BroadcastTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: BroadcastTest <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "Broadcast Test")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val num = if (args.length > 2) args(2).toInt else 1000000
+
+    var arr = new Array[Int](num)
+    for (i <- 0 until arr.length)
+      arr(i) = i
+
+    val barr = spark.broadcast(arr)
+    spark.parallelize(1 to 10, slices).foreach {
+      println("in task: barr = " + barr)
+      i => println(barr.value.size)
+    }
+  }
+}
+
diff --git a/src/examples/SparkALS.scala b/src/examples/SparkALS.scala
index 2fd58ed3a5..38dd0e665d 100644
--- a/src/examples/SparkALS.scala
+++ b/src/examples/SparkALS.scala
@@ -119,18 +119,18 @@ object SparkALS {
 
     // Iteratively update movies then users
     val Rc = spark.broadcast(R)
-    var msb = spark.broadcast(ms)
-    var usb = spark.broadcast(us)
+    var msc = spark.broadcast(ms)
+    var usc = spark.broadcast(us)
     for (iter <- 1 to ITERATIONS) {
       println("Iteration " + iter + ":")
       ms = spark.parallelize(0 until M, slices)
-                .map(i => updateMovie(i, msb.value(i), usb.value, Rc.value))
+                .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
                 .toArray
-      msb = spark.broadcast(ms) // Re-broadcast ms because it was updated
+      msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
       us = spark.parallelize(0 until U, slices)
-                .map(i => updateUser(i, usb.value(i), msb.value, Rc.value))
+                .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
                 .toArray
-      usb = spark.broadcast(us) // Re-broadcast us because it was updated
+      usc = spark.broadcast(us) // Re-broadcast us because it was updated
       println("RMSE = " + rmse(R, ms, us))
       println()
     }