aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java8
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java2
-rw-r--r--examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java2
-rw-r--r--external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java4
-rw-r--r--streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala2
11 files changed, 16 insertions, 14 deletions
diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
index 984909cb94..df901997e1 100644
--- a/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaStreamingTestExample.java
@@ -58,7 +58,7 @@ public class JavaStreamingTestExample {
private static int timeoutCounter = 0;
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length != 3) {
System.err.println("Usage: JavaStreamingTestExample " +
"<dataDir> <batchDuration> <numBatchesTimeout>");
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
index 4544ad2b42..1cba565b38 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaCustomReceiver.java
@@ -58,7 +58,7 @@ import java.util.regex.Pattern;
public class JavaCustomReceiver extends Receiver<String> {
private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaCustomReceiver <hostname> <port>");
System.exit(1);
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
index 769b21cecf..ed118f86c0 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaDirectKafkaWordCount.java
@@ -21,6 +21,8 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Arrays;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import java.util.regex.Pattern;
import scala.Tuple2;
@@ -47,7 +49,7 @@ import org.apache.spark.streaming.Durations;
public final class JavaDirectKafkaWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaDirectKafkaWordCount <brokers> <topics>\n" +
" <brokers> is a list of one or more Kafka brokers\n" +
@@ -64,8 +66,8 @@ public final class JavaDirectKafkaWordCount {
SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
- HashSet<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
- HashMap<String, String> kafkaParams = new HashMap<>();
+ Set<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
+ Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", brokers);
// Create direct kafka stream with brokers and topics
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
index bae4b78ac2..33c0a2df2f 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaFlumeEventCount.java
@@ -43,7 +43,7 @@ public final class JavaFlumeEventCount {
private JavaFlumeEventCount() {
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: JavaFlumeEventCount <host> <port>");
System.exit(1);
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
index 655da6840c..8a5fd53372 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaKafkaWordCount.java
@@ -57,7 +57,7 @@ public final class JavaKafkaWordCount {
private JavaKafkaWordCount() {
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
index 5761da684b..7a8fe99f48 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaNetworkWordCount.java
@@ -48,7 +48,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public final class JavaNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
index e5fb2bfbfa..0563149448 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaRecoverableNetworkWordCount.java
@@ -183,7 +183,7 @@ public final class JavaRecoverableNetworkWordCount {
return ssc;
}
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length != 4) {
System.err.println("You arguments were " + Arrays.asList(args));
System.err.println(
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
index 4b9d9efc85..7aa8862761 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaSqlNetworkWordCount.java
@@ -53,7 +53,7 @@ import org.apache.spark.streaming.api.java.JavaStreamingContext;
public final class JavaSqlNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCount <hostname> <port>");
System.exit(1);
diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
index 4230dab52e..ed36df852a 100644
--- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
+++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java
@@ -50,7 +50,7 @@ import org.apache.spark.streaming.api.java.*;
public class JavaStatefulNetworkWordCount {
private static final Pattern SPACE = Pattern.compile(" ");
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
if (args.length < 2) {
System.err.println("Usage: JavaStatefulNetworkWordCount <hostname> <port>");
System.exit(1);
diff --git a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
index 0e43e9272d..d40bd3ff56 100644
--- a/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
+++ b/external/kinesis-asl/src/main/java/org/apache/spark/examples/streaming/JavaKinesisWordCountASL.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.regex.Pattern;
import com.amazonaws.regions.RegionUtils;
-import org.apache.log4j.Logger;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
@@ -81,9 +80,8 @@ import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionIn
*/
public final class JavaKinesisWordCountASL { // needs to be public for access from run-example
private static final Pattern WORD_SEPARATOR = Pattern.compile(" ");
- private static final Logger logger = Logger.getLogger(JavaKinesisWordCountASL.class);
- public static void main(String[] args) {
+ public static void main(String[] args) throws Exception {
// Check that all required args were passed in.
if (args.length != 3) {
System.err.println(
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
index 922e4a5e4d..7e78fa1d7e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingContext.scala
@@ -558,6 +558,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*/
+ @throws[InterruptedException]
def awaitTermination(): Unit = {
ssc.awaitTermination()
}
@@ -570,6 +571,7 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
* @return `true` if it's stopped; or throw the reported error during the execution; or `false`
* if the waiting time elapsed before returning from the method.
*/
+ @throws[InterruptedException]
def awaitTerminationOrTimeout(timeout: Long): Boolean = {
ssc.awaitTerminationOrTimeout(timeout)
}