author     Patrick Wendell <pwendell@gmail.com>    2012-12-21 17:11:39 -0800
committer  Patrick Wendell <pwendell@gmail.com>    2012-12-21 17:18:19 -0800
commit     9ac4cb1c5fd7637ff9936f1ef54fa27f6f6aa214 (patch)
tree       cc53b73075f7cc3e95e6d651b6212c00befee6e8 /streaming/src
parent     556c38ed91a405e0665897873e025e94971226af (diff)
Adding a Twitter InputDStream with an example
Diffstat (limited to 'streaming/src')
-rw-r--r--  streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala           59
-rw-r--r--  streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala          37
2 files changed, 96 insertions, 0 deletions
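
For orientation, the new stream plugs into the scheduler through the InputDStream contract: open the source in start(), release it in stop(), and hand back one RDD per batch interval from compute(). The sketch below illustrates that shape only; the class name ConstantInputDStream and its fixed payload are made up, while the base class, the @transient constructor argument and ssc.sc are taken from the diff that follows.

    // Illustrative sketch, not part of the commit: the minimal shape of a custom
    // input stream against the spark.streaming API at this revision.
    import spark.RDD
    import spark.streaming.{InputDStream, StreamingContext, Time}

    class ConstantInputDStream(@transient ssc_ : StreamingContext, value: String)
      extends InputDStream[String](ssc_) {
      override def start() = {}   // open any external connection here
      override def stop() = {}    // and shut it down here
      // Called once per batch interval to produce that batch's RDD
      override def compute(validTime: Time): Option[RDD[String]] =
        Some(ssc.sc.parallelize(Seq(value)))
    }
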
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
new file mode 100644
index 0000000000..5d177e96de
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
@@ -0,0 +1,59 @@
+package spark.streaming
+
+import spark.RDD
+import spark.streaming.{Time, InputDStream}
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+import collection.mutable.ArrayBuffer
+import collection.JavaConversions._
+
+/** A stream of Twitter statuses, potentially filtered by one or more keywords.
+ *
+ * @constructor create a new Twitter stream using the supplied username and password to authenticate.
+ * An optional set of string filters can be used to restrict the set of tweets. Note that the
+ * Twitter streaming API may return only a sampled subset of all matching tweets during each interval.
+ */
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ username: String,
+ password: String,
+ filters: Seq[String]
+ ) extends InputDStream[Status](ssc_) {
+ val statuses: ArrayBuffer[Status] = ArrayBuffer()
+ var twitterStream: TwitterStream = _
+
+ override def start() = {
+ twitterStream = new TwitterStreamFactory()
+ .getInstance(new BasicAuthorization(username, password))
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ statuses += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) {}
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ }
+
+ override def stop() = {
+ twitterStream.shutdown()
+ }
+
+ override def compute(validTime: Time): Option[RDD[Status]] = {
+    // Snapshot the current tweet buffer into an RDD, then clear it for the next interval
+    val rdd = Some(ssc.sc.parallelize(statuses.toArray))
+    statuses.clear()
+ rdd
+ }
+}
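
The class header above documents the constructor: username and password feed twitter4j's BasicAuthorization, and a non-empty filters sequence switches the connection from sample() to a tracked FilterQuery. A minimal usage sketch, with placeholder credentials, master URL and filter terms, registering the stream the same way the example below does:

    import spark.streaming.{Seconds, StreamingContext, TwitterInputDStream}

    val ssc = new StreamingContext("local[2]", "TwitterFilterDemo", Seconds(2))
    // Only statuses matching a filter term are delivered; an empty Seq() falls back to sampling.
    val tweets = new TwitterInputDStream(ssc, "someUser", "somePassword", Seq("spark", "scala"))
    ssc.graph.addInputStream(tweets)
    tweets.map(_.getText).foreachRDD(rdd => rdd.take(5).foreach(println))
    ssc.start()
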
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
new file mode 100644
index 0000000000..c7e380fbe1
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
@@ -0,0 +1,37 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext._
+import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
+
+object TwitterBasic {
+ def main(args: Array[String]) {
+ if (args.length != 3) {
+ System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>")
+ System.exit(1)
+ }
+
+ val Array(master, username, password) = args
+
+ val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
+ val stream = new TwitterInputDStream(ssc, username, password, Seq())
+ ssc.graph.addInputStream(stream)
+
+ val hashTags = stream.flatMap(
+ status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // Word count over hashtags
+ val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+
+ // TODO: Sorts on one node - should do with global sorting once streaming supports it
+ val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
+
+ topCounts.foreachRDD(rdd => {
+ val topList = rdd.take(1)(0)
+ println("\nPopular topics in last 60 seconds:")
+ topList.foreach(t => println("%s (%s tweets)".format(t._1, t._2)))
+ }
+ )
+
+ ssc.start()
+ }
+}
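
The core of the example is the windowed word count over hashtags followed by a per-node sort for the top five. The same per-window computation can be checked without a live Twitter connection; the snippet below replays it over a few made-up tweet strings with plain Scala collections (the sample data is invented for illustration):

    val sampleTweets = Seq("#spark is fast", "loving #spark and #scala", "#scala rocks")
    val counts = sampleTweets
      .flatMap(_.split(" ").filter(_.startsWith("#")))   // extract hashtags
      .groupBy(identity)
      .mapValues(_.size)                                  // count per hashtag
    val top5 = counts.toSeq.sortBy(-_._2).take(5)
    top5.foreach { case (tag, n) => println("%s (%s tweets)".format(tag, n)) }
    // e.g. "#spark (2 tweets)" and "#scala (2 tweets)"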