aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorPatrick Wendell <pwendell@gmail.com>2014-01-07 22:21:52 -0800
committerPatrick Wendell <pwendell@gmail.com>2014-01-07 22:21:52 -0800
commitc0f0155eca6405d0768a476f0be00594e478fce0 (patch)
tree0ceb61d8adf28cc47291e055ff0fdc503766f571 /examples
parentf5f12dc28218f3ed89836434ab0530e88b043e47 (diff)
parent8f02f1c3d45ee553ed6bec2dc81fbae4435274fc (diff)
downloadspark-c0f0155eca6405d0768a476f0be00594e478fce0.tar.gz
spark-c0f0155eca6405d0768a476f0be00594e478fce0.tar.bz2
spark-c0f0155eca6405d0768a476f0be00594e478fce0.zip
Merge pull request #313 from tdas/project-refactor
Refactored the streaming project to separate external libraries like Twitter, Kafka, Flume, etc. At a high level, these are the following changes. 1. All the external code was put in `SPARK_HOME/external/` as separate SBT projects and Maven modules. Their artifact names are `spark-streaming-twitter`, `spark-streaming-kafka`, etc. Both SparkBuild.scala and pom.xml files have been updated. References to external libraries and repositories have been removed from the settings of root and streaming projects/modules. 2. To avail the external functionality (say, creating a Twitter stream), the developer has to `import org.apache.spark.streaming.twitter._` . For Scala API, the developer has to call `TwitterUtils.createStream(streamingContext, ...)`. For the Java API, the developer has to call `TwitterUtils.createStream(javaStreamingContext, ...)`. 3. Each external project has its own scala and java unit tests. Note the unit tests of each external library use classes of the streaming unit tests (`TestSuiteBase`, `LocalJavaStreamingContext`, etc.). To enable this code sharing among test classes, `dependsOn(streaming % "compile->compile,test->test")` was used in the SparkBuild.scala . In the streaming/pom.xml, an additional `maven-jar-plugin` was necessary to capture this dependency (see comment inside the pom.xml for more information). 4. Jars of the external projects have been added to examples project but not to the assembly project. 5. In some files, imports have been rearrange to conform to the Spark coding guidelines.
Diffstat (limited to 'examples')
-rw-r--r--examples/pom.xml42
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java10
-rw-r--r--examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java7
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala12
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala6
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala4
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala3
-rw-r--r--examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala9
10 files changed, 58 insertions, 41 deletions
diff --git a/examples/pom.xml b/examples/pom.xml
index 7e41bef252..cb4f7ee33b 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -72,6 +72,31 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-twitter_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-flume_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-zeromq_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-streaming-mqtt_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase</artifactId>
<version>0.94.6</version>
@@ -87,21 +112,6 @@
</exclusions>
</dependency>
<dependency>
- <groupId>com.sksamuel.kafka</groupId>
- <artifactId>kafka_${scala.binary.version}</artifactId>
- <version>0.8.0-beta1</version>
- <exclusions>
- <exclusion>
- <groupId>com.sun.jmx</groupId>
- <artifactId>jmxri</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.sun.jdmk</groupId>
- <artifactId>jmxtools</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
@@ -174,7 +184,7 @@
<artifactId>maven-shade-plugin</artifactId>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
- <outputFile>${project.build.directory}/scala-${scala.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
+ <outputFile>${project.build.directory}/scala-${scala.binary.version}/${project.artifactId}-assembly-${project.version}.jar</outputFile>
<artifactSet>
<includes>
<include>*:*</include>
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
index fd683ce0d3..b11cfa667e 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaFlumeEventCount.java
@@ -20,7 +20,8 @@ package org.apache.spark.streaming.examples;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.*;
import org.apache.spark.streaming.api.java.*;
-import org.apache.spark.streaming.dstream.SparkFlumeEvent;
+import org.apache.spark.streaming.flume.FlumeUtils;
+import org.apache.spark.streaming.flume.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
@@ -52,11 +53,10 @@ public final class JavaFlumeEventCount {
Duration batchInterval = new Duration(2000);
- JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
+ JavaStreamingContext ssc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaFlumeEventCount.class));
-
- JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream("localhost", port);
+ JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(ssc, "localhost", port);
flumeStream.count();
@@ -67,6 +67,6 @@ public final class JavaFlumeEventCount {
}
}).print();
- sc.start();
+ ssc.start();
}
}
diff --git a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
index d8b4f4dddd..16b8a948e6 100644
--- a/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
+++ b/examples/src/main/java/org/apache/spark/streaming/examples/JavaKafkaWordCount.java
@@ -30,6 +30,7 @@ import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
+import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;
/**
@@ -59,7 +60,7 @@ public final class JavaKafkaWordCount {
}
// Create the context with a 1 second batch size
- JavaStreamingContext ssc = new JavaStreamingContext(args[0], "KafkaWordCount",
+ JavaStreamingContext jssc = new JavaStreamingContext(args[0], "KafkaWordCount",
new Duration(2000), System.getenv("SPARK_HOME"),
JavaStreamingContext.jarOfClass(JavaKafkaWordCount.class));
@@ -70,7 +71,7 @@ public final class JavaKafkaWordCount {
topicMap.put(topic, numThreads);
}
- JavaPairDStream<String, String> messages = ssc.kafkaStream(args[1], args[2], topicMap);
+ JavaPairDStream<String, String> messages = KafkaUtils.createStream(jssc, args[1], args[2], topicMap);
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
@Override
@@ -100,6 +101,6 @@ public final class JavaKafkaWordCount {
});
wordCounts.print();
- ssc.start();
+ jssc.start();
}
}
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
index 5ef1928294..ae3709b3d9 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/FlumeEventCount.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.util.IntParam
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
+import org.apache.spark.streaming.flume._
/**
* Produces a count of events received from Flume.
@@ -51,7 +52,7 @@ object FlumeEventCount {
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
// Create a flume stream
- val stream = ssc.flumeStream(host,port,StorageLevel.MEMORY_ONLY)
+ val stream = FlumeUtils.createStream(ssc, host,port,StorageLevel.MEMORY_ONLY_SER_2)
// Print out the count of events received from this server in each batch
stream.count().map(cnt => "Received " + cnt + " flume events." ).print()
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
index 197461655e..31a94bd224 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/KafkaWordCount.scala
@@ -24,6 +24,7 @@ import kafka.producer._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.util.RawTextHelper._
+import org.apache.spark.streaming.kafka._
/**
* Consumes messages from one or more topics in Kafka and does wordcount.
@@ -52,7 +53,7 @@ object KafkaWordCount {
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream(zkQuorum, group, topicpMap).map(_._2)
+ val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l))
.reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
index 2d02ef77c0..325290b66f 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/MQTTWordCount.scala
@@ -17,11 +17,6 @@
package org.apache.spark.streaming.examples
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
-import org.apache.spark.streaming.dstream.MQTTReceiver
-import org.apache.spark.storage.StorageLevel
-
import org.eclipse.paho.client.mqttv3.MqttClient
import org.eclipse.paho.client.mqttv3.MqttClientPersistence
import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence
@@ -29,6 +24,11 @@ import org.eclipse.paho.client.mqttv3.MqttException
import org.eclipse.paho.client.mqttv3.MqttMessage
import org.eclipse.paho.client.mqttv3.MqttTopic
+import org.apache.spark.storage.StorageLevel
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.mqtt._
+
/**
* A simple Mqtt publisher for demonstration purposes, repeatedly publishes
* Space separated String Message "hello mqtt demo for spark streaming"
@@ -97,7 +97,7 @@ object MQTTWordCount {
val ssc = new StreamingContext(master, "MqttWordCount", Seconds(2), System.getenv("SPARK_HOME"),
StreamingContext.jarOfClass(this.getClass))
- val lines = ssc.mqttStream(brokerUrl, topic, StorageLevel.MEMORY_ONLY)
+ val lines = MQTTUtils.createStream(ssc, brokerUrl, topic, StorageLevel.MEMORY_ONLY_SER_2)
val words = lines.flatMap(x => x.toString.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
index 9d21d3178f..3ccdc908e2 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdCMS.scala
@@ -23,6 +23,8 @@ import com.twitter.algebird._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
+
/**
* Illustrates the use of the Count-Min Sketch, from Twitter's Algebird library, to compute
* windowed and global Top-K estimates of user IDs occurring in a Twitter stream.
@@ -33,7 +35,7 @@ import org.apache.spark.SparkContext._
* <p>
* <p>
* <a href="http://highlyscalable.wordpress.com/2012/05/01/probabilistic-structures-web-analytics-data-mining/">
- * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a datastructure
+ * This blog post</a> has a good overview of the Count-Min Sketch (CMS). The CMS is a data structure
* for approximate frequency estimation in data streams (e.g. Top-K elements, frequency of any given element, etc),
* that uses space sub-linear in the number of elements in the stream. Once elements are added to the CMS, the
* estimated count of an element can be computed, as well as "heavy-hitters" that occur more than a threshold
@@ -61,7 +63,7 @@ object TwitterAlgebirdCMS {
val ssc = new StreamingContext(master, "TwitterAlgebirdCMS", Seconds(10),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER_2)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
index 5111e6f62a..c7e83e76b0 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterAlgebirdHLL.scala
@@ -21,7 +21,7 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
-import org.apache.spark.streaming.dstream.TwitterInputDStream
+import org.apache.spark.streaming.twitter._
/**
* Illustrates the use of the HyperLogLog algorithm, from Twitter's Algebird library, to compute
@@ -50,7 +50,7 @@ object TwitterAlgebirdHLL {
val ssc = new StreamingContext(master, "TwitterAlgebirdHLL", Seconds(5),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters, StorageLevel.MEMORY_ONLY_SER)
+ val stream = TwitterUtils.createStream(ssc, None, filters, StorageLevel.MEMORY_ONLY_SER)
val users = stream.map(status => status.getUser.getId)
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
index 7a3df687b7..e2b0418d55 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/TwitterPopularTags.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.examples
import org.apache.spark.streaming.{Seconds, StreamingContext}
import StreamingContext._
import org.apache.spark.SparkContext._
+import org.apache.spark.streaming.twitter._
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
@@ -39,7 +40,7 @@ object TwitterPopularTags {
val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2),
System.getenv("SPARK_HOME"), StreamingContext.jarOfClass(this.getClass))
- val stream = ssc.twitterStream(None, filters)
+ val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
diff --git a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
index beda73a71b..03902ec353 100644
--- a/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
+++ b/examples/src/main/scala/org/apache/spark/streaming/examples/ZeroMQWordCount.scala
@@ -20,11 +20,13 @@ package org.apache.spark.streaming.examples
import akka.actor.ActorSystem
import akka.actor.actorRef2Scala
import akka.zeromq._
-import org.apache.spark.streaming.{ Seconds, StreamingContext }
-import org.apache.spark.streaming.StreamingContext._
import akka.zeromq.Subscribe
import akka.util.ByteString
+import org.apache.spark.streaming.{Seconds, StreamingContext}
+import org.apache.spark.streaming.StreamingContext._
+import org.apache.spark.streaming.zeromq._
+
/**
* A simple publisher for demonstration purposes, repeatedly publishes random Messages
* every one second.
@@ -83,11 +85,10 @@ object ZeroMQWordCount {
def bytesToStringIterator(x: Seq[ByteString]) = (x.map(_.utf8String)).iterator
//For this stream, a zeroMQ publisher should be running.
- val lines = ssc.zeroMQStream(url, Subscribe(topic), bytesToStringIterator)
+ val lines = ZeroMQUtils.createStream(ssc, url, Subscribe(topic), bytesToStringIterator _)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
}
-
}