From f4e40661912af2a23e250a49f72f00675172e2de Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 30 Dec 2013 11:13:24 -0800 Subject: Refactored kafka, flume, zeromq, mqtt as separate external projects, with their own self-contained scala API, java API, scala unit tests and java unit tests. Updated examples to use the external projects. --- .../apache/spark/streaming/examples/JavaFlumeEventCount.java | 11 ++++++----- .../apache/spark/streaming/examples/JavaKafkaWordCount.java | 4 +++- .../org/apache/spark/streaming/examples/FlumeEventCount.scala | 1 + .../org/apache/spark/streaming/examples/KafkaWordCount.scala | 1 + .../org/apache/spark/streaming/examples/MQTTWordCount.scala | 10 +++++----- .../org/apache/spark/streaming/examples/ZeroMQWordCount.scala | 6 ++++-- 6 files changed, 20 insertions(+), 13 deletions(-) (limited to 'examples/src') diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java index 261813bf2f..e53c4f9e83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java @@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples; import org.apache.spark.api.java.function.Function; import org.apache.spark.streaming.*; import org.apache.spark.streaming.api.java.*; -import org.apache.spark.streaming.dstream.SparkFlumeEvent; +import org.apache.spark.streaming.flume.JavaStreamingContextWithFlume; +import org.apache.spark.streaming.flume.SparkFlumeEvent; /** * Produces a count of events received from Flume. @@ -49,10 +50,10 @@ public class JavaFlumeEventCount { Duration batchInterval = new Duration(2000); - JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, + JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval, System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR")); - - JavaDStream flumeStream = sc.flumeStream("localhost", port); + JavaStreamingContextWithFlume sscWithFlume = new JavaStreamingContextWithFlume(ssc); + JavaDStream flumeStream = sscWithFlume.flumeStream("localhost", port); flumeStream.count(); @@ -63,6 +64,6 @@ public class JavaFlumeEventCount { } }).print(); - sc.start(); + ssc.start(); } } diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java index 22994fb2ec..de0420ca83 100644 --- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java +++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java @@ -29,6 +29,7 @@ import org.apache.spark.streaming.Duration; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; +import org.apache.spark.streaming.kafka.JavaStreamingContextWithKafka; import scala.Tuple2; /** @@ -63,7 +64,8 @@ public class JavaKafkaWordCount { topicMap.put(topic, numThreads); } - JavaPairDStream messages = ssc.kafkaStream(args[1], args[2], topicMap); + JavaStreamingContextWithKafka sscWithKafka = new JavaStreamingContextWithKafka(ssc); + JavaPairDStream messages = sscWithKafka.kafkaStream(args[1], args[2], topicMap); JavaDStream lines = messages.map(new Function, String>() { @Override diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala index 9f6e163454..f1641e299f 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples import org.apache.spark.util.IntParam import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming._ +import org.apache.spark.streaming.flume._ /** * Produces a count of events received from Flume. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala index 570ba4c81a..833c83a5ef 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala @@ -24,6 +24,7 @@ import kafka.producer._ import org.apache.spark.streaming._ import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.streaming.util.RawTextHelper._ +import org.apache.spark.streaming.kafka._ /** * Consumes messages from one or more topics in Kafka and does wordcount. diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala index ff332a0282..edb46ac1da 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala @@ -17,11 +17,6 @@ package org.apache.spark.streaming.examples -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ -import org.apache.spark.streaming.dstream.MQTTReceiver -import org.apache.spark.storage.StorageLevel - import org.eclipse.paho.client.mqttv3.MqttClient import org.eclipse.paho.client.mqttv3.MqttClientPersistence import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence @@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException import org.eclipse.paho.client.mqttv3.MqttMessage import org.eclipse.paho.client.mqttv3.MqttTopic +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.mqtt._ + /** * A simple Mqtt publisher for demonstration purposes, repeatedly publishes * Space separated String Message "hello mqtt demo for spark streaming" diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala index e83ce78aa5..037b96e9eb 100644 --- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala +++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala @@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples import akka.actor.ActorSystem import akka.actor.actorRef2Scala import akka.zeromq._ -import org.apache.spark.streaming.{ Seconds, StreamingContext } -import org.apache.spark.streaming.StreamingContext._ import akka.zeromq.Subscribe import akka.util.ByteString +import org.apache.spark.streaming.{Seconds, StreamingContext} +import org.apache.spark.streaming.StreamingContext._ +import org.apache.spark.streaming.zeromq._ + /** * A simple publisher for demonstration purposes, repeatedly publishes random Messages * every one second. -- cgit v1.2.3