author     Tathagata Das <tathagata.das1565@gmail.com>  2013-12-30 11:13:24 -0800
committer  Tathagata Das <tathagata.das1565@gmail.com>  2013-12-30 11:13:24 -0800
commit     f4e40661912af2a23e250a49f72f00675172e2de (patch)
tree       97d40d041b08bf8a320f908e7b241cce9432c014 /examples/src/main/java
parent     6e43039614ed1ec55a134fb82fb3e8d4e80996ef (diff)
Refactored kafka, flume, zeromq, and mqtt as separate external projects, each with its own self-contained Scala API, Java API, Scala unit tests, and Java unit tests. Updated the examples to use the external projects.
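As a minimal sketch of the new usage pattern (drawn only from the diffs below; the class name, master string, and port value are placeholders), a Java example now wraps its JavaStreamingContext in the module-specific context and asks the wrapper, rather than the core context, for the input stream:

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
import org.apache.spark.streaming.flume.SparkFlumeEvent;

public class FlumeEventCountSketch {
  public static void main(String[] args) {
    // Ordinary streaming context, created exactly as before the refactoring.
    JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "FlumeEventCountSketch",
        new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    // The Flume receiver now lives in the external flume project: wrap the core context
    // in JavaStreamingContextWithFlume and obtain the stream from the wrapper instead of ssc.
    JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
    JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", 41414); // placeholder port

    flumeStream.count().print(); // running count of received Flume events
    ssc.start();
  }
}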
Diffstat (limited to 'examples/src/main/java')
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java  11
-rw-r--r--  examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java    4
2 files changed, 9 insertions, 6 deletions
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index 261813bf2f..e53c4f9e83 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@@ -49,10 +50,10 @@ public class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+ JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc);
+ JavaDStream<SparkFlumeEvent> flumeStream = sscWithFlume.flumeStream("localhost", port);
flumeStream.count();
@@ -63,6 +64,6 @@ public class JavaFlumeEventCount {
}
}).print();
- sc.start();
+ ssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index 22994fb2ec..de0420ca83 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -29,6 +29,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;
import scala.Tuple2;
/**
@@ -63,7 +64,8 @@ public class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+ JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
+ JavaPairDStream<String, String> messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
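For the Kafka side, a corresponding minimal sketch (assuming only the wrapper constructor and the kafkaStream(zkQuorum, group, topicMap) signature visible in the diff above; the ZooKeeper address, group id, and topic map are placeholder values):

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka;

public class KafkaWordCountSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc = new JavaStreamingContext("local[2]", "KafkaWordCountSketch",
        new Duration(2000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    // Topics to consume and receiver threads per topic (placeholder values).
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    topicMap.put("test-topic", 1);

    // As with Flume, the Kafka receiver moved to the external kafka project: wrap the core
    // context and request the pair stream from the wrapper instead of calling ssc.kafkaStream.
    JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc);
    JavaPairDStream<String, String> messages =
        sscWithKafka.kafkaStream("localhost:2181", "sketch-group", topicMap);

    // The second element of each pair carries the message text, as used by the word-count example.
    messages.print();
    ssc.start();
  }
}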