author    Patrick Wendell <pwendell@gmail.com>    2014-01-07 22:21:52 -0800
committer Patrick Wendell <pwendell@gmail.com>    2014-01-07 22:21:52 -0800
commit    c0f0155eca6405d0768a476f0be00594e478fce0 (patch)
tree      0ceb61d8adf28cc47291e055ff0fdc503766f571 /examples
parent    f5f12dc28218f3ed89836434ab0530e88b043e47 (diff)
parent    8f02f1c3d45ee553ed6bec2dc81fbae4435274fc (diff)
Merge pull request #313 from tdas/project-refactor
Refactored the streaming project to separate external libraries like Twitter, Kafka, Flume, etc.
At a high level, the changes are as follows.
1. All the external code was put in `SPARK_HOME/external/` as separate SBT projects and Maven modules. Their artifact names are `spark-streaming-twitter`, `spark-streaming-kafka`, etc. Both SparkBuild.scala and the pom.xml files have been updated. References to external libraries and repositories have been removed from the settings of the root and streaming projects/modules. (A dependency sketch follows this list.)
2. To use the external functionality (say, creating a Twitter stream), the developer has to `import org.apache.spark.streaming.twitter._`. For the Scala API, the developer calls `TwitterUtils.createStream(streamingContext, ...)`; for the Java API, `TwitterUtils.createStream(javaStreamingContext, ...)`. (See the usage sketch after this list.)
3. Each external project has its own Scala and Java unit tests. Note that the unit tests of each external library reuse classes from the streaming unit tests (`TestSuiteBase`, `LocalJavaStreamingContext`, etc.). To enable this code sharing among test classes, `dependsOn(streaming % "compile->compile,test->test")` was used in SparkBuild.scala. In streaming/pom.xml, an additional `maven-jar-plugin` was necessary to capture this dependency (see the comment inside the pom.xml for more information). (See the build sketch after this list.)
4. Jars of the external projects have been added to the examples project but not to the assembly project.
5. In some files, imports have been rearranged to conform to the Spark coding guidelines.
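
Three quick sketches to make the points above concrete. First, point 1 from the consumer side: an application that wants one of the new connector artifacts declares it like any other library. A minimal sbt sketch (not part of this patch; the version string is a placeholder for whatever release ships these modules):

```scala
// build.sbt (illustrative): depend on core streaming plus the Twitter connector.
// `sparkVersion` is a placeholder; match it to the Spark version you build against.
val sparkVersion = "0.9.0-incubating"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-streaming"         % sparkVersion,
  "org.apache.spark" %% "spark-streaming-twitter" % sparkVersion
)
```

sbt's `%%` appends the Scala binary version to the artifact name, which is exactly what the `_${scala.binary.version}` suffix encodes in the pom.xml diff below.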
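Second, point 2 in code form: a minimal, self-contained Scala program using the new entry point (the master URL, app name, and batch interval are illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._

object TwitterDemo {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "TwitterDemo", Seconds(2))

    // The receiver now comes from the external module's TwitterUtils object,
    // not from a twitterStream method on StreamingContext. Passing None for
    // the auth lets twitter4j fall back to its usual configuration sources.
    val stream = TwitterUtils.createStream(ssc, None)

    stream.map(status => status.getText).print()
    ssc.start()
  }
}
```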
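Third, a sketch of the point-3 wiring as it might appear in SparkBuild.scala (the project and variable names here are assumptions, not the exact ones in the build file):

```scala
// SparkBuild.scala (sketch). The "compile->compile,test->test" mapping means:
// this project's compile classpath sees streaming's compile classes, and its
// test classpath additionally sees streaming's test classes (TestSuiteBase,
// LocalJavaStreamingContext, ...). `streaming` is the core streaming project
// defined elsewhere in the build.
lazy val streamingTwitter = Project("streaming-twitter", file("external/twitter"))
  .dependsOn(streaming % "compile->compile,test->test")
```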
Diffstat (limited to 'examples')
10 files changed, 58 insertions, 41 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 7e41bef252..cb4f7ee33b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -72,6 +72,31 @@
       <scope>provided</scope>
     </dependency>
     <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase</artifactId>
       <version>0.94.6</version>
@@ -87,21 +112,6 @@
       </exclusions>
     </dependency>
     <dependency>
-      <groupId>com.sksamuel.kafka</groupId>
-      <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0-beta1</version>
-      <exclusions>
-        <exclusion>
-          <groupId>com.sun.jmx</groupId>
-          <artifactId>jmxri</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>com.sun.jdmk</groupId>
-          <artifactId>jmxtools</artifactId>
-        </exclusion>
-      </exclusions>
-    </dependency>
-    <dependency>
       <groupId>org.eclipse.jetty</groupId>
       <artifactId>jetty-server</artifactId>
     </dependency>
@@ -174,7 +184,7 @@
         <artifactId>maven-shade-plugin</artifactId>
         <configuration>
           <shadedArtifactAttached>false</shadedArtifactAttached>
-          <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
+          <outputFile>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
           <artifactSet>
             <includes>
               <include>*:*</include>
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index fd683ce0d3..b11cfa667e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
 import org.apache.spark.api.java.function.Function;
 import org.apache.spark.streaming.*;
 import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
 
 /**
  * Produces a count of events received from Flume.
@@ -52,11 +53,10 @@ public final class JavaFlumeEventCount {
 
     Duration batchInterval = new Duration(2000);
 
-    JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+    JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
             System.getenv("SPARK_HOME"),
             JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-
-    JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+    JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
 
     flumeStream.count();
@@ -67,6 +67,6 @@ public final class JavaFlumeEventCount {
       }
     }).print();
 
-    sc.start();
+    ssc.start();
   }
 }
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index d8b4f4dddd..16b8a948e6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.Duration;
 import org.apache.spark.streaming.api.java.JavaDStream;
 import org.apache.spark.streaming.api.java.JavaPairDStream;
 import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
 import scala.Tuple2;
 
 /**
@@ -59,7 +60,7 @@ public final class JavaKafkaWordCount {
     }
 
     // Create the context with a 1 second batch size
-    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+    JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
             new Duration(2000), System.getenv("SPARK_HOME"),
             JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
@@ -70,7 +71,7 @@ public final class JavaKafkaWordCount {
       topicMap.put(topic, numThreads);
     }
 
-    JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+    JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
 
     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
       @Override
@@ -100,6 +101,6 @@ public final class JavaKafkaWordCount {
     });
     wordCounts.print();
 
-    ssc.start();
+    jssc.start();
   }
 }
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 5ef1928294..ae3709b3d9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.util.IntParam
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
 
 /**
  * Produces a count of events received from Flume.
@@ -51,7 +52,7 @@ object FlumeEventCount {
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
 
     // Create a flume stream
-    val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+    val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2)
 
     // Print out the count of events received from this server in each batch
     stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 197461655e..31a94bd224 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -24,6 +24,7 @@ import kafka.producer._
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.streaming.util.RawTextHelper._
+import org.apache.spark.streaming.kafka._
 
 /**
  * Consumes messages from one or more topics in Kafka and does wordcount.
@@ -52,7 +53,7 @@ object KafkaWordCount {
     ssc.checkpoint("checkpoint")
 
     val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
-    val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
+    val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1l))
       .reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 2d02ef77c0..325290b66f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,11 +17,6 @@
 package org.apache.spark.streaming.examples
 
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.MQTTReceiver
-import org.apache.spark.storage.StorageLevel
-
 import org.eclipse.paho.client.mqttv3.MqttClient
 import org.eclipse.paho.client.mqttv3.MqttClientPersistence
 import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException
 import org.eclipse.paho.client.mqttv3.MqttMessage
 import org.eclipse.paho.client.mqttv3.MqttTopic
 
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.mqtt._
+
 /**
  * A simple Mqtt publisher for demonstration purposes, repeatedly publishes
  * Space separated String Message "hello mqtt demo for spark streaming"
@@ -97,7 +97,7 @@ object MQTTWordCount {
     val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
-    val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+    val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
 
     val words = lines.flatMap(x => x.toString.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 9d21d3178f..3ccdc908e2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -23,6 +23,8 @@ import com.twitter.algebird._
 import org.apache.spark.streaming.StreamingContext._
 import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
 /**
  * Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
  * windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -33,7 +35,7 @@
  * <p>
  * <p>
  *   <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ *   This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
 *   for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
 *   that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
 *   estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
@@ -61,7 +63,7 @@ object TwitterAlgebirdCMS {
     val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
-    val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
 
     val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 5111e6f62a..c7e83e76b0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
 import org.apache.spark.storage.StorageLevel
 import com.twitter.algebird.HyperLogLog._
 import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.streaming.dstream.TwitterInputDStream
+import org.apache.spark.streaming.twitter._
 
 /**
  * Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
@@ -50,7 +50,7 @@ object TwitterAlgebirdHLL {
     val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
-    val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+    val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
 
     val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 7a3df687b7..e2b0418d55 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
 import org.apache.spark.streaming.{Seconds, StreamingContext}
 import StreamingContext._
 import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
 
 /**
  * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
@@ -39,7 +40,7 @@ object TwitterPopularTags {
     val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
       System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
-    val stream = ssc.twitterStream(None, filters)
+    val stream = TwitterUtils.createStream(ssc, None, filters)
 
     val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index beda73a71b..03902ec353 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples
 import akka.actor.ActorSystem
 import akka.actor.actorRef2Scala
 import akka.zeromq._
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
 import akka.zeromq.Subscribe
 import akka.util.ByteString
 
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.zeromq._
+
 /**
  * A simple publisher for demonstration purposes, repeatedly publishes random Messages
  * every one second.
@@ -83,11 +85,10 @@ object ZeroMQWordCount {
     def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
 
     //For this stream, a zeroMQ publisher should be running.
-    val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+    val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
     val words = lines.flatMap(_.split(" "))
     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
     wordCounts.print()
     ssc.start()
   }
-
 }
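
Taken together, the example changes above reduce to one mechanical migration for user code. A self-contained before/after sketch using the Kafka connector (the master URL, ZooKeeper address, group, and topic map are illustrative):

```scala
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka._

object MigrationSketch {
  def main(args: Array[String]) {
    val ssc = new StreamingContext("local[2]", "MigrationSketch", Seconds(2))
    val topicMap = Map("events" -> 1)

    // Before this change, the receiver hung off StreamingContext:
    //   val lines = ssc.kafkaStream("zk:2181", "group", topicMap).map(_._2)

    // After, it comes from the connector module's Utils object:
    val lines = KafkaUtils.createStream(ssc, "zk:2181", "group", topicMap).map(_._2)

    lines.print()
    ssc.start()
  }
}
```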