Diffstat (limited to 'examples/src/main/java/org')
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java                     | 13
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java                   | 13
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaPageRank.java                   | 15
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java                    | 18
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaTC.java                         | 24
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/JavaWordCount.java                  | 12
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java                  | 22
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java               | 22
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java                   | 18
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java               | 5
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java  | 19
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java   | 27
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java | 25
-rw-r--r--  examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java      | 22
14 files changed, 127 insertions(+), 128 deletions(-)
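Every file below makes the same conversion: the <master> command-line argument and the old multi-argument JavaSparkContext constructor are dropped in favor of a SparkConf carrying only the application name, so that the master URL, jars, and SPARK_HOME are supplied by spark-submit at launch time. A minimal sketch of the resulting pattern; the class name and input argument here are illustrative and not part of the patch:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Illustrative class name; not part of this patch.
    public final class SparkConfPatternSketch {
      public static void main(String[] args) {
        // App name is set on the conf; master URL, jars and SPARK_HOME are no
        // longer hard-coded -- spark-submit supplies them when the job is launched.
        SparkConf sparkConf = new SparkConf().setAppName("SparkConfPatternSketch");
        JavaSparkContext sc = new JavaSparkContext(sparkConf);

        // Positional arguments shift left by one now that <master> is gone,
        // e.g. the input file becomes args[0] instead of args[1].
        JavaRDD<String> lines = sc.textFile(args[0]);
        System.out.println("Lines: " + lines.count());
        sc.stop();
      }
    }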
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
index bd96274021..6c177de359 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaHdfsLR.java
@@ -17,6 +17,7 @@
package org.apache.spark.examples;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -103,16 +104,16 @@ public final class JavaHdfsLR {
public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: JavaHdfsLR <master> <file> <iters>");
+ if (args.length < 2) {
+ System.err.println("Usage: JavaHdfsLR <file> <iters>");
System.exit(1);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaHdfsLR",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaHdfsLR.class));
- JavaRDD<String> lines = sc.textFile(args[1]);
+ SparkConf sparkConf = new SparkConf().setAppName("JavaHdfsLR");
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<DataPoint> points = lines.map(new ParsePoint()).cache();
- int ITERATIONS = Integer.parseInt(args[2]);
+ int ITERATIONS = Integer.parseInt(args[1]);
// Initialize w to a random value
double[] w = new double[D];
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
index 3f7a879538..812e9d5580 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaLogQuery.java
@@ -20,6 +20,7 @@ package org.apache.spark.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import scala.Tuple3;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -34,6 +35,8 @@ import java.util.regex.Pattern;
/**
* Executes a roll up-style query against Apache logs.
+ *
+ * Usage: JavaLogQuery [logFile]
*/
public final class JavaLogQuery {
@@ -97,15 +100,11 @@ public final class JavaLogQuery {
}
public static void main(String[] args) {
- if (args.length == 0) {
- System.err.println("Usage: JavaLogQuery <master> [logFile]");
- System.exit(1);
- }
- JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaLogQuery",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLogQuery.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaLogQuery");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- JavaRDD<String> dataSet = (args.length == 2) ? jsc.textFile(args[1]) : jsc.parallelize(exampleApacheLogs);
+ JavaRDD<String> dataSet = (args.length == 1) ? jsc.textFile(args[0]) : jsc.parallelize(exampleApacheLogs);
JavaPairRDD<Tuple3<String, String, String>, Stats> extracted = dataSet.mapToPair(new PairFunction<String, Tuple3<String, String, String>, Stats>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
index e31f676f5f..7ea6df9c17 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaPageRank.java
@@ -18,9 +18,12 @@
package org.apache.spark.examples;
+
import scala.Tuple2;
import com.google.common.collect.Iterables;
+
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -54,20 +57,20 @@ public final class JavaPageRank {
}
public static void main(String[] args) throws Exception {
- if (args.length < 3) {
- System.err.println("Usage: JavaPageRank <master> <file> <number_of_iterations>");
+ if (args.length < 2) {
+ System.err.println("Usage: JavaPageRank <file> <number_of_iterations>");
System.exit(1);
}
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaPageRank",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaPageRank.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaPageRank");
+ JavaSparkContext ctx = new JavaSparkContext(sparkConf);
// Loads in input file. It should be in format of:
// URL neighbor URL
// URL neighbor URL
// URL neighbor URL
// ...
- JavaRDD<String> lines = ctx.textFile(args[1], 1);
+ JavaRDD<String> lines = ctx.textFile(args[0], 1);
// Loads all URLs from input file and initialize their neighbors.
JavaPairRDD<String, Iterable<String>> links = lines.mapToPair(new PairFunction<String, String, String>() {
@@ -87,7 +90,7 @@ public final class JavaPageRank {
});
// Calculates and updates URL ranks continuously using PageRank algorithm.
- for (int current = 0; current < Integer.parseInt(args[2]); current++) {
+ for (int current = 0; current < Integer.parseInt(args[1]); current++) {
// Calculates URL contributions to the rank of other URLs.
JavaPairRDD<String, Double> contribs = links.join(ranks).values()
.flatMapToPair(new PairFlatMapFunction<Tuple2<Iterable<String>, Double>, String, Double>() {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
index ac8df02c46..11157d7573 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaSparkPi.java
@@ -17,6 +17,7 @@
package org.apache.spark.examples;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -25,19 +26,18 @@ import org.apache.spark.api.java.function.Function2;
import java.util.ArrayList;
import java.util.List;
-/** Computes an approximation to pi */
+/**
+ * Computes an approximation to pi
+ * Usage: JavaSparkPi [slices]
+ */
public final class JavaSparkPi {
+
public static void main(String[] args) throws Exception {
- if (args.length == 0) {
- System.err.println("Usage: JavaSparkPi <master> [slices]");
- System.exit(1);
- }
-
- JavaSparkContext jsc = new JavaSparkContext(args[0], "JavaSparkPi",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkPi.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
+ JavaSparkContext jsc = new JavaSparkContext(sparkConf);
- int slices = (args.length == 2) ? Integer.parseInt(args[1]) : 2;
+ int slices = (args.length == 1) ? Integer.parseInt(args[0]) : 2;
int n = 100000 * slices;
List<Integer> l = new ArrayList<Integer>(n);
for (int i = 0; i < n; i++) {
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaTC.java b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
index d66b9ba265..2563fcdd23 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaTC.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaTC.java
@@ -17,19 +17,22 @@
package org.apache.spark.examples;
-import scala.Tuple2;
-import org.apache.spark.api.java.JavaPairRDD;
-import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.api.java.function.PairFunction;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import scala.Tuple2;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+
/**
* Transitive closure on a graph, implemented in Java.
+ * Usage: JavaTC [slices]
*/
public final class JavaTC {
@@ -61,14 +64,9 @@ public final class JavaTC {
}
public static void main(String[] args) {
- if (args.length == 0) {
- System.err.println("Usage: JavaTC <host> [<slices>]");
- System.exit(1);
- }
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaTC",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaTC.class));
- Integer slices = (args.length > 1) ? Integer.parseInt(args[1]): 2;
+ SparkConf sparkConf = new SparkConf().setAppName("JavaTC");
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ Integer slices = (args.length > 0) ? Integer.parseInt(args[0]): 2;
JavaPairRDD<Integer, Integer> tc = sc.parallelizePairs(generateGraph(), slices).cache();
// Linear transitive closure: each round grows paths by one edge,
diff --git a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
index 87c1b80981..9a6a944f7e 100644
--- a/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java
@@ -18,6 +18,7 @@
package org.apache.spark.examples;
import scala.Tuple2;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
@@ -33,14 +34,15 @@ public final class JavaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) throws Exception {
- if (args.length < 2) {
- System.err.println("Usage: JavaWordCount <master> <file>");
+
+ if (args.length < 1) {
+ System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}
- JavaSparkContext ctx = new JavaSparkContext(args[0], "JavaWordCount",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaWordCount.class));
- JavaRDD<String> lines = ctx.textFile(args[1], 1);
+ SparkConf sparkConf = new SparkConf().setAppName("JavaWordCount");
+ JavaSparkContext ctx = new JavaSparkContext(sparkConf);
+ JavaRDD<String> lines = ctx.textFile(args[0], 1);
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
index 4533c4c5f2..8d381d4e0a 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaALS.java
@@ -17,6 +17,7 @@
package org.apache.spark.examples.mllib;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -57,23 +58,22 @@ public final class JavaALS {
public static void main(String[] args) {
- if (args.length != 5 && args.length != 6) {
+ if (args.length < 4) {
System.err.println(
- "Usage: JavaALS <master> <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
+ "Usage: JavaALS <ratings_file> <rank> <iterations> <output_dir> [<blocks>]");
System.exit(1);
}
-
- int rank = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
- String outputDir = args[4];
+ SparkConf sparkConf = new SparkConf().setAppName("JavaALS");
+ int rank = Integer.parseInt(args[1]);
+ int iterations = Integer.parseInt(args[2]);
+ String outputDir = args[3];
int blocks = -1;
- if (args.length == 6) {
- blocks = Integer.parseInt(args[5]);
+ if (args.length == 5) {
+ blocks = Integer.parseInt(args[4]);
}
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaALS",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaALS.class));
- JavaRDD<String> lines = sc.textFile(args[1]);
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<Rating> ratings = lines.map(new ParseRating());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
index 0cfb8e69ed..f796123a25 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaKMeans.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
import java.util.regex.Pattern;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -48,24 +49,21 @@ public final class JavaKMeans {
}
public static void main(String[] args) {
-
- if (args.length < 4) {
+ if (args.length < 3) {
System.err.println(
- "Usage: JavaKMeans <master> <input_file> <k> <max_iterations> [<runs>]");
+ "Usage: JavaKMeans <input_file> <k> <max_iterations> [<runs>]");
System.exit(1);
}
-
- String inputFile = args[1];
- int k = Integer.parseInt(args[2]);
- int iterations = Integer.parseInt(args[3]);
+ String inputFile = args[0];
+ int k = Integer.parseInt(args[1]);
+ int iterations = Integer.parseInt(args[2]);
int runs = 1;
- if (args.length >= 5) {
- runs = Integer.parseInt(args[4]);
+ if (args.length >= 4) {
+ runs = Integer.parseInt(args[3]);
}
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaKMeans",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaKMeans.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaKMeans");
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
JavaRDD<String> lines = sc.textFile(inputFile);
JavaRDD<Vector> points = lines.map(new ParsePoint());
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
index f6e48b4987..eceb6927d5 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaLR.java
@@ -19,6 +19,7 @@ package org.apache.spark.examples.mllib;
import java.util.regex.Pattern;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,17 +52,16 @@ public final class JavaLR {
}
public static void main(String[] args) {
- if (args.length != 4) {
- System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
+ if (args.length != 3) {
+ System.err.println("Usage: JavaLR <input_dir> <step_size> <niters>");
System.exit(1);
}
-
- JavaSparkContext sc = new JavaSparkContext(args[0], "JavaLR",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaLR.class));
- JavaRDD<String> lines = sc.textFile(args[1]);
+ SparkConf sparkConf = new SparkConf().setAppName("JavaLR");
+ JavaSparkContext sc = new JavaSparkContext(sparkConf);
+ JavaRDD<String> lines = sc.textFile(args[0]);
JavaRDD<LabeledPoint> points = lines.map(new ParsePoint()).cache();
- double stepSize = Double.parseDouble(args[2]);
- int iterations = Integer.parseInt(args[3]);
+ double stepSize = Double.parseDouble(args[1]);
+ int iterations = Integer.parseInt(args[2]);
// Another way to configure LogisticRegression
//
@@ -73,7 +73,7 @@ public final class JavaLR {
// LogisticRegressionModel model = lr.train(points.rdd());
LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
- iterations, stepSize);
+ iterations, stepSize);
System.out.print("Final w: " + model.weights());
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index d62a72f534..ad5ec84b71 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -20,6 +20,7 @@ package org.apache.spark.examples.sql;
import java.io.Serializable;
import java.util.List;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
@@ -51,8 +52,8 @@ public class JavaSparkSQL {
}
public static void main(String[] args) throws Exception {
- JavaSparkContext ctx = new JavaSparkContext("local", "JavaSparkSQL",
- System.getenv("SPARK_HOME"), JavaSparkContext.jarOfClass(JavaSparkSQL.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaSparkSQL");
+ JavaSparkContext ctx = new JavaSparkContext(sparkConf);
JavaSQLContext sqlCtx = new JavaSQLContext(ctx);
// Load a text file and convert each line to a Java Bean.
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index a5ece68cef..400b68c221 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -17,6 +17,7 @@
package org.apache.spark.examples.streaming;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.*;
@@ -31,9 +32,8 @@ import org.apache.spark.streaming.flume.SparkFlumeEvent;
* an Avro server on at the request host:port address and listen for requests.
* Your Flume AvroSink should be pointed to this address.
*
- * Usage: JavaFlumeEventCount <master> <host> <port>
+ * Usage: JavaFlumeEventCount <host> <port>
*
- * <master> is a Spark master URL
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
@@ -43,22 +43,19 @@ public final class JavaFlumeEventCount {
}
public static void main(String[] args) {
- if (args.length != 3) {
- System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
+ if (args.length != 2) {
+ System.err.println("Usage: JavaFlumeEventCount <host> <port>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
- String master = args[0];
- String host = args[1];
- int port = Integer.parseInt(args[2]);
+ String host = args[0];
+ int port = Integer.parseInt(args[1]);
Duration batchInterval = new Duration(2000);
-
- JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
- System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
+ SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
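The streaming examples apply the same idea through the JavaStreamingContext(SparkConf, Duration) constructor, as in the Flume example above. A minimal sketch of that setup, assuming the Spark 1.x Java streaming API; the class name and the socket text source are illustrative only (the actual examples use Flume and Kafka receivers):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Illustrative class name; the socket source stands in for the Flume/Kafka
    // receivers used by the real examples.
    public final class StreamingConfPatternSketch {
      public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("StreamingConfPatternSketch");
        // The batch interval is now passed alongside the SparkConf, replacing the
        // old (master, appName, batchInterval, sparkHome, jars) constructor.
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(2000));

        JavaReceiverInputDStream<String> lines =
            ssc.socketTextStream(args[0], Integer.parseInt(args[1]));
        lines.count().print();

        ssc.start();
        ssc.awaitTermination();
      }
    }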
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index da51eb189a..6a74cc50d1 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -21,7 +21,11 @@ import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;
+
+import scala.Tuple2;
+
import com.google.common.collect.Lists;
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
@@ -33,19 +37,18 @@ import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
-import scala.Tuple2;
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
- * Usage: JavaKafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
* <zkQuorum> is a list of one or more zookeeper servers that make quorum
* <group> is the name of kafka consumer group
* <topics> is a list of one or more kafka topics to consume from
* <numThreads> is the number of threads the kafka consumer should use
*
* Example:
- * `./bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount local[2] zoo01,zoo02,
+ * `./bin/spark-submit --class org.apache.spark.examples.streaming.JavaKafkaWordCount \
+ * examples.jar zoo01,zoo02, \
* zoo03 my-consumer-group topic1,topic2 1`
*/
@@ -56,27 +59,25 @@ public final class JavaKafkaWordCount {
}
public static void main(String[] args) {
- if (args.length < 5) {
- System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>");
+ if (args.length < 4) {
+ System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
-
+ SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with a 1 second batch size
- JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
- new Duration(2000), System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
+ JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
- int numThreads = Integer.parseInt(args[4]);
+ int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
- String[] topics = args[3].split(",");
+ String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
- KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
+ KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index ac84991d87..e5cbd39f43 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -17,9 +17,10 @@
package org.apache.spark.examples.streaming;
-import com.google.common.collect.Lists;
-import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import scala.Tuple2;
+import com.google.common.collect.Lists;
+
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
@@ -27,41 +28,39 @@ import org.apache.spark.examples.streaming.StreamingExamples;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import java.util.regex.Pattern;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
- * Usage: JavaNetworkWordCount <master> <hostname> <port>
- * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * Usage: JavaNetworkWordCount <hostname> <port>
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
- * `$ ./run org.apache.spark.examples.streaming.JavaNetworkWordCount local[2] localhost 9999`
+ * `$ ./bin/spark-submit --class org.apache.spark.examples.streaming.JavaNetworkWordCount \
+ * examples.jar localhost 9999`
*/
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
public static void main(String[] args) {
- if (args.length < 3) {
- System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
- "In local mode, <master> should be 'local[n]' with n > 1");
+ if (args.length < 2) {
+ System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
}
StreamingExamples.setStreamingLogLevels();
-
+ SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "JavaNetworkWordCount",
- new Duration(1000), System.getenv("SPARK_HOME"),
- JavaStreamingContext.jarOfClass(JavaNetworkWordCount.class));
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
- JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
+ JavaReceiverInputDStream<String> lines = ssc.socketTextStream(args[0], Integer.parseInt(args[1]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
index 819311968f..4ce8437f82 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaQueueStream.java
@@ -17,8 +17,16 @@
package org.apache.spark.examples.streaming;
-import com.google.common.collect.Lists;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
import scala.Tuple2;
+
+import com.google.common.collect.Lists;
+
+import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
@@ -28,25 +36,17 @@ import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
public final class JavaQueueStream {
private JavaQueueStream() {
}
public static void main(String[] args) throws Exception {
- if (args.length < 1) {
- System.err.println("Usage: JavaQueueStream <master>");
- System.exit(1);
- }
StreamingExamples.setStreamingLogLevels();
+ SparkConf sparkConf = new SparkConf().setAppName("JavaQueueStream");
// Create the context
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000),
- System.getenv("SPARK_HOME"), JavaStreamingContext.jarOfClass(JavaQueueStream.class));
+ JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream