author    Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-25 19:34:32 -0800
committer Matei Zaharia <matei@eecs.berkeley.edu>  2013-02-25 19:34:32 -0800
commit    5d7b591cfe14177f083814fe3e81745c5d279810 (patch)
tree      0fd84b8a60d4ae79d3165f34159ded0e1b97c135 /examples
parent    7b8853493248f4b2855a548facc407a3db939ba0 (diff)
Pass a code JAR to SparkContext in our examples. Fixes SPARK-594.
Diffstat (limited to 'examples')
-rw-r--r--  examples/src/main/java/spark/examples/JavaHdfsLR.java | 6
-rw-r--r--  examples/src/main/java/spark/examples/JavaTC.java | 5
-rw-r--r--  examples/src/main/java/spark/examples/JavaWordCount.java | 5
-rw-r--r--  examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java | 3
-rw-r--r--  examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java | 4
-rw-r--r--  examples/src/main/java/spark/streaming/examples/JavaQueueStream.java | 3
-rw-r--r--  examples/src/main/scala/spark/examples/BroadcastTest.scala | 10
-rw-r--r--  examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala | 5
-rw-r--r--  examples/src/main/scala/spark/examples/GroupByTest.scala | 5
-rw-r--r--  examples/src/main/scala/spark/examples/HdfsTest.scala | 3
-rw-r--r--  examples/src/main/scala/spark/examples/LocalALS.scala | 4
-rw-r--r--  examples/src/main/scala/spark/examples/LocalKMeans.scala | 3
-rw-r--r--  examples/src/main/scala/spark/examples/LocalLR.scala | 3
-rw-r--r--  examples/src/main/scala/spark/examples/LogQuery.scala | 4
-rw-r--r--  examples/src/main/scala/spark/examples/MultiBroadcastTest.scala | 16
-rw-r--r--  examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala | 3
-rw-r--r--  examples/src/main/scala/spark/examples/SkewedGroupByTest.scala | 11
-rw-r--r--  examples/src/main/scala/spark/examples/SparkALS.scala | 62
-rw-r--r--  examples/src/main/scala/spark/examples/SparkHdfsLR.scala | 6
-rw-r--r--  examples/src/main/scala/spark/examples/SparkKMeans.scala | 6
-rw-r--r--  examples/src/main/scala/spark/examples/SparkLR.scala | 6
-rw-r--r--  examples/src/main/scala/spark/examples/SparkPi.scala | 3
-rw-r--r--  examples/src/main/scala/spark/examples/SparkTC.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala | 4
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/QueueStream.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala | 3
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala | 5
-rw-r--r--  examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala | 3
35 files changed, 137 insertions(+), 82 deletions(-)
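
Every file below applies the same pattern: the two-argument SparkContext constructor is replaced by the four-argument one, which also takes a Spark home directory and a list of JARs to ship to the workers, so that a cluster master can load the example classes. A minimal sketch of that pattern, assuming SPARK_HOME and SPARK_EXAMPLES_JAR are set in the environment (the object name Sketch and the trivial job body are hypothetical, not part of the commit):

import spark.SparkContext

object Sketch {
  def main(args: Array[String]) {
    // args(0) is the master URL, e.g. "local[2]" or "spark://host:7077"
    val sc = new SparkContext(args(0), "Sketch",
      System.getenv("SPARK_HOME"),                  // Spark installation directory on the workers
      Seq(System.getenv("SPARK_EXAMPLES_JAR")))     // JAR containing this class, shipped to the workers
    println(sc.parallelize(1 to 100).count())       // trivial job to exercise the context
    System.exit(0)
  }
}

The Java examples use the analogous JavaSparkContext constructor, which takes a single JAR path rather than a Seq.
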
diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java
index 29839d5668..8b0a9b6808 100644
--- a/examples/src/main/java/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java
@@ -10,6 +10,9 @@ import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.Random;
+/**
+ * Logistic regression based classification.
+ */
public class JavaHdfsLR {
static int D = 10; // Number of dimensions
@@ -85,7 +88,8 @@ public class JavaHdfsLR {
System.exit(1);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR");
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
index e3bd881b8f..b319bdab44 100644
--- a/examples/src/main/java/spark/examples/JavaTC.java
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -28,7 +28,7 @@ public class JavaTC {
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
if (from != to) edges.add(e);
}
- return new ArrayList(edges);
+ return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
@@ -46,7 +46,8 @@ public class JavaTC {
System.exit(1);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
index a44cf8a120..9d4c7a252d 100644
--- a/examples/src/main/java/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -18,7 +18,8 @@ public class JavaWordCount {
System.exit(1);
}
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
+ JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@@ -29,7 +30,7 @@ public class JavaWordCount {
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
- return new Tuple2(s, 1);
+ return new Tuple2<String, Integer>(s, 1);
}
});
diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
index cddce16e39..e24c6ddaa7 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
@@ -32,7 +32,8 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
+ JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 0e9eadd01b..3e57580fd4 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -30,8 +30,8 @@ public class JavaNetworkWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(
- args[0], "NetworkWordCount", new Duration(1000));
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
index 43c3cd4dfa..15b82c8da1 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
@@ -22,7 +22,8 @@ public class JavaQueueStream {
}
// Create the context
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 230097c7db..ba59be1687 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -9,19 +9,21 @@ object BroadcastTest {
System.exit(1)
}
- val spark = new SparkContext(args(0), "Broadcast Test")
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
+ }
for (i <- 0 until 2) {
println("Iteration " + i)
println("===========")
- val barr1 = spark.broadcast(arr1)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
}
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
index c89f3dac0c..21a90f2e5a 100644
--- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -9,9 +9,10 @@ object ExceptionHandlingTest {
System.exit(1)
}
- val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+ val sc = new SparkContext(args(0), "ExceptionHandlingTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
- if (Math.random > 0.75)
+ if (math.random > 0.75)
throw new Exception("Testing exception handling")
}
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
index 86dfba3a40..a6603653f1 100644
--- a/examples/src/main/scala/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -9,14 +9,15 @@ object GroupByTest {
if (args.length == 0) {
System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
- }
+ }
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
index 7a4530609d..dd61c467f7 100644
--- a/examples/src/main/scala/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -4,7 +4,8 @@ import spark._
object HdfsTest {
def main(args: Array[String]) {
- val sc = new SparkContext(args(0), "HdfsTest")
+ val sc = new SparkContext(args(0), "HdfsTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
index 10e03359c9..2de810e062 100644
--- a/examples/src/main/scala/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -1,11 +1,13 @@
package spark.examples
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
+/**
+ * Alternating least squares matrix factorization.
+ */
object LocalALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index b442c604cd..b07e799cef 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -6,6 +6,9 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object LocalKMeans {
val N = 1000
val R = 1000 // Scaling factor
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 9553162004..cd73f553d6 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -3,6 +3,9 @@ package spark.examples
import java.util.Random
import spark.util.Vector
+/**
+ * Logistic regression based classification.
+ */
object LocalLR {
val N = 10000 // Number of data points
val D = 10 // Number of dimensions
diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala
index 5330b8da94..6497596d35 100644
--- a/examples/src/main/scala/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/spark/examples/LogQuery.scala
@@ -26,7 +26,9 @@ object LogQuery {
System.err.println("Usage: LogQuery <master> [logFile]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Log Query")
+
+ val sc = new SparkContext(args(0), "Log Query",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val dataSet =
if (args.length == 2) sc.textFile(args(1))
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
index 83ae014e94..92cd81c487 100644
--- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -9,21 +9,25 @@ object MultiBroadcastTest {
System.exit(1)
}
- val spark = new SparkContext(args(0), "Broadcast Test")
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
+ }
var arr2 = new Array[Int](num)
- for (i <- 0 until arr2.length)
+ for (i <- 0 until arr2.length) {
arr2(i) = i
+ }
- val barr1 = spark.broadcast(arr1)
- val barr2 = spark.broadcast(arr2)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ val barr2 = sc.broadcast(arr2)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size + barr2.value.size)
}
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
index 50b3a263b4..0d17bda004 100644
--- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -18,7 +18,8 @@ object SimpleSkewedGroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
var ratio = if (args.length > 5) args(5).toInt else 5.0
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
index d2117a263e..83be3fc27b 100644
--- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -16,13 +16,14 @@ object SkewedGroupByTest {
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
// map output sizes linearly increase from the 1st to the last
- numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt
+ numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
@@ -31,11 +32,11 @@ object SkewedGroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
- }.cache
+ }.cache()
// Enforce that everything has been calculated and in cache
- pairs1.count
+ pairs1.count()
- println(pairs1.groupByKey(numReducers).count)
+ println(pairs1.groupByKey(numReducers).count())
System.exit(0)
}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index 5e01885dbb..8fb3b0fb2a 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -1,14 +1,14 @@
package spark.examples
-import java.io.Serializable
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
import spark._
-import scala.Option
+/**
+ * Alternating least squares matrix factorization.
+ */
object SparkALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
@@ -70,30 +70,32 @@ object SparkALS {
}
def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
+ System.exit(1)
+ }
+
var host = ""
var slices = 0
- (0 to 5).map(i => {
- i match {
- case a if a < args.length => Some(args(a))
- case _ => None
- }
- }).toArray match {
- case Array(host_, m, u, f, iters, slices_) => {
- host = host_ getOrElse "local"
- M = (m getOrElse "100").toInt
- U = (u getOrElse "500").toInt
- F = (f getOrElse "10").toInt
- ITERATIONS = (iters getOrElse "5").toInt
- slices = (slices_ getOrElse "2").toInt
- }
- case _ => {
- System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]")
+ val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None)
+
+ options.toArray match {
+ case Array(host_, m, u, f, iters, slices_) =>
+ host = host_.get
+ M = m.getOrElse("100").toInt
+ U = u.getOrElse("500").toInt
+ F = f.getOrElse("10").toInt
+ ITERATIONS = iters.getOrElse("5").toInt
+ slices = slices_.getOrElse("2").toInt
+ case _ =>
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
System.exit(1)
- }
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
- val spark = new SparkContext(host, "SparkALS")
+
+ val sc = new SparkContext(host, "SparkALS",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val R = generateR()
@@ -102,19 +104,19 @@ object SparkALS {
var us = Array.fill(U)(factory1D.random(F))
// Iteratively update movies then users
- val Rc = spark.broadcast(R)
- var msc = spark.broadcast(ms)
- var usc = spark.broadcast(us)
+ val Rc = sc.broadcast(R)
+ var msb = sc.broadcast(ms)
+ var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
println("Iteration " + iter + ":")
- ms = spark.parallelize(0 until M, slices)
- .map(i => update(i, msc.value(i), usc.value, Rc.value))
+ ms = sc.parallelize(0 until M, slices)
+ .map(i => update(i, msb.value(i), usb.value, Rc.value))
.toArray
- msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
- us = spark.parallelize(0 until U, slices)
- .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value)))
+ msb = sc.broadcast(ms) // Re-broadcast ms because it was updated
+ us = sc.parallelize(0 until U, slices)
+ .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value)))
.toArray
- usc = spark.broadcast(us) // Re-broadcast us because it was updated
+ usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 5b2bc84d69..0f42f405a0 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -5,6 +5,9 @@ import scala.math.exp
import spark.util.Vector
import spark._
+/**
+ * Logistic regression based classification.
+ */
object SparkHdfsLR {
val D = 10 // Number of dimensions
val rand = new Random(42)
@@ -29,7 +32,8 @@ object SparkHdfsLR {
System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkHdfsLR")
+ val sc = new SparkContext(args(0), "SparkHdfsLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = sc.textFile(args(1))
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 6375961390..7c21ea12fb 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -7,6 +7,9 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object SparkKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)
@@ -36,7 +39,8 @@ object SparkKMeans {
System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLocalKMeans")
+ val sc = new SparkContext(args(0), "SparkLocalKMeans",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index aaaf062c8f..2f41aeb376 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -5,6 +5,9 @@ import scala.math.exp
import spark.util.Vector
import spark._
+/**
+ * Logistic regression based classification.
+ */
object SparkLR {
val N = 10000 // Number of data points
val D = 10 // Number of dimensions
@@ -28,7 +31,8 @@ object SparkLR {
System.err.println("Usage: SparkLR <master> [<slices>]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLR")
+ val sc = new SparkContext(args(0), "SparkLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
index 2f226f1380..5a31d74444 100644
--- a/examples/src/main/scala/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -10,7 +10,8 @@ object SparkPi {
System.err.println("Usage: SparkPi <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkPi")
+ val spark = new SparkContext(args(0), "SparkPi",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
index 90bae011ad..911ae8f168 100644
--- a/examples/src/main/scala/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -9,7 +9,6 @@ import scala.collection.mutable
* Transitive closure on a graph.
*/
object SparkTC {
-
val numEdges = 200
val numVertices = 100
val rand = new Random(42)
@@ -29,7 +28,8 @@ object SparkTC {
System.err.println("Usage: SparkTC <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkTC")
+ val spark = new SparkContext(args(0), "SparkTC",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 76293fbb96..3b847fe603 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -131,7 +131,8 @@ object ActorWordCount {
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
/*
* Following is the use of actorStream to plug in custom actor as receiver
diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index 461929fba2..39c76fd98a 100644
--- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -30,7 +30,8 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
index 8530f5c175..9389f8a38d 100644
--- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -22,7 +22,8 @@ object HdfsWordCount {
}
// Create the context
- val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 9b135a5c54..c3a9e491ba 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -32,8 +32,8 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
- val sc = new SparkContext(master, "KafkaWordCount")
- val ssc = new StreamingContext(sc, Seconds(2))
+ val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 5ac6d19b34..704540c2bf 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -23,7 +23,8 @@ object NetworkWordCount {
}
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (e.g. generated by 'nc')
diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
index e9cb7b55ea..f450e21040 100644
--- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -15,7 +15,8 @@ object QueueStream {
}
// Create the context
- val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 49b3223eec..175281e095 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -31,7 +31,8 @@ object RawNetworkGrep {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context
- val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis))
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Warm up the JVMs on master and slave for JIT compilation to kick in
RawTextHelper.warmUp(ssc.sparkContext)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 39a1a702ee..483aae452b 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -43,7 +43,8 @@ object TwitterAlgebirdCMS {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 914fba4ca2..f3288bfb85 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -32,7 +32,8 @@ object TwitterAlgebirdHLL {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index fdb3a4c73c..9d4494c6f2 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -21,7 +21,8 @@ object TwitterPopularTags {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
index 5ed9b7cb76..74d0d338b7 100644
--- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -58,7 +58,8 @@ object ZeroMQWordCount {
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
@@ -70,4 +71,4 @@ object ZeroMQWordCount {
ssc.start()
}
-} \ No newline at end of file
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 9a2ba30ee4..e226a4a73a 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -24,7 +24,8 @@ object PageViewStream {
val port = args(2).toInt
// Create the context
- val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.socketTextStream(host, port)
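
The streaming examples follow the same pattern through the corresponding StreamingContext constructor, which takes the batch duration before the Spark home and JAR list. A minimal sketch under the same assumptions as above (the object name StreamingSketch and the socket job are hypothetical):

import spark.streaming.{Seconds, StreamingContext}

object StreamingSketch {
  def main(args: Array[String]) {
    // args(0) is the master URL; a new batch is generated every second
    val ssc = new StreamingContext(args(0), "StreamingSketch", Seconds(1),
      System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
    // count lines arriving on a socket, e.g. fed by 'nc -lk 9999'
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc.start()
  }
}
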