author    Tathagata Das <tathagata.das1565@gmail.com>  2014-01-09 15:06:24 -0800
committer Tathagata Das <tathagata.das1565@gmail.com>  2014-01-09 15:06:24 -0800
commit    f1d206c6b4c0a5b2517b05af05fdda6049e2f7c2 (patch)
tree      452073d1bbee125f0c7c1b8d4908fae54fd1eb2a /examples/src/main/java
parent    6f713e2a3e56185368b66fb087637dec112a1f5d (diff)
parent    67b9a33628b9934804c36620d8cbc73ef70106ce (diff)
Merge branch 'standalone-driver' into driver-test
Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
	examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
	streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
	streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
	streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java                      31
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaKMeans.java                      25
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java                    22
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java                    13
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java                     12
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaTC.java                          18
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaWordCount.java                   12
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java                   20
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java                18
-rw-r--r--  examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java                    21
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java   18
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java    28
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java  18
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java       13
14 files changed, 162 insertions, 107 deletions
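
Two changes recur in nearly every hunk below: per-record string parsing via java.util.StringTokenizer (or repeated String.split calls) is replaced by a shared, precompiled java.util.regex.Pattern, and the examples jar is located with JavaSparkContext.jarOfClass(...) instead of the SPARK_EXAMPLES_JAR environment variable. A minimal standalone sketch of the resulting idiom, assembled from calls that appear in the patch — the class name and the expectation of space-separated numeric input are illustrative assumptions, not part of the commit:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.util.regex.Pattern;

public final class PatternSketch {
  // Compiled once and shared across all records, instead of building a
  // StringTokenizer (or recompiling a regex via String.split) per line.
  private static final Pattern SPACE = Pattern.compile(" ");

  public static void main(String[] args) {
    // jarOfClass locates the jar containing this class, replacing the
    // old reliance on the SPARK_EXAMPLES_JAR environment variable.
    JavaSparkContext sc = new JavaSparkContext(args[0], "PatternSketch",
        System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(PatternSketch.class));
    JavaRDD<double[]> points = sc.textFile(args[1]).map(
        new Function<String, double[]>() {
          @Override
          public double[] call(String line) {
            // Indexed access into the split result replaces nextToken() loops.
            String[] tok = SPACE.split(line);
            double[] point = new double[tok.length];
            for (int i = 0; i < tok.length; i++) {
              point[i] = Double.parseDouble(tok[i]);
            }
            return point;
          }
        });
    System.out.println("Parsed " + points.count() + " points");
    sc.stop();
  }
}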
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index be0d38589c..d552c47b22 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -24,19 +24,19 @@ import org.apache.spark.api.java.function.Function2;
import java.io.Serializable;
import java.util.Arrays;
-import java.util.StringTokenizer;
import java.util.Random;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification.
*/
-public class JavaHdfsLR {
+public final class JavaHdfsLR {
- static int D = 10; // Number of dimensions
- static Random rand = new Random(42);
+ private static final int D = 10; // Number of dimensions
+ private static final Random rand = new Random(42);
static class DataPoint implements Serializable {
- public DataPoint(double[] x, double y) {
+ DataPoint(double[] x, double y) {
this.x = x;
this.y = y;
}
@@ -46,20 +46,22 @@ public class JavaHdfsLR {
}
static class ParsePoint extends Function<String, DataPoint> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public DataPoint call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- double y = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double y = Double.parseDouble(tok[0]);
double[] x = new double[D];
- int i = 0;
- while (i < D) {
- x[i] = Double.parseDouble(tok.nextToken());
- i += 1;
+ for (int i = 0; i < D; i++) {
+ x[i] = Double.parseDouble(tok[i + 1]);
}
return new DataPoint(x, y);
}
}
static class VectorSum extends Function2<double[], double[], double[]> {
+ @Override
public double[] call(double[] a, double[] b) {
double[] result = new double[D];
for (int j = 0; j < D; j++) {
@@ -70,12 +72,13 @@ public class JavaHdfsLR {
}
static class ComputeGradient extends Function<DataPoint, double[]> {
- double[] weights;
+ private final double[] weights;
- public ComputeGradient(double[] weights) {
+ ComputeGradient(double[] weights) {
this.weights = weights;
}
+ @Override
public double[] call(DataPoint p) {
double[] gradient = new double[D];
for (int i = 0; i < D; i++) {
@@ -106,7 +109,7 @@ public class JavaHdfsLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
int ITERATIONS = Integer.parseInt(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
index 5a6afe7eae..0dc879275a 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaKMeans.java
@@ -27,19 +27,24 @@ import org.apache.spark.util.Vector;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
/**
* K-means clustering using Java API.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
+
+ private static final Pattern SPACE = Pattern.compile(" ");
/** Parses numbers split by whitespace to a vector */
static Vector parseVector(String line) {
- String[] splits = line.split(" ");
+ String[] splits = SPACE.split(line);
double[] data = new double[splits.length];
int i = 0;
- for (String s : splits)
- data[i] = Double.parseDouble(splits[i++]);
+ for (String s : splits) {
+ data[i] = Double.parseDouble(s);
+ i++;
+ }
return new Vector(data);
}
@@ -74,7 +79,7 @@ public class JavaKMeans {
System.exit(1);
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
String path = args[1];
int K = Integer.parseInt(args[2]);
double convergeDist = Double.parseDouble(args[3]);
@@ -82,7 +87,7 @@ public class JavaKMeans {
JavaRDD<Vector> data = sc.textFile(path).map(
new Function<String, Vector>() {
@Override
- public Vector call(String line) throws Exception {
+ public Vector call(String line) {
return parseVector(line);
}
}
@@ -96,7 +101,7 @@ public class JavaKMeans {
JavaPairRDD<Integer, Vector> closest = data.map(
new PairFunction<Vector, Integer, Vector>() {
@Override
- public Tuple2<Integer, Vector> call(Vector vector) throws Exception {
+ public Tuple2<Integer, Vector> call(Vector vector) {
return new Tuple2<Integer, Vector>(
closestPoint(vector, centroids), vector);
}
@@ -107,7 +112,8 @@ public class JavaKMeans {
JavaPairRDD<Integer, List<Vector>> pointsGroup = closest.groupByKey();
Map<Integer, Vector> newCentroids = pointsGroup.mapValues(
new Function<List<Vector>, Vector>() {
- public Vector call(List<Vector> ps) throws Exception {
+ @Override
+ public Vector call(List<Vector> ps) {
return average(ps);
}
}).collectAsMap();
@@ -122,8 +128,9 @@ public class JavaKMeans {
} while (tempDist > convergeDist);
System.out.println("Final centers:");
- for (Vector c : centroids)
+ for (Vector c : centroids) {
System.out.println(c);
+ }
System.exit(0);
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 407cd7ccfa..9eb1cadd71 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -35,9 +35,9 @@ import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
*/
-public class JavaLogQuery {
+public final class JavaLogQuery {
- public static List<String> exampleApacheLogs = Lists.newArrayList(
+ public static final List<String> exampleApacheLogs = Lists.newArrayList(
"10.10.10.10 - \"FRED\" [18/Jan/2013:17:56:07 +1100] \"GET http://images.com/2013/Generic.jpg " +
"HTTP/1.1\" 304 315 \"http://referall.com/\" \"Mozilla/4.0 (compatible; MSIE 7.0; " +
"Windows NT 5.1; GTB7.4; .NET CLR 2.0.50727; .NET CLR 3.0.04506.30; .NET CLR 3.0.04506.648; " +
@@ -51,14 +51,14 @@ public class JavaLogQuery {
"3.5.30729; Release=ARP)\" \"UD-1\" - \"image/jpeg\" \"whatever\" 0.352 \"-\" - \"\" 256 977 988 \"\" " +
"0 73.23.2.15 images.com 1358492557 - Whatup");
- public static Pattern apacheLogRegex = Pattern.compile(
+ public static final Pattern apacheLogRegex = Pattern.compile(
"^([\\d.]+) (\\S+) (\\S+) \\[([\\w\\d:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3}) ([\\d\\-]+) \"([^\"]+)\" \"([^\"]+)\".*");
/** Tracks the total query count and number of aggregate bytes for a particular group. */
public static class Stats implements Serializable {
- private int count;
- private int numBytes;
+ private final int count;
+ private final int numBytes;
public Stats(int count, int numBytes) {
this.count = count;
@@ -92,32 +92,32 @@ public class JavaLogQuery {
if (m.find()) {
int bytes = Integer.parseInt(m.group(7));
return new Stats(1, bytes);
- }
- else
+ } else {
return new Stats(1, 0);
+ }
}
- public static void main(String[] args) throws Exception {
+ public static void main(String[] args) {
if (args.length == 0) {
System.err.println("Usage: JavaLogQuery <master> [logFile]");
System.exit(1);
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.map(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
- public Tuple2<Tuple3<String, String, String>, Stats> call(String s) throws Exception {
+ public Tuple2<Tuple3<String, String, String>, Stats> call(String s) {
return new Tuple2<Tuple3<String, String, String>, Stats>(extractKey(s), extractStats(s));
}
});
JavaPairRDD<Tuple3<String, String, String>, Stats> counts = extracted.reduceByKey(new Function2<Stats, Stats, Stats>() {
@Override
- public Stats call(Stats stats, Stats stats2) throws Exception {
+ public Stats call(Stats stats, Stats stats2) {
return stats.merge(stats2);
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index 89aed8f279..a84245b0c7 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -28,6 +28,7 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.List;
import java.util.ArrayList;
+import java.util.regex.Pattern;
/**
* Computes the PageRank of URLs from an input file. Input file should
@@ -38,7 +39,9 @@ import java.util.ArrayList;
* ...
* where URL and their neighbors are separated by space(s).
*/
-public class JavaPageRank {
+public final class JavaPageRank {
+ private static final Pattern SPACES = Pattern.compile("\\s+");
+
private static class Sum extends Function2<Double, Double, Double> {
@Override
public Double call(Double a, Double b) {
@@ -53,7 +56,7 @@ public class JavaPageRank {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
// Loads in input file. It should be in format of:
// URL neighbor URL
@@ -66,7 +69,7 @@ public class JavaPageRank {
JavaPairRDD<String, List<String>> links = lines.map(new PairFunction<String, String, String>() {
@Override
public Tuple2<String, String> call(String s) {
- String[] parts = s.split("\\s+");
+ String[] parts = SPACES.split(s);
return new Tuple2<String, String>(parts[0], parts[1]);
}
}).distinct().groupByKey().cache();
@@ -74,7 +77,7 @@ public class JavaPageRank {
// Loads all URLs with other URL(s) link to from input file and initialize ranks of them to one.
JavaPairRDD<String, Double> ranks = links.mapValues(new Function<List<String>, Double>() {
@Override
- public Double call(List<String> rs) throws Exception {
+ public Double call(List<String> rs) {
return 1.0;
}
});
@@ -97,7 +100,7 @@ public class JavaPageRank {
// Re-calculates URL ranks based on neighbor contributions.
ranks = contribs.reduceByKey(new Sum()).mapValues(new Function<Double, Double>() {
@Override
- public Double call(Double sum) throws Exception {
+ public Double call(Double sum) {
return 0.15 + sum * 0.85;
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index 4a2380caf5..3ec4a58d48 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -26,8 +26,7 @@ import java.util.ArrayList;
import java.util.List;
/** Computes an approximation to pi */
-public class JavaSparkPi {
-
+public final class JavaSparkPi {
public static void main(String[] args) throws Exception {
if (args.length == 0) {
@@ -36,26 +35,27 @@ public class JavaSparkPi {
}
JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
- for (int i = 0; i < n; i++)
+ for (int i = 0; i < n; i++) {
l.add(i);
+ }
JavaRDD<Integer> dataSet = jsc.parallelize(l, slices);
int count = dataSet.map(new Function<Integer, Integer>() {
@Override
- public Integer call(Integer integer) throws Exception {
+ public Integer call(Integer integer) {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y < 1) ? 1 : 0;
}
}).reduce(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer integer, Integer integer2) throws Exception {
+ public Integer call(Integer integer, Integer integer2) {
return integer + integer2;
}
});
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index 17f21f6b77..2ceb0fd94b 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -31,11 +31,11 @@ import java.util.Set;
/**
* Transitive closure on a graph, implemented in Java.
*/
-public class JavaTC {
+public final class JavaTC {
- static int numEdges = 200;
- static int numVertices = 100;
- static Random rand = new Random(42);
+ private static final int numEdges = 200;
+ private static final int numVertices = 100;
+ private static final Random rand = new Random(42);
static List<Tuple2<Integer, Integer>> generateGraph() {
Set<Tuple2<Integer, Integer>> edges = new HashSet<Tuple2<Integer, Integer>>(numEdges);
@@ -43,15 +43,18 @@ public class JavaTC {
int from = rand.nextInt(numVertices);
int to = rand.nextInt(numVertices);
Tuple2<Integer, Integer> e = new Tuple2<Integer, Integer>(from, to);
- if (from != to) edges.add(e);
+ if (from != to) {
+ edges.add(e);
+ }
}
return new ArrayList<Tuple2<Integer, Integer>>(edges);
}
static class ProjectFn extends PairFunction<Tuple2<Integer, Tuple2<Integer, Integer>>,
Integer, Integer> {
- static ProjectFn INSTANCE = new ProjectFn();
+ static final ProjectFn INSTANCE = new ProjectFn();
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Tuple2<Integer, Integer>> triple) {
return new Tuple2<Integer, Integer>(triple._2()._2(), triple._2()._1());
}
@@ -64,7 +67,7 @@ public class JavaTC {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
@@ -76,6 +79,7 @@ public class JavaTC {
// Because join() joins on keys, the edges are stored in reversed order.
JavaPairRDD<Integer, Integer> edges = tc.map(
new PairFunction<Tuple2<Integer, Integer>, Integer, Integer>() {
+ @Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> e) {
return new Tuple2<Integer, Integer>(e._2(), e._1());
}
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index bd6383e13d..6651f98d56 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -27,8 +27,11 @@ import org.apache.spark.api.java.function.PairFunction;
import java.util.Arrays;
import java.util.List;
+import java.util.regex.Pattern;
+
+public final class JavaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
-public class JavaWordCount {
public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaWordCount <master> <file>");
@@ -36,22 +39,25 @@ public class JavaWordCount {
}
JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
JavaRDD<String> lines = ctx.textFile(args[1], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
+ @Override
public Iterable<String> call(String s) {
- return Arrays.asList(s.split(" "));
+ return Arrays.asList(SPACE.split(s));
}
});
JavaPairRDD<String, Integer> ones = words.map(new PairFunction<String, String, Integer>() {
+ @Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
});
JavaPairRDD<String, Integer> counts = ones.reduceByKey(new Function2<Integer, Integer, Integer>() {
+ @Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
index 45a0d237da..435a86e62a 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaALS.java
@@ -26,28 +26,32 @@ import org.apache.spark.mllib.recommendation.MatrixFactorizationModel;
import org.apache.spark.mllib.recommendation.Rating;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
import scala.Tuple2;
/**
* Example using MLLib ALS from Java.
*/
-public class JavaALS {
+public final class JavaALS {
static class ParseRating extends Function<String, Rating> {
+ private static final Pattern COMMA = Pattern.compile(",");
+
+ @Override
public Rating call(String line) {
- StringTokenizer tok = new StringTokenizer(line, ",");
- int x = Integer.parseInt(tok.nextToken());
- int y = Integer.parseInt(tok.nextToken());
- double rating = Double.parseDouble(tok.nextToken());
+ String[] tok = COMMA.split(line);
+ int x = Integer.parseInt(tok[0]);
+ int y = Integer.parseInt(tok[1]);
+ double rating = Double.parseDouble(tok[2]);
return new Rating(x, y, rating);
}
}
static class FeaturesToString extends Function<Tuple2<Object, double[]>, String> {
+ @Override
public String call(Tuple2<Object, double[]> element) {
- return element._1().toString() + "," + Arrays.toString(element._2());
+ return element._1() + "," + Arrays.toString(element._2());
}
}
@@ -68,7 +72,7 @@ public class JavaALS {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<Rating> ratings = lines.map(new ParseRating());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
index cd59a139b9..4b2658f257 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaKMeans.java
@@ -25,20 +25,22 @@ import org.apache.spark.mllib.clustering.KMeans;
import org.apache.spark.mllib.clustering.KMeansModel;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Example using MLLib KMeans from Java.
*/
-public class JavaKMeans {
+public final class JavaKMeans {
static class ParsePoint extends Function<String, double[]> {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public double[] call(String line) {
- StringTokenizer tok = new StringTokenizer(line, " ");
- int numTokens = tok.countTokens();
- double[] point = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- point[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(line);
+ double[] point = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ point[i] = Double.parseDouble(tok[i]);
}
return point;
}
@@ -62,7 +64,7 @@ public class JavaKMeans {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<double[]> points = lines.map(new ParsePoint());
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 258061c8e6..21586ce817 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -27,22 +27,25 @@ import org.apache.spark.mllib.classification.LogisticRegressionModel;
import org.apache.spark.mllib.regression.LabeledPoint;
import java.util.Arrays;
-import java.util.StringTokenizer;
+import java.util.regex.Pattern;
/**
* Logistic regression based classification using ML Lib.
*/
-public class JavaLR {
+public final class JavaLR {
static class ParsePoint extends Function<String, LabeledPoint> {
+ private static final Pattern COMMA = Pattern.compile(",");
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ @Override
public LabeledPoint call(String line) {
- String[] parts = line.split(",");
+ String[] parts = COMMA.split(line);
double y = Double.parseDouble(parts[0]);
- StringTokenizer tok = new StringTokenizer(parts[1], " ");
- int numTokens = tok.countTokens();
- double[] x = new double[numTokens];
- for (int i = 0; i < numTokens; ++i) {
- x[i] = Double.parseDouble(tok.nextToken());
+ String[] tok = SPACE.split(parts[1]);
+ double[] x = new double[tok.length];
+ for (int i = 0; i < tok.length; ++i) {
+ x[i] = Double.parseDouble(tok[i]);
}
return new LabeledPoint(y, x);
}
@@ -59,7 +62,7 @@ public class JavaLR {
}
JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
JavaRDD<String> lines = sc.textFile(args[1]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
double stepSize = Double.parseDouble(args[2]);
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 261813bf2f..b11cfa667e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@@ -36,7 +37,10 @@ import org.apache.spark.streaming.dstream.SparkFlumeEvent;
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*/
-public class JavaFlumeEventCount {
+public final class JavaFlumeEventCount {
+ private JavaFlumeEventCount() {
+ }
+
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
@@ -49,10 +53,10 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
+ JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
@@ -63,6 +67,6 @@ public class JavaFlumeEventCount {
}
}).print();
- sc.start();
+ ssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 9a8e4209ed..16b8a948e6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -19,6 +19,7 @@ package org.apache.spark.streaming.examples;
import java.util.Map;
import java.util.HashMap;
+import java.util.regex.Pattern;
import com.google.common.collect.Lists;
import org.apache.spark.api.java.function.FlatMapFunction;
@@ -29,6 +30,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
@@ -41,11 +43,16 @@ import scala.Tuple2;
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * `./bin/run-example org.apache.spark.streaming.examples.JavaKafkaWordCount local[2] zoo01,zoo02,
* zoo03 my-consumer-group topic1,topic2 1`
*/
-public class JavaKafkaWordCount {
+public final class JavaKafkaWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaKafkaWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 5) {
System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
@@ -53,8 +60,9 @@ public class JavaKafkaWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
- new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ new Duration(2000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
int numThreads = Integer.parseInt(args[4]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
@@ -63,11 +71,11 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+ JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
- public String call(Tuple2<String, String> tuple2) throws Exception {
+ public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
@@ -75,24 +83,24 @@ public class JavaKafkaWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
- ssc.start();
+ jssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
index d8d6046914..e96996aa75 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaNetworkWordCount.java
@@ -27,6 +27,8 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import java.util.regex.Pattern;
+
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: NetworkWordCount <master> <hostname> <port>
@@ -38,7 +40,12 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
* and then run the example
* `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
-public class JavaNetworkWordCount {
+public final class JavaNetworkWordCount {
+ private static final Pattern SPACE = Pattern.compile(" ");
+
+ private JavaNetworkWordCount() {
+ }
+
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
@@ -48,7 +55,8 @@ public class JavaNetworkWordCount {
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ new Duration(1000), System.getenv("SPARK_HOME"),
+ JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
@@ -56,18 +64,18 @@ public class JavaNetworkWordCount {
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
- return Lists.newArrayList(x.split(" "));
+ return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
- public Tuple2<String, Integer> call(String s) throws Exception {
+ public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
index c8c7389dd1..e05551ab83 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaQueueStream.java
@@ -31,8 +31,11 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
-public class JavaQueueStream {
- public static void main(String[] args) throws InterruptedException {
+public final class JavaQueueStream {
+ private JavaQueueStream() {
+ }
+
+ public static void main(String[] args) throws Exception {
if (args.length < 1) {
System.err.println("Usage: JavaQueueStream <master>");
System.exit(1);
@@ -40,7 +43,7 @@ public class JavaQueueStream {
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
- System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
+ System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
@@ -62,14 +65,14 @@ public class JavaQueueStream {
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
new PairFunction<Integer, Integer, Integer>() {
@Override
- public Tuple2<Integer, Integer> call(Integer i) throws Exception {
+ public Tuple2<Integer, Integer> call(Integer i) {
return new Tuple2<Integer, Integer>(i % 10, 1);
}
});
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
- public Integer call(Integer i1, Integer i2) throws Exception {
+ public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});