diff options
Diffstat (limited to 'examples/src')
27 files changed, 179 insertions, 122 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java index 12f3355bc4..d552c47b22 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java @@ -24,19 +24,19 @@ import org.apache.spark.api.java.function.Function2; import java.io.Serializable; import java.util.Arrays; -import java.util.StringTokenizer; import java.util.Random; +import java.util.regex.Pattern; /** * Logistic regression based classification. */ -public class JavaHdfsLR { +public final class JavaHdfsLR { - static int D = 10; // Number of dimensions - static Random rand = new Random(42); + private static final int D = 10; // Number of dimensions + private static final Random rand = new Random(42); static class DataPoint implements Serializable { - public DataPoint(double[] x, double y) { + DataPoint(double[] x, double y) { this.x = x; this.y = y; } @@ -46,20 +46,22 @@ public class JavaHdfsLR { } static class ParsePoint extends Function<String, DataPoint> { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public DataPoint call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - double y = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double y = Double.parseDouble(tok[0]); double[] x = new double[D]; - int i = 0; - while (i < D) { - x[i] = Double.parseDouble(tok.nextToken()); - i += 1; + for (int i = 0; i < D; i++) { + x[i] = Double.parseDouble(tok[i + 1]); } return new DataPoint(x, y); } } static class VectorSum extends Function2<double[], double[], double[]> { + @Override public double[] call(double[] a, double[] b) { double[] result = new double[D]; for (int j = 0; j < D; j++) { @@ -70,12 +72,13 @@ public class JavaHdfsLR { } static class ComputeGradient extends Function<DataPoint, double[]> { - double[] weights; + private final double[] weights; - public ComputeGradient(double[] weights) { + ComputeGradient(double[] weights) { this.weights = weights; } + @Override public double[] call(DataPoint p) { double[] gradient = new double[D]; for (int i = 0; i < D; i++) { diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java index 63465a3bbf..0dc879275a 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java @@ -27,19 +27,24 @@ import org.apache.spark.util.Vector; import java.util.List; import java.util.Map; +import java.util.regex.Pattern; /** * K-means clustering using Java API. */ -public class JavaKMeans { +public final class JavaKMeans { + + private static final Pattern SPACE = Pattern.compile(" "); /** Parses numbers split by whitespace to a vector */ static Vector parseVector(String line) { - String[] splits = line.split(" "); + String[] splits = SPACE.split(line); double[] data = new double[splits.length]; int i = 0; - for (String s : splits) - data[i] = Double.parseDouble(splits[i++]); + for (String s : splits) { + data[i] = Double.parseDouble(s); + i++; + } return new Vector(data); } @@ -82,7 +87,7 @@ public class JavaKMeans { JavaRDD<Vector> data = sc.textFile(path).map( new Function<String, Vector>() { @Override - public Vector call(String line) throws Exception { + public Vector call(String line) { return parseVector(line); } } @@ -96,7 +101,7 @@ public class JavaKMeans { JavaPairRDD<Integer, Vector> closest = data.map( new PairFunction<Vector, Integer, Vector>() { @Override - public Tuple2<Integer, Vector> call(Vector vector) throws Exception { + public Tuple2<Integer, Vector> call(Vector vector) { return new Tuple2<Integer, Vector>( closestPoint(vector, centroids), vector); } @@ -107,7 +112,8 @@ public class JavaKMeans { JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey(); Map<Integer, Vector> newCentroids = pointsGroup.mapValues( new Function<List<Vector>, Vector>() { - public Vector call(List<Vector> ps) throws Exception { + @Override + public Vector call(List<Vector> ps) { return average(ps); } }).collectAsMap(); @@ -122,8 +128,9 @@ public class JavaKMeans { } while (tempDist > convergeDist); System.out.println("Final centers:"); - for (Vector c : centroids) + for (Vector c : centroids) { System.out.println(c); + } System.exit(0); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java index 74e4d9291a..9eb1cadd71 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java @@ -35,9 +35,9 @@ import java.util.regex.Pattern; /** * Executes a roll up-style query against Apache logs. */ -public class JavaLogQuery { +public final class JavaLogQuery { - public static List<String> exampleApacheLogs = Lists.newArrayList( + public static final List<String> exampleApacheLogs = Lists.newArrayList( "10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " + "HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " + "Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " + @@ -51,14 +51,14 @@ public class JavaLogQuery { "3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " + "0 73.23.2.15 images.com 1358492557 - Whatup"); - public static Pattern apacheLogRegex = Pattern.compile( + public static final Pattern apacheLogRegex = Pattern.compile( "^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*"); /** Tracks the total query count and number of aggregate bytes for a particular group. */ public static class Stats implements Serializable { - private int count; - private int numBytes; + private final int count; + private final int numBytes; public Stats(int count, int numBytes) { this.count = count; @@ -92,12 +92,12 @@ public class JavaLogQuery { if (m.find()) { int bytes = Integer.parseInt(m.group(7)); return new Stats(1, bytes); - } - else + } else { return new Stats(1, 0); + } } - public static void main(String[] args) throws Exception { + public static void main(String[] args) { if (args.length == 0) { System.err.println("Usage: JavaLogQuery <master> [logFile]"); System.exit(1); @@ -110,14 +110,14 @@ public class JavaLogQuery { JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() { @Override - public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception { + public Tuple2<Tuple3<String, String, String>, Stats> call(String s) { return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s)); } }); JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() { @Override - public Stats call(Stats stats, Stats stats2) throws Exception { + public Stats call(Stats stats, Stats stats2) { return stats.merge(stats2); } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java index f774f6a04e..a84245b0c7 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java @@ -17,7 +17,6 @@ package org.apache.spark.examples; -import org.apache.spark.SparkContext; import scala.Tuple2; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; @@ -29,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.List; import java.util.ArrayList; +import java.util.regex.Pattern; /** * Computes the PageRank of URLs from an input file. Input file should @@ -39,7 +39,9 @@ import java.util.ArrayList; * ... * where URL and their neighbors are separated by space(s). */ -public class JavaPageRank { +public final class JavaPageRank { + private static final Pattern SPACES = Pattern.compile("\\s+"); + private static class Sum extends Function2<Double, Double, Double> { @Override public Double call(Double a, Double b) { @@ -67,7 +69,7 @@ public class JavaPageRank { JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() { @Override public Tuple2<String, String> call(String s) { - String[] parts = s.split("\\s+"); + String[] parts = SPACES.split(s); return new Tuple2<String, String>(parts[0], parts[1]); } }).distinct().groupByKey().cache(); @@ -75,7 +77,7 @@ public class JavaPageRank { // Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one. JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() { @Override - public Double call(List<String> rs) throws Exception { + public Double call(List<String> rs) { return 1.0; } }); @@ -98,7 +100,7 @@ public class JavaPageRank { // Re-calculates URL ranks based on neighbor contributions. ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() { @Override - public Double call(Double sum) throws Exception { + public Double call(Double sum) { return 0.15 + sum * 0.85; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java index 5558ab7c03..3ec4a58d48 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java @@ -26,8 +26,7 @@ import java.util.ArrayList; import java.util.List; /** Computes an approximation to pi */ -public class JavaSparkPi { - +public final class JavaSparkPi { public static void main(String[] args) throws Exception { if (args.length == 0) { @@ -41,21 +40,22 @@ public class JavaSparkPi { int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2; int n = 100000 * slices; List<Integer> l = new ArrayList<Integer>(n); - for (int i = 0; i < n; i++) + for (int i = 0; i < n; i++) { l.add(i); + } JavaRDD<Integer> dataSet = jsc.parallelize(l, slices); int count = dataSet.map(new Function<Integer, Integer>() { @Override - public Integer call(Integer integer) throws Exception { + public Integer call(Integer integer) { double x = Math.random() * 2 - 1; double y = Math.random() * 2 - 1; return (x * x + y * y < 1) ? 1 : 0; } }).reduce(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer integer, Integer integer2) throws Exception { + public Integer call(Integer integer, Integer integer2) { return integer + integer2; } }); diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java index 99e6ba347c..2ceb0fd94b 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java @@ -31,11 +31,11 @@ import java.util.Set; /** * Transitive closure on a graph, implemented in Java. */ -public class JavaTC { +public final class JavaTC { - static int numEdges = 200; - static int numVertices = 100; - static Random rand = new Random(42); + private static final int numEdges = 200; + private static final int numVertices = 100; + private static final Random rand = new Random(42); static List<Tuple2<Integer, Integer>> generateGraph() { Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges); @@ -43,15 +43,18 @@ public class JavaTC { int from = rand.nextInt(numVertices); int to = rand.nextInt(numVertices); Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to); - if (from != to) edges.add(e); + if (from != to) { + edges.add(e); + } } return new ArrayList<Tuple2<Integer, Integer>>(edges); } static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>, Integer, Integer> { - static ProjectFn INSTANCE = new ProjectFn(); + static final ProjectFn INSTANCE = new ProjectFn(); + @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) { return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1()); } @@ -76,6 +79,7 @@ public class JavaTC { // Because join() joins on keys, the edges are stored in reversed order. JavaPairRDD<Integer, Integer> edges = tc.map( new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() { + @Override public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) { return new Tuple2<Integer, Integer>(e._2(), e._1()); } diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java index 8a071caf13..6651f98d56 100644 --- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java @@ -27,8 +27,11 @@ import org.apache.spark.api.java.function.PairFunction; import java.util.Arrays; import java.util.List; +import java.util.regex.Pattern; + +public final class JavaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); -public class JavaWordCount { public static void main(String[] args) throws Exception { if (args.length < 2) { System.err.println("Usage: JavaWordCount <master> <file>"); @@ -40,18 +43,21 @@ public class JavaWordCount { JavaRDD<String> lines = ctx.textFile(args[1], 1); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { + @Override public Iterable<String> call(String s) { - return Arrays.asList(s.split(" ")); + return Arrays.asList(SPACE.split(s)); } }); JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() { + @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }); JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() { + @Override public Integer call(Integer i1, Integer i2) { return i1 + i2; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java index 5e1a77baaa..435a86e62a 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java @@ -26,28 +26,32 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel; import org.apache.spark.mllib.recommendation.Rating; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; import scala.Tuple2; /** * Example using MLLib ALS from Java. */ -public class JavaALS { +public final class JavaALS { static class ParseRating extends Function<String, Rating> { + private static final Pattern COMMA = Pattern.compile(","); + + @Override public Rating call(String line) { - StringTokenizer tok = new StringTokenizer(line, ","); - int x = Integer.parseInt(tok.nextToken()); - int y = Integer.parseInt(tok.nextToken()); - double rating = Double.parseDouble(tok.nextToken()); + String[] tok = COMMA.split(line); + int x = Integer.parseInt(tok[0]); + int y = Integer.parseInt(tok[1]); + double rating = Double.parseDouble(tok[2]); return new Rating(x, y, rating); } } static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> { + @Override public String call(Tuple2<Object, double[]> element) { - return element._1().toString() + "," + Arrays.toString(element._2()); + return element._1() + "," + Arrays.toString(element._2()); } } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java index 1f12f518a0..4b2658f257 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java @@ -25,20 +25,22 @@ import org.apache.spark.mllib.clustering.KMeans; import org.apache.spark.mllib.clustering.KMeansModel; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Example using MLLib KMeans from Java. */ -public class JavaKMeans { +public final class JavaKMeans { static class ParsePoint extends Function<String, double[]> { + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public double[] call(String line) { - StringTokenizer tok = new StringTokenizer(line, " "); - int numTokens = tok.countTokens(); - double[] point = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - point[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(line); + double[] point = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + point[i] = Double.parseDouble(tok[i]); } return point; } diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java index 593e4df111..21586ce817 100644 --- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java +++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java @@ -27,22 +27,25 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel; import org.apache.spark.mllib.regression.LabeledPoint; import java.util.Arrays; -import java.util.StringTokenizer; +import java.util.regex.Pattern; /** * Logistic regression based classification using ML Lib. */ -public class JavaLR { +public final class JavaLR { static class ParsePoint extends Function<String, LabeledPoint> { + private static final Pattern COMMA = Pattern.compile(","); + private static final Pattern SPACE = Pattern.compile(" "); + + @Override public LabeledPoint call(String line) { - String[] parts = line.split(","); + String[] parts = COMMA.split(line); double y = Double.parseDouble(parts[0]); - StringTokenizer tok = new StringTokenizer(parts[1], " "); - int numTokens = tok.countTokens(); - double[] x = new double[numTokens]; - for (int i = 0; i < numTokens; ++i) { - x[i] = Double.parseDouble(tok.nextToken()); + String[] tok = SPACE.split(parts[1]); + double[] x = new double[tok.length]; + for (int i = 0; i < tok.length; ++i) { + x[i] = Double.parseDouble(tok[i]); } return new LabeledPoint(y, x); } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 64ac72474b..b11cfa667e 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; +import org.apache.spark.streaming.flume.FlumeUtils; +import org.apache.spark.streaming.flume.SparkFlumeEvent; /** * Produces a count of events received from Flume. @@ -36,7 +37,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent; * creates a server and listens for flume events. * <port> is the port the Flume receiver will listen on. */ -public class JavaFlumeEventCount { +public final class JavaFlumeEventCount { + private JavaFlumeEventCount() { + } + public static void main(String[] args) { if (args.length != 3) { System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>"); @@ -49,11 +53,10 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class)); - - JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port); + JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port); flumeStream.count(); @@ -64,6 +67,6 @@ public class JavaFlumeEventCount { } }).print(); - sc.start(); + ssc.start(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 0a56e7abdf..16b8a948e6 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples; import java.util.Map; import java.util.HashMap; +import java.util.regex.Pattern; import com.google.common.collect.Lists; import org.apache.spark.api.java.function.FlatMapFunction; @@ -29,6 +30,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.KafkaUtils; import scala.Tuple2; /** @@ -45,7 +47,12 @@ import scala.Tuple2; * zoo03 my-consumer-group topic1,topic2 1` */ -public class JavaKafkaWordCount { +public final class JavaKafkaWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaKafkaWordCount() { + } + public static void main(String[] args) { if (args.length < 5) { System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>"); @@ -53,7 +60,7 @@ public class JavaKafkaWordCount { } // Create the context with a 1 second batch size - JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount", + JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount", new Duration(2000), System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class)); @@ -64,11 +71,11 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap); + JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap); JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() { @Override - public String call(Tuple2<String, String> tuple2) throws Exception { + public String call(Tuple2<String, String> tuple2) { return tuple2._2(); } }); @@ -76,24 +83,24 @@ public class JavaKafkaWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String s) throws Exception { + public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); wordCounts.print(); - ssc.start(); + jssc.start(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java index ec6f6a8c56..1e2efd359c 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java @@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import java.util.regex.Pattern; + /** * Counts words in UTF8 encoded, '\n' delimited text received from the network every second. * Usage: NetworkWordCount <master> <hostname> <port> @@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext; * and then run the example * `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999` */ -public class JavaNetworkWordCount { +public final class JavaNetworkWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + private JavaNetworkWordCount() { + } + public static void main(String[] args) { if (args.length < 3) { System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" + @@ -57,18 +64,18 @@ public class JavaNetworkWordCount { JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String x) { - return Lists.newArrayList(x.split(" ")); + return Lists.newArrayList(SPACE.split(x)); } }); JavaPairDStream<String, Integer> wordCounts = words.map( new PairFunction<String, String, Integer>() { @Override - public Tuple2<String, Integer> call(String s) throws Exception { + public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); } }).reduceByKey(new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java index 4b9fd52713..e05551ab83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java @@ -31,8 +31,11 @@ import java.util.LinkedList; import java.util.List; import java.util.Queue; -public class JavaQueueStream { - public static void main(String[] args) throws InterruptedException { +public final class JavaQueueStream { + private JavaQueueStream() { + } + + public static void main(String[] args) throws Exception { if (args.length < 1) { System.err.println("Usage: JavaQueueStream <master>"); System.exit(1); @@ -62,14 +65,14 @@ public class JavaQueueStream { JavaPairDStream<Integer, Integer> mappedStream = inputStream.map( new PairFunction<Integer, Integer, Integer>() { @Override - public Tuple2<Integer, Integer> call(Integer i) throws Exception { + public Tuple2<Integer, Integer> call(Integer i) { return new Tuple2<Integer, Integer>(i % 10, 1); } }); JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey( new Function2<Integer, Integer, Integer>() { @Override - public Integer call(Integer i1, Integer i2) throws Exception { + public Integer call(Integer i1, Integer i2) { return i1 + i2; } }); diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala index 546495357f..4e0058cd70 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ActorWordCount.scala @@ -134,9 +134,9 @@ object FeederActor { * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on. * * To run this example locally, you may run Feeder Actor as - * `$ ./bin/run-example spark.streaming.examples.FeederActor 127.0.1.1 9999` + * `$ ./bin/run-example org.apache.spark.streaming.examples.FeederActor 127.0.1.1 9999` * and then run the example - * `$ ./bin/run-example spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` + * `$ ./bin/run-example org.apache.spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999` */ object ActorWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 5ef1928294..ae3709b3d9 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ /** * Produces a count of events received from Flume. @@ -51,7 +52,7 @@ object FlumeEventCount { System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) // Create a flume stream - val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY) + val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2) // Print out the count of events received from this server in each batch stream.count().map(cnt => "Received " + cnt + " flume events." ).print() diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala index 1486d77d8a..ea6ea67419 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/HdfsWordCount.scala @@ -28,7 +28,7 @@ import org.apache.spark.streaming.StreamingContext._ * <directory> is the directory that Spark Streaming will use to find and read new text files. * * To run this on your local machine on directory `localdir`, run this example - * `$ ./bin/run-example spark.streaming.examples.HdfsWordCount local[2] localdir` + * `$ ./bin/run-example org.apache.spark.streaming.examples.HdfsWordCount local[2] localdir` * Then create a text file in `localdir` and the words in the file will get counted. */ object HdfsWordCount { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 172091be2e..31a94bd224 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -24,6 +24,7 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.util.RawTextHelper._ +import org.apache.spark.streaming.kafka._ /** * Consumes messages from one or more topics in Kafka and does wordcount. @@ -35,7 +36,7 @@ import org.apache.spark.streaming.util.RawTextHelper._ * <numThreads> is the number of threads the kafka consumer should use * * Example: - * `./bin/run-example spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` + * `./bin/run-example org.apache.spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1` */ object KafkaWordCount { def main(args: Array[String]) { @@ -52,7 +53,7 @@ object KafkaWordCount { ssc.checkpoint("checkpoint") val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap - val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2) + val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1l)) .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index 2d02ef77c0..325290b66f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -17,11 +17,6 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.MQTTReceiver -import org.apache.spark.storage.StorageLevel - import org.eclipse.paho.client.mqttv3.MqttClient import org.eclipse.paho.client.mqttv3.MqttClientPersistence import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence @@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.mqtt._ + /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" @@ -97,7 +97,7 @@ object MQTTWordCount { val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY) + val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2) val words = lines.flatMap(x => x.toString.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala index 74d76ec26c..6a32c75373 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/NetworkWordCount.scala @@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._ * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example spark.streaming.examples.NetworkWordCount local[2] localhost 9999` + * `$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999` */ object NetworkWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala index f43c8ab61d..002db57d59 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/StatefulNetworkWordCount.scala @@ -29,7 +29,7 @@ import org.apache.spark.streaming.StreamingContext._ * To run this on your local machine, you need to first run a Netcat server * `$ nc -lk 9999` * and then run the example - * `$ ./bin/run-example spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` + * `$ ./bin/run-example org.apache.spark.streaming.examples.StatefulNetworkWordCount local[2] localhost 9999` */ object StatefulNetworkWordCount { def main(args: Array[String]) { diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala index 9d21d3178f..3ccdc908e2 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala @@ -23,6 +23,8 @@ import com.twitter.algebird._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ + /** * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute * windowed and global Top-K estimates of user IDs occurring in a Twitter stream. @@ -33,7 +35,7 @@ import org.apache.spark.SparkContext._ * <p> * <p> * <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/"> - * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure + * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure * for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc), * that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the * estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold @@ -61,7 +63,7 @@ object TwitterAlgebirdCMS { val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala index 5111e6f62a..c7e83e76b0 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala @@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.storage.StorageLevel import com.twitter.algebird.HyperLogLog._ import com.twitter.algebird.HyperLogLogMonoid -import org.apache.spark.streaming.dstream.TwitterInputDStream +import org.apache.spark.streaming.twitter._ /** * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute @@ -50,7 +50,7 @@ object TwitterAlgebirdHLL { val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER) + val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER) val users = stream.map(status => status.getUser.getId) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala index 7a3df687b7..e2b0418d55 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.streaming.{Seconds, StreamingContext} import StreamingContext._ import org.apache.spark.SparkContext._ +import org.apache.spark.streaming.twitter._ /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter @@ -39,7 +40,7 @@ object TwitterPopularTags { val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2), System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass)) - val stream = ssc.twitterStream(None, filters) + val stream = TwitterUtils.createStream(ssc, None, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index 89d3042123..03902ec353 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples import akka.actor.ActorSystem import akka.actor.actorRef2Scala import akka.zeromq._ -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ import akka.zeromq.Subscribe import akka.util.ByteString +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.zeromq._ + /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages * every one second. @@ -62,9 +64,9 @@ object SimpleZeroMQPublisher { * <zeroMQurl> and <topic> describe where zeroMq publisher is running. * * To run this example locally, you may run publisher as - * `$ ./bin/run-example spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` + * `$ ./bin/run-example org.apache.spark.streaming.examples.SimpleZeroMQPublisher tcp://127.0.1.1:1234 foo.bar` * and run the example as - * `$ ./bin/run-example spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` + * `$ ./bin/run-example org.apache.spark.streaming.examples.ZeroMQWordCount local[2] tcp://127.0.1.1:1234 foo` */ object ZeroMQWordCount { def main(args: Array[String]) { @@ -83,11 +85,10 @@ object ZeroMQWordCount { def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator //For this stream, a zeroMQ publisher should be running. - val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator) + val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _) val words = lines.flatMap(_.split(" ")) val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _) wordCounts.print() ssc.start() } - } diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala index 1a40fdb9a3..4fe57de4a4 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewGenerator.scala @@ -39,8 +39,8 @@ object PageView extends Serializable { /** Generates streaming events to simulate page views on a website. * * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10 + * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 * * When running this, you may want to set the root logging level to ERROR in * conf/log4j.properties to reduce the verbosity of the output. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala index 0569846f18..807af199f4 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/clickstream/PageViewStream.scala @@ -25,8 +25,8 @@ import org.apache.spark.SparkContext._ * operators available in Spark streaming. * * This should be used in tandem with PageViewStream.scala. Example: - * $ ./bin/run-example spark.streaming.examples.clickstream.PageViewGenerator 44444 10 - * $ ./bin/run-example spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 + * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewGenerator 44444 10 + * $ ./bin/run-example org.apache.spark.streaming.examples.clickstream.PageViewStream errorRatePerZipCode localhost 44444 */ object PageViewStream { def main(args: Array[String]) { |