diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 17:48:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 17:48:10 -0800 |
commit | 8c1b87251210bb5553e6a3b6f9648b178b221a3b (patch) | |
tree | 1891907d66e7b828c4822f36ee57200579b6c104 /examples | |
parent | 64dceec2935e49ab7b5c18d9109d44b4b3fd06af (diff) | |
download | spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.gz spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.bz2 spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.zip |
Moved Twitter example to the where the other examples are.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala | 43 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala | 62 |
2 files changed, 105 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala new file mode 100644 index 0000000000..22a927e87f --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala @@ -0,0 +1,43 @@ +package spark.streaming.examples.twitter + +import spark.streaming.{Seconds, StreamingContext} +import spark.streaming.StreamingContext._ + +object TwitterBasic { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... [filter n]") + System.exit(1) + } + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) + val stream = new TwitterInputDStream(ssc, username, password, filters) + ssc.graph.addInputStream(stream) + + val hashTags = stream.flatMap( + status => status.getText.split(" ").filter(_.startsWith("#"))) + + // Word count over hashtags + val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) + // TODO: Sorts on one node - should do with global sorting once streaming supports it + counts.foreach(rdd => { + val topList = rdd.collect().sortBy(-_._2).take(5) + if (!topList.isEmpty) { + println("\nPopular topics in last 60 seconds:") + topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))} + } + }) + + // Print number of tweets in the window + stream.window(Seconds(60)).count().foreach(rdd => + if (rdd.count() != 0) { + println("Window size: %s tweets".format(rdd.take(1)(0))) + } + ) + ssc.start() + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala new file mode 100644 index 0000000000..1e842d2c8e --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala @@ -0,0 +1,62 @@ +package spark.streaming.examples.twitter + +import spark.RDD +import spark.streaming._ +import spark.streaming.dstream.InputDStream +import spark.streaming.StreamingContext._ + +import twitter4j._ +import twitter4j.auth.BasicAuthorization +import collection.mutable.ArrayBuffer +import collection.JavaConversions._ + +/* A stream of Twitter statuses, potentially filtered by one or more keywords. +* +* @constructor create a new Twitter stream using the supplied username and password to authenticate. +* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is +* such that this may return a sampled subset of all tweets during each interval. +*/ +class TwitterInputDStream( + @transient ssc_ : StreamingContext, + username: String, + password: String, + filters: Seq[String] + ) extends InputDStream[Status](ssc_) { + val statuses: ArrayBuffer[Status] = ArrayBuffer() + var twitterStream: TwitterStream = _ + + override def start() = { + twitterStream = new TwitterStreamFactory() + .getInstance(new BasicAuthorization(username, password)) + twitterStream.addListener(new StatusListener { + def onStatus(status: Status) = { + statuses += status + } + // Unimplemented + def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} + def onTrackLimitationNotice(i: Int) {} + def onScrubGeo(l: Long, l1: Long) {} + def onStallWarning(stallWarning: StallWarning) {} + def onException(e: Exception) {} + }) + + val query: FilterQuery = new FilterQuery + if (filters.size > 0) { + query.track(filters.toArray) + twitterStream.filter(query) + } else { + twitterStream.sample() + } + } + + override def stop() = { + twitterStream.shutdown() + } + + override def compute(validTime: Time): Option[RDD[Status]] = { + // Flush the current tweet buffer + val rdd = Some(ssc.sc.parallelize(statuses)) + statuses.foreach(x => statuses -= x) + rdd + } +} |