about summary refs log tree commit diff
path: root/src/examples
diff options
context:
space:
mode:
author    Matei Zaharia <matei@eecs.berkeley.edu>  2010-04-03 23:44:55 -0700
committer Matei Zaharia <matei@eecs.berkeley.edu>  2010-04-03 23:44:55 -0700
commit    06aac8a88902f10182830259197d83adbafea516 (patch)
tree      d0cce1c32352412c116f53a0c1b6672e8068eb42 /src/examples
parent    df29d0ea4c8b7137fdd1844219c7d489e3b0d9c9 (diff)
downloadspark-06aac8a88902f10182830259197d83adbafea516.tar.gz
spark-06aac8a88902f10182830259197d83adbafea516.tar.bz2
spark-06aac8a88902f10182830259197d83adbafea516.zip
Imported changes from old repository (mostly Mosharaf's work,
plus some fault tolerance code).
Diffstat (limited to 'src/examples')
-rw-r--r--  src/examples/BroadcastTest.scala  | 24
-rw-r--r--  src/examples/SparkALS.scala       | 12
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala
new file mode 100644
index 0000000000..7764013413
--- /dev/null
+++ b/src/examples/BroadcastTest.scala
@@ -0,0 +1,30 @@
+import spark.SparkContext
+
+/**
+ * Sanity-checks broadcast variables: builds an Int array on the driver,
+ * broadcasts it, and reads it back from inside each parallel task.
+ */
+object BroadcastTest {
+  def main(args: Array[String]) {
+    if (args.length == 0) {
+      System.err.println("Usage: BroadcastTest <host> [<slices>]")
+      System.exit(1)
+    }
+    val spark = new SparkContext(args(0), "Broadcast Test")
+    val slices = if (args.length > 1) args(1).toInt else 2
+    val num = if (args.length > 2) args(2).toInt else 1000000
+
+    // Array to broadcast; never reassigned, so val rather than var.
+    val arr = new Array[Int](num)
+    for (i <- 0 until arr.length)
+      arr(i) = i
+
+    val barr = spark.broadcast(arr)
+    // The println must live inside the task closure. The previous form
+    // { println(...); i => ... } ran the println once on the driver while
+    // evaluating the argument block, and only the trailing lambda was the
+    // value passed to foreach -- so nothing was actually printed "in task".
+    spark.parallelize(1 to 10, slices).foreach { i =>
+      println("in task: barr.value.size = " + barr.value.size)
+    }
+  }
+}
+
diff --git a/src/examples/SparkALS.scala b/src/examples/SparkALS.scala
index 2fd58ed3a5..38dd0e665d 100644
--- a/src/examples/SparkALS.scala
+++ b/src/examples/SparkALS.scala
@@ -119,18 +119,18 @@ object SparkALS {
// Iteratively update movies then users
val Rc = spark.broadcast(R)
- var msb = spark.broadcast(ms)
- var usb = spark.broadcast(us)
+ var msc = spark.broadcast(ms)
+ var usc = spark.broadcast(us)
for (iter <- 1 to ITERATIONS) {
println("Iteration " + iter + ":")
ms = spark.parallelize(0 until M, slices)
- .map(i => updateMovie(i, msb.value(i), usb.value, Rc.value))
+ .map(i => updateMovie(i, msc.value(i), usc.value, Rc.value))
.toArray
- msb = spark.broadcast(ms) // Re-broadcast ms because it was updated
+ msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
us = spark.parallelize(0 until U, slices)
- .map(i => updateUser(i, usb.value(i), msb.value, Rc.value))
+ .map(i => updateUser(i, usc.value(i), msc.value, Rc.value))
.toArray
- usb = spark.broadcast(us) // Re-broadcast us because it was updated
+ usc = spark.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
}