aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatei Zaharia <matei@eecs.berkeley.edu>2013-02-25 19:34:32 -0800
committerMatei Zaharia <matei@eecs.berkeley.edu>2013-02-25 19:34:32 -0800
commit5d7b591cfe14177f083814fe3e81745c5d279810 (patch)
tree0fd84b8a60d4ae79d3165f34159ded0e1b97c135
parent7b8853493248f4b2855a548facc407a3db939ba0 (diff)
downloadspark-5d7b591cfe14177f083814fe3e81745c5d279810.tar.gz
spark-5d7b591cfe14177f083814fe3e81745c5d279810.tar.bz2
spark-5d7b591cfe14177f083814fe3e81745c5d279810.zip
Pass a code JAR to SparkContext in our examples. Fixes SPARK-594.
-rw-r--r--examples/src/main/java/spark/examples/JavaHdfsLR.java6
-rw-r--r--examples/src/main/java/spark/examples/JavaTC.java5
-rw-r--r--examples/src/main/java/spark/examples/JavaWordCount.java5
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java3
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java4
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaQueueStream.java3
-rw-r--r--examples/src/main/scala/spark/examples/BroadcastTest.scala10
-rw-r--r--examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala5
-rw-r--r--examples/src/main/scala/spark/examples/GroupByTest.scala5
-rw-r--r--examples/src/main/scala/spark/examples/HdfsTest.scala3
-rw-r--r--examples/src/main/scala/spark/examples/LocalALS.scala4
-rw-r--r--examples/src/main/scala/spark/examples/LocalKMeans.scala3
-rw-r--r--examples/src/main/scala/spark/examples/LocalLR.scala3
-rw-r--r--examples/src/main/scala/spark/examples/LogQuery.scala4
-rw-r--r--examples/src/main/scala/spark/examples/MultiBroadcastTest.scala16
-rw-r--r--examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala3
-rw-r--r--examples/src/main/scala/spark/examples/SkewedGroupByTest.scala11
-rw-r--r--examples/src/main/scala/spark/examples/SparkALS.scala62
-rw-r--r--examples/src/main/scala/spark/examples/SparkHdfsLR.scala6
-rw-r--r--examples/src/main/scala/spark/examples/SparkKMeans.scala6
-rw-r--r--examples/src/main/scala/spark/examples/SparkLR.scala6
-rw-r--r--examples/src/main/scala/spark/examples/SparkPi.scala3
-rw-r--r--examples/src/main/scala/spark/examples/SparkTC.scala4
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala4
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/QueueStream.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala3
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala5
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala3
-rwxr-xr-xrun10
-rw-r--r--run2.cmd10
-rw-r--r--streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala17
38 files changed, 174 insertions, 82 deletions
diff --git a/examples/src/main/java/spark/examples/JavaHdfsLR.java b/examples/src/main/java/spark/examples/JavaHdfsLR.java
index 29839d5668..8b0a9b6808 100644
--- a/examples/src/main/java/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/spark/examples/JavaHdfsLR.java
@@ -10,6 +10,9 @@ import java.util.Arrays;
import java.util.StringTokenizer;
import java.util.Random;
+/**
+ * Logistic regression based classification.
+ */
public class JavaHdfsLR {
static int D = 10; // Number of dimensions
@@ -85,7 +88,8 @@ public class JavaHdfsLR {
System.exit(1);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR");
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/spark/examples/JavaTC.java b/examples/src/main/java/spark/examples/JavaTC.java
index e3bd881b8f..b319bdab44 100644
--- a/examples/src/main/java/spark/examples/JavaTC.java
+++ b/examples/src/main/java/spark/examples/JavaTC.java
@@ -28,7 +28,7 @@ public class JavaTC {
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
if (from != to) edges.add(e);
}
- return new ArrayList(edges);
+ return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
@@ -46,7 +46,8 @@ public class JavaTC {
System.exit(1);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC");
+ JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
diff --git a/examples/src/main/java/spark/examples/JavaWordCount.java b/examples/src/main/java/spark/examples/JavaWordCount.java
index a44cf8a120..9d4c7a252d 100644
--- a/examples/src/main/java/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/spark/examples/JavaWordCount.java
@@ -18,7 +18,8 @@ public class JavaWordCount {
System.exit(1);
}
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount");
+ JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@@ -29,7 +30,7 @@ public class JavaWordCount {
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
public Tuple2<String, Integer> call(String s) {
- return new Tuple2(s, 1);
+ return new Tuple2<String, Integer>(s, 1);
}
});
diff --git a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
index cddce16e39..e24c6ddaa7 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
@@ -32,7 +32,8 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
+ JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
diff --git a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 0e9eadd01b..3e57580fd4 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -30,8 +30,8 @@ public class JavaNetworkWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(
- args[0], "NetworkWordCount", new Duration(1000));
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
+ new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
index 43c3cd4dfa..15b82c8da1 100644
--- a/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
@@ -22,7 +22,8 @@ public class JavaQueueStream {
}
// Create the context
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
+ JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
+ System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/spark/examples/BroadcastTest.scala b/examples/src/main/scala/spark/examples/BroadcastTest.scala
index 230097c7db..ba59be1687 100644
--- a/examples/src/main/scala/spark/examples/BroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/BroadcastTest.scala
@@ -9,19 +9,21 @@ object BroadcastTest {
System.exit(1)
}
- val spark = new SparkContext(args(0), "Broadcast Test")
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
+ }
for (i <- 0 until 2) {
println("Iteration " + i)
println("===========")
- val barr1 = spark.broadcast(arr1)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size)
}
}
diff --git a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
index c89f3dac0c..21a90f2e5a 100644
--- a/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
+++ b/examples/src/main/scala/spark/examples/ExceptionHandlingTest.scala
@@ -9,9 +9,10 @@ object ExceptionHandlingTest {
System.exit(1)
}
- val sc = new SparkContext(args(0), "ExceptionHandlingTest")
+ val sc = new SparkContext(args(0), "ExceptionHandlingTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
sc.parallelize(0 until sc.defaultParallelism).foreach { i =>
- if (Math.random > 0.75)
+ if (math.random > 0.75)
throw new Exception("Testing exception handling")
}
diff --git a/examples/src/main/scala/spark/examples/GroupByTest.scala b/examples/src/main/scala/spark/examples/GroupByTest.scala
index 86dfba3a40..a6603653f1 100644
--- a/examples/src/main/scala/spark/examples/GroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/GroupByTest.scala
@@ -9,14 +9,15 @@ object GroupByTest {
if (args.length == 0) {
System.err.println("Usage: GroupByTest <master> [numMappers] [numKVPairs] [KeySize] [numReducers]")
System.exit(1)
- }
+ }
var numMappers = if (args.length > 1) args(1).toInt else 2
var numKVPairs = if (args.length > 2) args(2).toInt else 1000
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/examples/HdfsTest.scala b/examples/src/main/scala/spark/examples/HdfsTest.scala
index 7a4530609d..dd61c467f7 100644
--- a/examples/src/main/scala/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/spark/examples/HdfsTest.scala
@@ -4,7 +4,8 @@ import spark._
object HdfsTest {
def main(args: Array[String]) {
- val sc = new SparkContext(args(0), "HdfsTest")
+ val sc = new SparkContext(args(0), "HdfsTest",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val file = sc.textFile(args(1))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
diff --git a/examples/src/main/scala/spark/examples/LocalALS.scala b/examples/src/main/scala/spark/examples/LocalALS.scala
index 10e03359c9..2de810e062 100644
--- a/examples/src/main/scala/spark/examples/LocalALS.scala
+++ b/examples/src/main/scala/spark/examples/LocalALS.scala
@@ -1,11 +1,13 @@
package spark.examples
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
+/**
+ * Alternating least squares matrix factorization.
+ */
object LocalALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
diff --git a/examples/src/main/scala/spark/examples/LocalKMeans.scala b/examples/src/main/scala/spark/examples/LocalKMeans.scala
index b442c604cd..b07e799cef 100644
--- a/examples/src/main/scala/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/spark/examples/LocalKMeans.scala
@@ -6,6 +6,9 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object LocalKMeans {
val N = 1000
val R = 1000 // Scaling factor
diff --git a/examples/src/main/scala/spark/examples/LocalLR.scala b/examples/src/main/scala/spark/examples/LocalLR.scala
index 9553162004..cd73f553d6 100644
--- a/examples/src/main/scala/spark/examples/LocalLR.scala
+++ b/examples/src/main/scala/spark/examples/LocalLR.scala
@@ -3,6 +3,9 @@ package spark.examples
import java.util.Random
import spark.util.Vector
+/**
+ * Logistic regression based classification.
+ */
object LocalLR {
val N = 10000 // Number of data points
val D = 10 // Number of dimensions
diff --git a/examples/src/main/scala/spark/examples/LogQuery.scala b/examples/src/main/scala/spark/examples/LogQuery.scala
index 5330b8da94..6497596d35 100644
--- a/examples/src/main/scala/spark/examples/LogQuery.scala
+++ b/examples/src/main/scala/spark/examples/LogQuery.scala
@@ -26,7 +26,9 @@ object LogQuery {
System.err.println("Usage: LogQuery <master> [logFile]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "Log Query")
+
+ val sc = new SparkContext(args(0), "Log Query",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val dataSet =
if (args.length == 2) sc.textFile(args(1))
diff --git a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
index 83ae014e94..92cd81c487 100644
--- a/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
+++ b/examples/src/main/scala/spark/examples/MultiBroadcastTest.scala
@@ -9,21 +9,25 @@ object MultiBroadcastTest {
System.exit(1)
}
- val spark = new SparkContext(args(0), "Broadcast Test")
+ val sc = new SparkContext(args(0), "Broadcast Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
+
val slices = if (args.length > 1) args(1).toInt else 2
val num = if (args.length > 2) args(2).toInt else 1000000
var arr1 = new Array[Int](num)
- for (i <- 0 until arr1.length)
+ for (i <- 0 until arr1.length) {
arr1(i) = i
+ }
var arr2 = new Array[Int](num)
- for (i <- 0 until arr2.length)
+ for (i <- 0 until arr2.length) {
arr2(i) = i
+ }
- val barr1 = spark.broadcast(arr1)
- val barr2 = spark.broadcast(arr2)
- spark.parallelize(1 to 10, slices).foreach {
+ val barr1 = sc.broadcast(arr1)
+ val barr2 = sc.broadcast(arr2)
+ sc.parallelize(1 to 10, slices).foreach {
i => println(barr1.value.size + barr2.value.size)
}
diff --git a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
index 50b3a263b4..0d17bda004 100644
--- a/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SimpleSkewedGroupByTest.scala
@@ -18,7 +18,8 @@ object SimpleSkewedGroupByTest {
var numReducers = if (args.length > 4) args(4).toInt else numMappers
var ratio = if (args.length > 5) args(5).toInt else 5.0
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
diff --git a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
index d2117a263e..83be3fc27b 100644
--- a/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
+++ b/examples/src/main/scala/spark/examples/SkewedGroupByTest.scala
@@ -16,13 +16,14 @@ object SkewedGroupByTest {
var valSize = if (args.length > 3) args(3).toInt else 1000
var numReducers = if (args.length > 4) args(4).toInt else numMappers
- val sc = new SparkContext(args(0), "GroupBy Test")
+ val sc = new SparkContext(args(0), "GroupBy Test",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
// map output sizes lineraly increase from the 1st to the last
- numKVPairs = (1. * (p + 1) / numMappers * numKVPairs).toInt
+ numKVPairs = (1.0 * (p + 1) / numMappers * numKVPairs).toInt
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
@@ -31,11 +32,11 @@ object SkewedGroupByTest {
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
- }.cache
+ }.cache()
// Enforce that everything has been calculated and in cache
- pairs1.count
+ pairs1.count()
- println(pairs1.groupByKey(numReducers).count)
+ println(pairs1.groupByKey(numReducers).count())
System.exit(0)
}
diff --git a/examples/src/main/scala/spark/examples/SparkALS.scala b/examples/src/main/scala/spark/examples/SparkALS.scala
index 5e01885dbb..8fb3b0fb2a 100644
--- a/examples/src/main/scala/spark/examples/SparkALS.scala
+++ b/examples/src/main/scala/spark/examples/SparkALS.scala
@@ -1,14 +1,14 @@
package spark.examples
-import java.io.Serializable
-import java.util.Random
import scala.math.sqrt
import cern.jet.math._
import cern.colt.matrix._
import cern.colt.matrix.linalg._
import spark._
-import scala.Option
+/**
+ * Alternating least squares matrix factorization.
+ */
object SparkALS {
// Parameters set through command line arguments
var M = 0 // Number of movies
@@ -70,30 +70,32 @@ object SparkALS {
}
def main(args: Array[String]) {
+ if (args.length == 0) {
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
+ System.exit(1)
+ }
+
var host = ""
var slices = 0
- (0 to 5).map(i => {
- i match {
- case a if a < args.length => Some(args(a))
- case _ => None
- }
- }).toArray match {
- case Array(host_, m, u, f, iters, slices_) => {
- host = host_ getOrElse "local"
- M = (m getOrElse "100").toInt
- U = (u getOrElse "500").toInt
- F = (f getOrElse "10").toInt
- ITERATIONS = (iters getOrElse "5").toInt
- slices = (slices_ getOrElse "2").toInt
- }
- case _ => {
- System.err.println("Usage: SparkALS [<master> <M> <U> <F> <iters> <slices>]")
+ val options = (0 to 5).map(i => if (i < args.length) Some(args(i)) else None)
+
+ options.toArray match {
+ case Array(host_, m, u, f, iters, slices_) =>
+ host = host_.get
+ M = m.getOrElse("100").toInt
+ U = u.getOrElse("500").toInt
+ F = f.getOrElse("10").toInt
+ ITERATIONS = iters.getOrElse("5").toInt
+ slices = slices_.getOrElse("2").toInt
+ case _ =>
+ System.err.println("Usage: SparkALS <master> [<M> <U> <F> <iters> <slices>]")
System.exit(1)
- }
}
printf("Running with M=%d, U=%d, F=%d, iters=%d\n", M, U, F, ITERATIONS)
- val spark = new SparkContext(host, "SparkALS")
+
+ val sc = new SparkContext(host, "SparkALS",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val R = generateR()
@@ -102,19 +104,19 @@ object SparkALS {
var us = Array.fill(U)(factory1D.random(F))
// Iteratively update movies then users
- val Rc = spark.broadcast(R)
- var msc = spark.broadcast(ms)
- var usc = spark.broadcast(us)
+ val Rc = sc.broadcast(R)
+ var msb = sc.broadcast(ms)
+ var usb = sc.broadcast(us)
for (iter <- 1 to ITERATIONS) {
println("Iteration " + iter + ":")
- ms = spark.parallelize(0 until M, slices)
- .map(i => update(i, msc.value(i), usc.value, Rc.value))
+ ms = sc.parallelize(0 until M, slices)
+ .map(i => update(i, msb.value(i), usb.value, Rc.value))
.toArray
- msc = spark.broadcast(ms) // Re-broadcast ms because it was updated
- us = spark.parallelize(0 until U, slices)
- .map(i => update(i, usc.value(i), msc.value, algebra.transpose(Rc.value)))
+ msb = sc.broadcast(ms) // Re-broadcast ms because it was updated
+ us = sc.parallelize(0 until U, slices)
+ .map(i => update(i, usb.value(i), msb.value, algebra.transpose(Rc.value)))
.toArray
- usc = spark.broadcast(us) // Re-broadcast us because it was updated
+ usb = sc.broadcast(us) // Re-broadcast us because it was updated
println("RMSE = " + rmse(R, ms, us))
println()
}
diff --git a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
index 5b2bc84d69..0f42f405a0 100644
--- a/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkHdfsLR.scala
@@ -5,6 +5,9 @@ import scala.math.exp
import spark.util.Vector
import spark._
+/**
+ * Logistic regression based classification.
+ */
object SparkHdfsLR {
val D = 10 // Numer of dimensions
val rand = new Random(42)
@@ -29,7 +32,8 @@ object SparkHdfsLR {
System.err.println("Usage: SparkHdfsLR <master> <file> <iters>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkHdfsLR")
+ val sc = new SparkContext(args(0), "SparkHdfsLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = sc.textFile(args(1))
val points = lines.map(parsePoint _).cache()
val ITERATIONS = args(2).toInt
diff --git a/examples/src/main/scala/spark/examples/SparkKMeans.scala b/examples/src/main/scala/spark/examples/SparkKMeans.scala
index 6375961390..7c21ea12fb 100644
--- a/examples/src/main/scala/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/spark/examples/SparkKMeans.scala
@@ -7,6 +7,9 @@ import spark.SparkContext._
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
+/**
+ * K-means clustering.
+ */
object SparkKMeans {
val R = 1000 // Scaling factor
val rand = new Random(42)
@@ -36,7 +39,8 @@ object SparkKMeans {
System.err.println("Usage: SparkLocalKMeans <master> <file> <k> <convergeDist>")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLocalKMeans")
+ val sc = new SparkContext(args(0), "SparkLocalKMeans",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val lines = sc.textFile(args(1))
val data = lines.map(parseVector _).cache()
val K = args(2).toInt
diff --git a/examples/src/main/scala/spark/examples/SparkLR.scala b/examples/src/main/scala/spark/examples/SparkLR.scala
index aaaf062c8f..2f41aeb376 100644
--- a/examples/src/main/scala/spark/examples/SparkLR.scala
+++ b/examples/src/main/scala/spark/examples/SparkLR.scala
@@ -5,6 +5,9 @@ import scala.math.exp
import spark.util.Vector
import spark._
+/**
+ * Logistic regression based classification.
+ */
object SparkLR {
val N = 10000 // Number of data points
val D = 10 // Numer of dimensions
@@ -28,7 +31,8 @@ object SparkLR {
System.err.println("Usage: SparkLR <master> [<slices>]")
System.exit(1)
}
- val sc = new SparkContext(args(0), "SparkLR")
+ val sc = new SparkContext(args(0), "SparkLR",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val numSlices = if (args.length > 1) args(1).toInt else 2
val points = sc.parallelize(generateData, numSlices).cache()
diff --git a/examples/src/main/scala/spark/examples/SparkPi.scala b/examples/src/main/scala/spark/examples/SparkPi.scala
index 2f226f1380..5a31d74444 100644
--- a/examples/src/main/scala/spark/examples/SparkPi.scala
+++ b/examples/src/main/scala/spark/examples/SparkPi.scala
@@ -10,7 +10,8 @@ object SparkPi {
System.err.println("Usage: SparkPi <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkPi")
+ val spark = new SparkContext(args(0), "SparkPi",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
val n = 100000 * slices
val count = spark.parallelize(1 to n, slices).map { i =>
diff --git a/examples/src/main/scala/spark/examples/SparkTC.scala b/examples/src/main/scala/spark/examples/SparkTC.scala
index 90bae011ad..911ae8f168 100644
--- a/examples/src/main/scala/spark/examples/SparkTC.scala
+++ b/examples/src/main/scala/spark/examples/SparkTC.scala
@@ -9,7 +9,6 @@ import scala.collection.mutable
* Transitive closure on a graph.
*/
object SparkTC {
-
val numEdges = 200
val numVertices = 100
val rand = new Random(42)
@@ -29,7 +28,8 @@ object SparkTC {
System.err.println("Usage: SparkTC <master> [<slices>]")
System.exit(1)
}
- val spark = new SparkContext(args(0), "SparkTC")
+ val spark = new SparkContext(args(0), "SparkTC",
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val slices = if (args.length > 1) args(1).toInt else 2
var tc = spark.parallelize(generateGraph, slices).cache()
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
index 76293fbb96..3b847fe603 100644
--- a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -131,7 +131,8 @@ object ActorWordCount {
val Seq(master, host, port) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ActorWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
/*
* Following is the use of actorStream to plug in custom actor as receiver
diff --git a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
index 461929fba2..39c76fd98a 100644
--- a/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/FlumeEventCount.scala
@@ -30,7 +30,8 @@ object FlumeEventCount {
val batchInterval = Milliseconds(2000)
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval)
+ val ssc = new StreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a flume stream
val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
diff --git a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
index 8530f5c175..9389f8a38d 100644
--- a/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/HdfsWordCount.scala
@@ -22,7 +22,8 @@ object HdfsWordCount {
}
// Create the context
- val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2))
+ val ssc = new StreamingContext(args(0), "HdfsWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the FileInputDStream on the directory and use the
// stream to count words in new files created
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index 9b135a5c54..c3a9e491ba 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -32,8 +32,8 @@ object KafkaWordCount {
val Array(master, zkQuorum, group, topics, numThreads) = args
- val sc = new SparkContext(master, "KafkaWordCount")
- val ssc = new StreamingContext(sc, Seconds(2))
+ val ssc = new StreamingContext(master, "KafkaWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 5ac6d19b34..704540c2bf 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -23,7 +23,8 @@ object NetworkWordCount {
}
// Create the context with a 1 second batch size
- val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
+ val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
diff --git a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
index e9cb7b55ea..f450e21040 100644
--- a/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/QueueStream.scala
@@ -15,7 +15,8 @@ object QueueStream {
}
// Create the context
- val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1))
+ val ssc = new StreamingContext(args(0), "QueueStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
diff --git a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
index 49b3223eec..175281e095 100644
--- a/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
+++ b/examples/src/main/scala/spark/streaming/examples/RawNetworkGrep.scala
@@ -31,7 +31,8 @@ object RawNetworkGrep {
val Array(master, IntParam(numStreams), host, IntParam(port), IntParam(batchMillis)) = args
// Create the context
- val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis))
+ val ssc = new StreamingContext(master, "RawNetworkGrep", Milliseconds(batchMillis),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Warm up the JVMs on master and slave for JIT compilation to kick in
RawTextHelper.warmUp(ssc.sparkContext)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 39a1a702ee..483aae452b 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -43,7 +43,8 @@ object TwitterAlgebirdCMS {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 914fba4ca2..f3288bfb85 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -32,7 +32,8 @@ object TwitterAlgebirdHLL {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5))
+ val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index fdb3a4c73c..9d4494c6f2 100644
--- a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -21,7 +21,8 @@ object TwitterPopularTags {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
index 5ed9b7cb76..74d0d338b7 100644
--- a/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/ZeroMQWordCount.scala
@@ -58,7 +58,8 @@ object ZeroMQWordCount {
val Seq(master, url, topic) = args.toSeq
// Create the context and set the batch size
- val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2))
+ val ssc = new StreamingContext(master, "ZeroMQWordCount", Seconds(2),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
def bytesToStringIterator(x: Seq[Seq[Byte]]) = (x.map(x => new String(x.toArray))).iterator
@@ -70,4 +71,4 @@ object ZeroMQWordCount {
ssc.start()
}
-} \ No newline at end of file
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index 9a2ba30ee4..e226a4a73a 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -24,7 +24,8 @@ object PageViewStream {
val port = args(2).toInt
// Create the context
- val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
+ val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1),
+ System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_EXAMPLES_JAR")))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
val pageViews = ssc.socketTextStream(host, port)
diff --git a/run b/run
index fd06fbe7c7..2c780623c8 100755
--- a/run
+++ b/run
@@ -134,6 +134,16 @@ for jar in `find $PYSPARK_DIR/lib -name '*jar'`; do
done
export CLASSPATH # Needed for spark-shell
+# Figure out the JAR file that our examples were packaged into.
+if [ -e "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar" ]; then
+ # Use the JAR from the SBT build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/scala-$SCALA_VERSION/spark-examples"*".jar"`
+fi
+if [ -e "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar" ]; then
+ # Use the JAR from the Maven build
+ export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR/target/spark-examples-"*hadoop*".jar"`
+fi
+
# Figure out whether to run our class with java or with the scala launcher.
# In most cases, we'd prefer to execute our process with java because scala
# creates a shell script as the parent of its Java process, which makes it
diff --git a/run2.cmd b/run2.cmd
index 705a4d1ff6..f34869f1b1 100644
--- a/run2.cmd
+++ b/run2.cmd
@@ -62,6 +62,16 @@ set CLASSPATH=%CLASSPATH%;%FWDIR%repl\lib\*
set CLASSPATH=%CLASSPATH%;%FWDIR%python\lib\*
set CLASSPATH=%CLASSPATH%;%BAGEL_DIR%\target\scala-%SCALA_VERSION%\classes
+rem Figure out the JAR file that our examples were packaged into.
+rem First search in the build path from SBT:
+for /D %%d in ("%EXAMPLES_DIR%/target/scala-%SCALA_VERSION%/spark-examples*.jar") do (
+ set SPARK_EXAMPLES_JAR=%%d
+)
+rem Then search in the build path from Maven:
+for /D %%d in ("%EXAMPLES_DIR%/target/spark-examples*hadoop*.jar") do (
+ set SPARK_EXAMPLES_JAR=%%d
+)
+
rem Figure out whether to run our class with java or with the scala launcher.
rem In most cases, we'd prefer to execute our process with java because scala
rem creates a shell script as the parent of its Java process, which makes it
diff --git a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
index 755407aecc..3d149a742c 100644
--- a/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/spark/streaming/api/java/JavaStreamingContext.scala
@@ -49,6 +49,23 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @param appName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
* @param sparkHome The SPARK_HOME directory on the slave nodes
+ * @param jarFile JAR file containing job code, to ship to cluster. This can be a path on the local
+ * file system or an HDFS, HTTP, HTTPS, or FTP URL.
+ */
+ def this(
+ master: String,
+ appName: String,
+ batchDuration: Duration,
+ sparkHome: String,
+ jarFile: String) =
+ this(new StreamingContext(master, appName, batchDuration, sparkHome, Seq(jarFile), Map()))
+
+ /**
+ * Creates a StreamingContext.
+ * @param master Name of the Spark Master
+ * @param appName Name to be used when registering with the scheduler
+ * @param batchDuration The time interval at which streaming data will be divided into batches
+ * @param sparkHome The SPARK_HOME directory on the slave nodes
* @param jars Collection of JARs to send to the cluster. These can be paths on the local file
* system or HDFS, HTTP, HTTPS, or FTP URLs.
*/