From 9ac4cb1c5fd7637ff9936f1ef54fa27f6f6aa214 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Fri, 21 Dec 2012 17:11:39 -0800 Subject: Adding a Twitter InputDStream with an example --- .../spark/streaming/TwitterInputDStream.scala | 59 ++++++++++++++++++++++ .../spark/streaming/examples/TwitterBasic.scala | 37 ++++++++++++++ 2 files changed, 96 insertions(+) create mode 100644 streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala create mode 100644 streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala new file mode 100644 index 0000000000..5d177e96de --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala @@ -0,0 +1,59 @@ +package spark.streaming + +import spark.RDD +import spark.streaming.{Time, InputDStream} +import twitter4j._ +import twitter4j.auth.BasicAuthorization +import collection.mutable.ArrayBuffer +import collection.JavaConversions._ + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +*/ +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + username: String, + password: String, + filters: Seq[String] + ) extends InputDStream[Status](ssc_) { + val statuses: ArrayBuffer[Status] = ArrayBuffer() + var twitterStream: TwitterStream = _ + + override def start() = { + twitterStream = new TwitterStreamFactory() + .getInstance(new BasicAuthorization(username, password)) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + statuses += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) {} + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + } + + override def stop() = { + twitterStream.shutdown() + } + + override def compute(validTime: Time): Option[RDD[Status]] = { + // Flush the current tweet buffer + val rdd = Some(ssc.sc.parallelize(statuses)) + statuses.foreach(x => statuses -= x) + rdd + } +} diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala new file mode 100644 index 0000000000..c7e380fbe1 --- /dev/null +++ b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala @@ -0,0 +1,37 @@ +package spark.streaming.examples + +import spark.streaming.StreamingContext._ +import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext} + +object TwitterBasic { + def main(args: Array[String]) { + if (args.length != 3) { + System.err.println("Usage: TwitterBasic ") + System.exit(1) + } + + val Array(master, username, password) = args + + val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) + val stream = new TwitterInputDStream(ssc, username, password, Seq()) + ssc.graph.addInputStream(stream) + + val hashTags = stream.flatMap( + status => status.getText.split(" ").filter(_.startsWith("#"))) + + // Word count over hashtags + val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) + + // TODO: Sorts on one node - should do with global sorting once streaming supports it + val topCounts = counts.collect().map(_.sortBy(-_._2).take(5)) + + topCounts.foreachRDD(rdd => { + val topList = rdd.take(1)(0) + println("\nPopular topics in last 60 seconds:") + topList.foreach(t => println("%s (%s tweets)".format(t._1, t._2))) + } + ) + + ssc.start() + } +} -- cgit v1.2.3