aboutsummaryrefslogtreecommitdiff
path: root/examples/src
diff options
context:
space:
mode:
authorNick Pentreath <nick.pentreath@gmail.com>2013-02-19 13:58:05 +0200
committerNick Pentreath <nick.pentreath@gmail.com>2013-02-19 13:58:05 +0200
commit315ea069e8aeb78dde23836827bd51462208aa7a (patch)
tree69943e269aedaf22a6267b65f1412576b2ad1a95 /examples/src
parent015893f0e8983a7e249709d9820d1bf0dd74d607 (diff)
parent8b9c673fce1c733c7fcd8b978e84f943be9e9e35 (diff)
downloadspark-315ea069e8aeb78dde23836827bd51462208aa7a.tar.gz
spark-315ea069e8aeb78dde23836827bd51462208aa7a.tar.bz2
spark-315ea069e8aeb78dde23836827bd51462208aa7a.zip
Merge remote-tracking branch 'upstream/streaming' into streaming-eg-algebird
Conflicts: project/SparkBuild.scala
Diffstat (limited to 'examples/src')
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java)0
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java)2
-rw-r--r--examples/src/main/java/spark/streaming/examples/JavaQueueStream.java (renamed from examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java)0
-rw-r--r--examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala158
-rw-r--r--examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala28
-rw-r--r--examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala2
-rw-r--r--examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala)33
-rw-r--r--examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala13
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala1
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala71
10 files changed, 200 insertions, 108 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
index cddce16e39..cddce16e39 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaFlumeEventCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaFlumeEventCount.java
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
index 4299febfd6..07342beb02 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaNetworkWordCount.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaNetworkWordCount.java
@@ -35,7 +35,7 @@ public class JavaNetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
- JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
+ JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
diff --git a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
index 43c3cd4dfa..43c3cd4dfa 100644
--- a/examples/src/main/scala/spark/streaming/examples/JavaQueueStream.java
+++ b/examples/src/main/java/spark/streaming/examples/JavaQueueStream.java
diff --git a/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
new file mode 100644
index 0000000000..71b4e5bf1a
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/ActorWordCount.scala
@@ -0,0 +1,158 @@
+package spark.streaming.examples
+
+import scala.collection.mutable.LinkedList
+import scala.util.Random
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.actor.Props
+import akka.actor.actorRef2Scala
+
+import spark.streaming.Seconds
+import spark.streaming.StreamingContext
+import spark.streaming.StreamingContext.toPairDStreamFunctions
+import spark.streaming.receivers.Receiver
+import spark.util.AkkaUtils
+
+case class SubscribeReceiver(receiverActor: ActorRef)
+case class UnsubscribeReceiver(receiverActor: ActorRef)
+
+/**
+ * Sends the random content to every receiver subscribed with 1/2
+ * second delay.
+ */
+class FeederActor extends Actor {
+
+ val rand = new Random()
+ var receivers: LinkedList[ActorRef] = new LinkedList[ActorRef]()
+
+ val strings: Array[String] = Array("words ", "may ", "count ")
+
+ def makeMessage(): String = {
+ val x = rand.nextInt(3)
+ strings(x) + strings(2 - x)
+ }
+
+ /*
+ * A thread to generate random messages
+ */
+ new Thread() {
+ override def run() {
+ while (true) {
+ Thread.sleep(500)
+ receivers.foreach(_ ! makeMessage)
+ }
+ }
+ }.start()
+
+ def receive: Receive = {
+
+ case SubscribeReceiver(receiverActor: ActorRef) =>
+ println("received subscribe from %s".format(receiverActor.toString))
+ receivers = LinkedList(receiverActor) ++ receivers
+
+ case UnsubscribeReceiver(receiverActor: ActorRef) =>
+ println("received unsubscribe from %s".format(receiverActor.toString))
+ receivers = receivers.dropWhile(x => x eq receiverActor)
+
+ }
+}
+
+/**
+ * A sample actor as receiver, is also simplest. This receiver actor
+ * goes and subscribe to a typical publisher/feeder actor and receives
+ * data.
+ *
+ * @see [[spark.streaming.examples.FeederActor]]
+ */
+class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
+extends Actor with Receiver {
+
+ lazy private val remotePublisher = context.actorFor(urlOfPublisher)
+
+ override def preStart = remotePublisher ! SubscribeReceiver(context.self)
+
+ def receive = {
+ case msg ⇒ context.parent ! pushBlock(msg.asInstanceOf[T])
+ }
+
+ override def postStop() = remotePublisher ! UnsubscribeReceiver(context.self)
+
+}
+
+/**
+ * A sample feeder actor
+ *
+ * Usage: FeederActor <hostname> <port>
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder would start on.
+ */
+object FeederActor {
+
+ def main(args: Array[String]) {
+ if(args.length < 2){
+ System.err.println(
+ "Usage: FeederActor <hostname> <port>\n"
+ )
+ System.exit(1)
+ }
+ val Seq(host, port) = args.toSeq
+
+
+ val actorSystem = AkkaUtils.createActorSystem("test", host, port.toInt)._1
+ val feeder = actorSystem.actorOf(Props[FeederActor], "FeederActor")
+
+ println("Feeder started as:" + feeder)
+
+ actorSystem.awaitTermination();
+ }
+}
+
+/**
+ * A sample word count program demonstrating the use of plugging in
+ * Actor as Receiver
+ * Usage: ActorWordCount <master> <hostname> <port>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <hostname> and <port> describe the AkkaSystem that Spark Sample feeder is running on.
+ *
+ * To run this example locally, you may run Feeder Actor as
+ * `$ ./run spark.streaming.examples.FeederActor 127.0.1.1 9999`
+ * and then run the example
+ * `$ ./run spark.streaming.examples.ActorWordCount local[2] 127.0.1.1 9999`
+ */
+object ActorWordCount {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println(
+ "Usage: ActorWordCount <master> <hostname> <port>" +
+ "In local mode, <master> should be 'local[n]' with n > 1")
+ System.exit(1)
+ }
+
+ val Seq(master, host, port) = args.toSeq
+
+ // Create the context and set the batch size
+ val ssc = new StreamingContext(master, "ActorWordCount",
+ Seconds(10))
+
+ /*
+ * Following is the use of actorStream to plug in custom actor as receiver
+ *
+ * An important point to note:
+ * Since Actor may exist outside the spark framework, It is thus user's responsibility
+ * to ensure the type safety, i.e type of data received and InputDstream
+ * should be same.
+ *
+ * For example: Both actorStream and SampleActorReceiver are parameterized
+ * to same type to ensure type safety.
+ */
+
+ val lines = ssc.actorStream[String](
+ Props(new SampleActorReceiver[String]("akka://spark@%s:%s/user/FeederActor".format(
+ host, port.toInt))), "SampleReceiver")
+
+ //compute wordcount
+ lines.flatMap(_.split("\\s+")).map(x => (x, 1)).reduceByKey(_ + _).print()
+
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
index fe55db6e2c..9b135a5c54 100644
--- a/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/KafkaWordCount.scala
@@ -10,22 +10,34 @@ import spark.streaming.StreamingContext._
import spark.storage.StorageLevel
import spark.streaming.util.RawTextHelper._
+/**
+ * Consumes messages from one or more topics in Kafka and does wordcount.
+ * Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>
+ * <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
+ * <zkQuorum> is a list of one or more zookeeper servers that make quorum
+ * <group> is the name of kafka consumer group
+ * <topics> is a list of one or more kafka topics to consume from
+ * <numThreads> is the number of threads the kafka consumer should use
+ *
+ * Example:
+ * `./run spark.streaming.examples.KafkaWordCount local[2] zoo01,zoo02,zoo03 my-consumer-group topic1,topic2 1`
+ */
object KafkaWordCount {
def main(args: Array[String]) {
- if (args.length < 6) {
- System.err.println("Usage: KafkaWordCount <master> <hostname> <port> <group> <topics> <numThreads>")
+ if (args.length < 5) {
+ System.err.println("Usage: KafkaWordCount <master> <zkQuorum> <group> <topics> <numThreads>")
System.exit(1)
}
- val Array(master, hostname, port, group, topics, numThreads) = args
+ val Array(master, zkQuorum, group, topics, numThreads) = args
val sc = new SparkContext(master, "KafkaWordCount")
val ssc = new StreamingContext(sc, Seconds(2))
ssc.checkpoint("checkpoint")
val topicpMap = topics.split(",").map((_,numThreads.toInt)).toMap
- val lines = ssc.kafkaStream[String](hostname, port.toInt, group, topicpMap)
+ val lines = ssc.kafkaStream[String](zkQuorum, group, topicpMap)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1l)).reduceByKeyAndWindow(add _, subtract _, Minutes(10), Seconds(2), 2)
wordCounts.print()
@@ -38,16 +50,16 @@ object KafkaWordCount {
object KafkaWordCountProducer {
def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: KafkaWordCountProducer <hostname> <port> <topic> <messagesPerSec> <wordsPerMessage>")
+ if (args.length < 2) {
+ System.err.println("Usage: KafkaWordCountProducer <zkQuorum> <topic> <messagesPerSec> <wordsPerMessage>")
System.exit(1)
}
- val Array(hostname, port, topic, messagesPerSec, wordsPerMessage) = args
+ val Array(zkQuorum, topic, messagesPerSec, wordsPerMessage) = args
// Zookeper connection properties
val props = new Properties()
- props.put("zk.connect", hostname + ":" + port)
+ props.put("zk.connect", zkQuorum)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val config = new ProducerConfig(props)
diff --git a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
index 32f7d57bea..7ff70ae2e5 100644
--- a/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
+++ b/examples/src/main/scala/spark/streaming/examples/NetworkWordCount.scala
@@ -27,7 +27,7 @@ object NetworkWordCount {
// Create a NetworkInputDStream on target ip:port and count the
// words in input stream of \n delimited test (eg. generated by 'nc')
- val lines = ssc.networkTextStream(args(1), args(2).toInt)
+ val lines = ssc.socketTextStream(args(1), args(2).toInt)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
index 377bc0c98e..fdb3a4c73c 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala
@@ -1,19 +1,19 @@
-package spark.streaming.examples.twitter
+package spark.streaming.examples
-import spark.streaming.StreamingContext._
import spark.streaming.{Seconds, StreamingContext}
+import StreamingContext._
import spark.SparkContext._
-import spark.storage.StorageLevel
/**
* Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter
* stream. The stream is instantiated with credentials and optionally filters supplied by the
* command line arguments.
+ *
*/
-object TwitterBasic {
+object TwitterPopularTags {
def main(args: Array[String]) {
if (args.length < 3) {
- System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" +
" [filter1] [filter2] ... [filter n]")
System.exit(1)
}
@@ -21,10 +21,8 @@ object TwitterBasic {
val Array(master, username, password) = args.slice(0, 3)
val filters = args.slice(3, args.length)
- val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters,
- StorageLevel.MEMORY_ONLY_SER)
- ssc.registerInputStream(stream)
+ val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2))
+ val stream = ssc.twitterStream(username, password, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
@@ -39,22 +37,17 @@ object TwitterBasic {
// Print popular hashtags
topCounts60.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
topCounts10.foreach(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(5)
- println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
- topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
- }
+ val topList = rdd.take(5)
+ println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
+ topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
})
ssc.start()
}
-
}
diff --git a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
index a191321d91..fba72519a9 100644
--- a/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/clickstream/PageViewStream.scala
@@ -27,17 +27,16 @@ object PageViewStream {
val ssc = new StreamingContext("local[2]", "PageViewStream", Seconds(1))
// Create a NetworkInputDStream on target host:port and convert each line to a PageView
- val pageViews = ssc.networkTextStream(host, port)
- .flatMap(_.split("\n"))
- .map(PageView.fromString(_))
+ val pageViews = ssc.socketTextStream(host, port)
+ .flatMap(_.split("\n"))
+ .map(PageView.fromString(_))
// Return a count of views per URL seen in each batch
- val pageCounts = pageViews.map(view => ((view.url, 1))).countByKey()
+ val pageCounts = pageViews.map(view => view.url).countByValue()
// Return a sliding window of page views per URL in the last ten seconds
- val slidingPageCounts = pageViews.map(view => ((view.url, 1)))
- .window(Seconds(10), Seconds(2))
- .countByKey()
+ val slidingPageCounts = pageViews.map(view => view.url)
+ .countByValueAndWindow(Seconds(10), Seconds(2))
// Return the rate of error pages (a non 200 status) in each zip code over the last 30 seconds
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
index f67bb029c6..023a0add80 100644
--- a/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/StreamingHLL.scala
@@ -4,6 +4,7 @@ import spark.streaming.{Seconds, StreamingContext}
import spark.storage.StorageLevel
import com.twitter.algebird.HyperLogLog._
import com.twitter.algebird.HyperLogLogMonoid
+import spark.streaming.dstream.TwitterInputDStream
/**
* Example of using HyperLogLog monoid from Twitter's Algebird together with Spark Streaming's
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
deleted file mode 100644
index 99ed4cdc1c..0000000000
--- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-package spark.streaming.examples.twitter
-
-import spark._
-import spark.streaming._
-import dstream.{NetworkReceiver, NetworkInputDStream}
-import storage.StorageLevel
-import twitter4j._
-import twitter4j.auth.BasicAuthorization
-import collection.JavaConversions._
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied username and password to authenticate.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*/
-class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
- username: String,
- password: String,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkInputDStream[Status](ssc_) {
-
- override def createReceiver(): NetworkReceiver[Status] = {
- new TwitterReceiver(username, password, filters, storageLevel)
- }
-}
-
-class TwitterReceiver(
- username: String,
- password: String,
- filters: Seq[String],
- storageLevel: StorageLevel
- ) extends NetworkReceiver[Status] {
-
- var twitterStream: TwitterStream = _
- lazy val blockGenerator = new BlockGenerator(storageLevel)
-
- protected override def onStart() {
- blockGenerator.start()
- twitterStream = new TwitterStreamFactory()
- .getInstance(new BasicAuthorization(username, password))
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- blockGenerator += status
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {}
- })
-
- val query: FilterQuery = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
- }
- logInfo("Twitter receiver started")
- }
-
- protected override def onStop() {
- blockGenerator.stop()
- twitterStream.shutdown()
- logInfo("Twitter receiver stopped")
- }
-}