diff options
author | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-25 19:34:32 -0800 |
---|---|---|
committer | Matei Zaharia <matei@eecs.berkeley.edu> | 2013-02-25 19:34:32 -0800 |
commit | 5d7b591cfe14177f083814fe3e81745c5d279810 (patch) | |
tree | 0fd84b8a60d4ae79d3165f34159ded0e1b97c135 /examples/src/main | |
parent | 7b8853493248f4b2855a548facc407a3db939ba0 (diff) | |
download | spark-5d7b591cfe14177f083814fe3e81745c5d279810.tar.gz spark-5d7b591cfe14177f083814fe3e81745c5d279810.tar.bz2 spark-5d7b591cfe14177f083814fe3e81745c5d279810.zip |
Pass a code JAR to SparkContext in our examples. Fixes SPARK-594.
Diffstat (limited to 'examples/src/main')
35 files changed, 137 insertions, 82 deletions
diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java index 29839d5668..8b0a9b6808 100644 --- a/examples/src/main/java/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java @@ -10,6 +10,9 @@ import java.util.Arrays; import java.util.StringTokenizer; import java.util.Random; +/** + * Logistic regression based classification. + */ public class JavaHdfsLR { static int D = 10; // Number of dimensions @@ -85,7 +88,8 @@ public class JavaHdfsLR { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD<String> lines = sc.textFile(args[1]); JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache(); int ITERATIONS = Integer.parseInt(args[2]); diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java index e3bd881b8f..b319bdab44 100644 --- a/examples/src/main/java/spark/examples/JavaTC.java +++ b/examples/src/main/java/spark/examples/JavaTC.java @@ -28,7 +28,7 @@ public class JavaTC { Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to); if (from != to) edges.add(e); } - return new ArrayList(edges); + return new ArrayList<Tuple2<Integer, Integer>>(edges); } static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>, @@ -46,7 +46,8 @@ public class JavaTC { System.exit(1); } - JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC"); + JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2; JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache(); diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java index a44cf8a120..9d4c7a252d 100644 --- a/examples/src/main/java/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/spark/examples/JavaWordCount.java @@ -18,7 +18,8 @@ public class JavaWordCount { System.exit(1); } - JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount"); + JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount", + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaRDD<String> lines = ctx.textFile(args[1], 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @@ -29,7 +30,7 @@ public class JavaWordCount { JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { public Tuple2<String, Integer> call(String s) { - return new Tuple2(s, 1); + return new Tuple2<String, Integer>(s, 1); } }); diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java index cddce16e39..e24c6ddaa7 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java @@ -32,7 +32,8 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval); + JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java index 0e9eadd01b..3e57580fd4 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java @@ -30,8 +30,8 @@ public class JavaNetworkWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext( - args[0], "NetworkWordCount", new Duration(1000)); + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount", + new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java index 43c3cd4dfa..15b82c8da1 100644 --- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java @@ -22,7 +22,8 @@ public class JavaQueueStream { } // Create the context - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000)); + JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000), + System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala index 230097c7db..ba59be1687 100644 --- a/examples/src/main/scala/spark/examples/BroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala @@ -9,19 +9,21 @@ object BroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } for (i <- 0 until 2) { println("Iteration " + i) println("===========") - val barr1 = spark.broadcast(arr1) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size) } } diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala index c89f3dac0c..21a90f2e5a 100644 --- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala +++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala @@ -9,9 +9,10 @@ object ExceptionHandlingTest { System.exit(1) } - val sc = new SparkContext(args(0), "ExceptionHandlingTest") + val sc = new SparkContext(args(0), "ExceptionHandlingTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) sc.parallelize(0 until sc.defaultParallelism).foreach { i => - if (Math.random > 0.75) + if (math.random > 0.75) throw new Exception("Testing exception handling") } diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala index 86dfba3a40..a6603653f1 100644 --- a/examples/src/main/scala/spark/examples/GroupByTest.scala +++ b/examples/src/main/scala/spark/examples/GroupByTest.scala @@ -9,14 +9,15 @@ object GroupByTest { if (args.length == 0) { System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]") System.exit(1) - } + } var numMappers = if (args.length > 1) args(1).toInt else 2 var numKVPairs = if (args.length > 2) args(2).toInt else 1000 var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala index 7a4530609d..dd61c467f7 100644 --- a/examples/src/main/scala/spark/examples/HdfsTest.scala +++ b/examples/src/main/scala/spark/examples/HdfsTest.scala @@ -4,7 +4,8 @@ import spark._ object HdfsTest { def main(args: Array[String]) { - val sc = new SparkContext(args(0), "HdfsTest") + val sc = new SparkContext(args(0), "HdfsTest", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val file = sc.textFile(args(1)) val mapped = file.map(s => s.length).cache() for (iter <- 1 to 10) { diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala index 10e03359c9..2de810e062 100644 --- a/examples/src/main/scala/spark/examples/LocalALS.scala +++ b/examples/src/main/scala/spark/examples/LocalALS.scala @@ -1,11 +1,13 @@ package spark.examples -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ +/** + * Alternating least squares matrix factorization. + */ object LocalALS { // Parameters set through command line arguments var M = 0 // Number of movies diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala index b442c604cd..b07e799cef 100644 --- a/examples/src/main/scala/spark/examples/LocalKMeans.scala +++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala @@ -6,6 +6,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object LocalKMeans { val N = 1000 val R = 1000 // Scaling factor diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala index 9553162004..cd73f553d6 100644 --- a/examples/src/main/scala/spark/examples/LocalLR.scala +++ b/examples/src/main/scala/spark/examples/LocalLR.scala @@ -3,6 +3,9 @@ package spark.examples import java.util.Random import spark.util.Vector +/** + * Logistic regression based classification. + */ object LocalLR { val N = 10000 // Number of data points val D = 10 // Number of dimensions diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala index 5330b8da94..6497596d35 100644 --- a/examples/src/main/scala/spark/examples/LogQuery.scala +++ b/examples/src/main/scala/spark/examples/LogQuery.scala @@ -26,7 +26,9 @@ object LogQuery { System.err.println("Usage: LogQuery <master> [logFile]") System.exit(1) } - val sc = new SparkContext(args(0), "Log Query") + + val sc = new SparkContext(args(0), "Log Query", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val dataSet = if (args.length == 2) sc.textFile(args(1)) diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala index 83ae014e94..92cd81c487 100644 --- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala +++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala @@ -9,21 +9,25 @@ object MultiBroadcastTest { System.exit(1) } - val spark = new SparkContext(args(0), "Broadcast Test") + val sc = new SparkContext(args(0), "Broadcast Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) + val slices = if (args.length > 1) args(1).toInt else 2 val num = if (args.length > 2) args(2).toInt else 1000000 var arr1 = new Array[Int](num) - for (i <- 0 until arr1.length) + for (i <- 0 until arr1.length) { arr1(i) = i + } var arr2 = new Array[Int](num) - for (i <- 0 until arr2.length) + for (i <- 0 until arr2.length) { arr2(i) = i + } - val barr1 = spark.broadcast(arr1) - val barr2 = spark.broadcast(arr2) - spark.parallelize(1 to 10, slices).foreach { + val barr1 = sc.broadcast(arr1) + val barr2 = sc.broadcast(arr2) + sc.parallelize(1 to 10, slices).foreach { i => println(barr1.value.size + barr2.value.size) } diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala index 50b3a263b4..0d17bda004 100644 --- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala @@ -18,7 +18,8 @@ object SimpleSkewedGroupByTest { var numReducers = if (args.length > 4) args(4).toInt else numMappers var ratio = if (args.length > 5) args(5).toInt else 5.0 - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala index d2117a263e..83be3fc27b 100644 --- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala +++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala @@ -16,13 +16,14 @@ object SkewedGroupByTest { var valSize = if (args.length > 3) args(3).toInt else 1000 var numReducers = if (args.length > 4) args(4).toInt else numMappers - val sc = new SparkContext(args(0), "GroupBy Test") + val sc = new SparkContext(args(0), "GroupBy Test", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p => val ranGen = new Random // map output sizes lineraly increase from the 1st to the last - numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt + numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt var arr1 = new Array[(Int, Array[Byte])](numKVPairs) for (i <- 0 until numKVPairs) { @@ -31,11 +32,11 @@ object SkewedGroupByTest { arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr) } arr1 - }.cache + }.cache() // Enforce that everything has been calculated and in cache - pairs1.count + pairs1.count() - println(pairs1.groupByKey(numReducers).count) + println(pairs1.groupByKey(numReducers).count()) System.exit(0) } diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala index 5e01885dbb..8fb3b0fb2a 100644 --- a/examples/src/main/scala/spark/examples/SparkALS.scala +++ b/examples/src/main/scala/spark/examples/SparkALS.scala @@ -1,14 +1,14 @@ package spark.examples -import java.io.Serializable -import java.util.Random import scala.math.sqrt import cern.jet.math._ import cern.colt.matrix._ import cern.colt.matrix.linalg._ import spark._ -import scala.Option +/** + * Alternating least squares matrix factorization. + */ object SparkALS { // Parameters set through command line arguments var M = 0 // Number of movies @@ -70,30 +70,32 @@ object SparkALS { } def main(args: Array[String]) { + if (args.length == 0) { + System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]") + System.exit(1) + } + var host = "" var slices = 0 - (0 to 5).map(i => { - i match { - case a if a < args.length => Some(args(a)) - case _ => None - } - }).toArray match { - case Array(host_, m, u, f, iters, slices_) => { - host = host_ getOrElse "local" - M = (m getOrElse "100").toInt - U = (u getOrElse "500").toInt - F = (f getOrElse "10").toInt - ITERATIONS = (iters getOrElse "5").toInt - slices = (slices_ getOrElse "2").toInt - } - case _ => { - System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]") + val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None) + + options.toArray match { + case Array(host_, m, u, f, iters, slices_) => + host = host_.get + M = m.getOrElse("100").toInt + U = u.getOrElse("500").toInt + F = f.getOrElse("10").toInt + ITERATIONS = iters.getOrElse("5").toInt + slices = slices_.getOrElse("2").toInt + case _ => + System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]") System.exit(1) - } } printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS) - val spark = new SparkContext(host, "SparkALS") + + val sc = new SparkContext(host, "SparkALS", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val R = generateR() @@ -102,19 +104,19 @@ object SparkALS { var us = Array.fill(U)(factory1D.random(F)) // Iteratively update movies then users - val Rc = spark.broadcast(R) - var msc = spark.broadcast(ms) - var usc = spark.broadcast(us) + val Rc = sc.broadcast(R) + var msb = sc.broadcast(ms) + var usb = sc.broadcast(us) for (iter <- 1 to ITERATIONS) { println("Iteration " + iter + ":") - ms = spark.parallelize(0 until M, slices) - .map(i => update(i, msc.value(i), usc.value, Rc.value)) + ms = sc.parallelize(0 until M, slices) + .map(i => update(i, msb.value(i), usb.value, Rc.value)) .toArray - msc = spark.broadcast(ms) // Re-broadcast ms because it was updated - us = spark.parallelize(0 until U, slices) - .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value))) + msb = sc.broadcast(ms) // Re-broadcast ms because it was updated + us = sc.parallelize(0 until U, slices) + .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value))) .toArray - usc = spark.broadcast(us) // Re-broadcast us because it was updated + usb = sc.broadcast(us) // Re-broadcast us because it was updated println("RMSE = " + rmse(R, ms, us)) println() } diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala index 5b2bc84d69..0f42f405a0 100644 --- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala +++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. + */ object SparkHdfsLR { val D = 10 // Numer of dimensions val rand = new Random(42) @@ -29,7 +32,8 @@ object SparkHdfsLR { System.err.println("Usage: SparkHdfsLR <master> <file> <iters>") System.exit(1) } - val sc = new SparkContext(args(0), "SparkHdfsLR") + val sc = new SparkContext(args(0), "SparkHdfsLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val points = lines.map(parsePoint _).cache() val ITERATIONS = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala index 6375961390..7c21ea12fb 100644 --- a/examples/src/main/scala/spark/examples/SparkKMeans.scala +++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala @@ -7,6 +7,9 @@ import spark.SparkContext._ import scala.collection.mutable.HashMap import scala.collection.mutable.HashSet +/** + * K-means clustering. + */ object SparkKMeans { val R = 1000 // Scaling factor val rand = new Random(42) @@ -36,7 +39,8 @@ object SparkKMeans { System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLocalKMeans") + val sc = new SparkContext(args(0), "SparkLocalKMeans", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val lines = sc.textFile(args(1)) val data = lines.map(parseVector _).cache() val K = args(2).toInt diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala index aaaf062c8f..2f41aeb376 100644 --- a/examples/src/main/scala/spark/examples/SparkLR.scala +++ b/examples/src/main/scala/spark/examples/SparkLR.scala @@ -5,6 +5,9 @@ import scala.math.exp import spark.util.Vector import spark._ +/** + * Logistic regression based classification. + */ object SparkLR { val N = 10000 // Number of data points val D = 10 // Numer of dimensions @@ -28,7 +31,8 @@ object SparkLR { System.err.println("Usage: SparkLR <master> [<slices>]") System.exit(1) } - val sc = new SparkContext(args(0), "SparkLR") + val sc = new SparkContext(args(0), "SparkLR", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val numSlices = if (args.length > 1) args(1).toInt else 2 val points = sc.parallelize(generateData, numSlices).cache() diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala index 2f226f1380..5a31d74444 100644 --- a/examples/src/main/scala/spark/examples/SparkPi.scala +++ b/examples/src/main/scala/spark/examples/SparkPi.scala @@ -10,7 +10,8 @@ object SparkPi { System.err.println("Usage: SparkPi <master> [<slices>]") System.exit(1) } - val spark = new SparkContext(args(0), "SparkPi") + val spark = new SparkContext(args(0), "SparkPi", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 val n = 100000 * slices val count = spark.parallelize(1 to n, slices).map { i => diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala index 90bae011ad..911ae8f168 100644 --- a/examples/src/main/scala/spark/examples/SparkTC.scala +++ b/examples/src/main/scala/spark/examples/SparkTC.scala @@ -9,7 +9,6 @@ import scala.collection.mutable * Transitive closure on a graph. */ object SparkTC { - val numEdges = 200 val numVertices = 100 val rand = new Random(42) @@ -29,7 +28,8 @@ object SparkTC { System.err.println("Usage: SparkTC <master> [<slices>]") System.exit(1) } - val spark = new SparkContext(args(0), "SparkTC") + val spark = new SparkContext(args(0), "SparkTC", + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val slices = if (args.length > 1) args(1).toInt else 2 var tc = spark.parallelize(generateGraph, slices).cache() diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala index 76293fbb96..3b847fe603 100644 --- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala @@ -131,7 +131,8 @@ object ActorWordCount { val Seq(master, host, port) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) /* * Following is the use of actorStream to plug in custom actor as receiver diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala index 461929fba2..39c76fd98a 100644 --- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala @@ -30,7 +30,8 @@ object FlumeEventCount { val batchInterval = Milliseconds(2000) // Create the context and set the batch size - val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval) + val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval, + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a flume stream val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala index 8530f5c175..9389f8a38d 100644 --- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala @@ -22,7 +22,8 @@ object HdfsWordCount { } // Create the context - val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2)) + val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the FileInputDStream on the directory and use the // stream to count words in new files created diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala index 9b135a5c54..c3a9e491ba 100644 --- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala @@ -32,8 +32,8 @@ object KafkaWordCount { val Array(master, zkQuorum, group, topics, numThreads) = args - val sc = new SparkContext(master, "KafkaWordCount") - val ssc = new StreamingContext(sc, Seconds(2)) + val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala index 5ac6d19b34..704540c2bf 100644 --- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala @@ -23,7 +23,8 @@ object NetworkWordCount { } // Create the context with a 1 second batch size - val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1)) + val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target ip:port and count the // words in input stream of \n delimited test (eg. generated by 'nc') diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala index e9cb7b55ea..f450e21040 100644 --- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala @@ -15,7 +15,8 @@ object QueueStream { } // Create the context - val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1)) + val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create the queue through which RDDs can be pushed to // a QueueInputDStream diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala index 49b3223eec..175281e095 100644 --- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala +++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala @@ -31,7 +31,8 @@ object RawNetworkGrep { val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args // Create the context - val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis)) + val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Warm up the JVMs on master and slave for JIT compilation to kick in RawTextHelper.warmUp(ssc.sparkContext) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala index 39a1a702ee..483aae452b 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -43,7 +43,8 @@ object TwitterAlgebirdCMS { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10)) + val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala index 914fba4ca2..f3288bfb85 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -32,7 +32,8 @@ object TwitterAlgebirdHLL { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5)) + val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index fdb3a4c73c..9d4494c6f2 100644 --- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -21,7 +21,8 @@ object TwitterPopularTags { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala index 5ed9b7cb76..74d0d338b7 100644 --- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala @@ -58,7 +58,8 @@ object ZeroMQWordCount { val Seq(master, url, topic) = args.toSeq // Create the context and set the batch size - val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2)) + val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator @@ -70,4 +71,4 @@ object ZeroMQWordCount { ssc.start() } -}
\ No newline at end of file +} diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala index 9a2ba30ee4..e226a4a73a 100644 --- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala @@ -24,7 +24,8 @@ object PageViewStream { val port = args(2).toInt // Create the context - val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1)) + val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1), + System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR"))) // Create a NetworkInputDStream on target host:port and convert each line to a PageView val pageViews = ssc.socketTextStream(host, port) |