aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
author	Tathagata Das <tathagata.das1565@gmail.com>	2013-01-07 16:54:35 -0800
committer	Tathagata Das <tathagata.das1565@gmail.com>	2013-01-07 16:54:35 -0800
commit64dceec2935e49ab7b5c18d9109d44b4b3fd06af (patch)
tree61ac5734f6e690dbaf8c022a8d51748068ff2738 /streaming/src
parentd808e1026ad488bed19886f3a3ed2f34a10e7d16 (diff)
parent518111573f685c8fad5ccfce27ef5704194691cb (diff)
downloadspark-64dceec2935e49ab7b5c18d9109d44b4b3fd06af.tar.gz
spark-64dceec2935e49ab7b5c18d9109d44b4b3fd06af.tar.bz2
spark-64dceec2935e49ab7b5c18d9109d44b4b3fd06af.zip
Merge branch 'streaming-merge' into dev-merge
Diffstat (limited to 'streaming/src')
-rw-r--r--	streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala | 58
-rw-r--r--	streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala | 46
2 files changed, 104 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
new file mode 100644
index 0000000000..adf1ed15c9
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
@@ -0,0 +1,58 @@
+package spark.streaming
+
+import spark.RDD
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+import collection.mutable.ArrayBuffer
+import collection.JavaConversions._
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*/
+class TwitterInputDStream(
+    @transient ssc_ : StreamingContext,
+    username: String,
+    password: String,
+    filters: Seq[String]
+  ) extends InputDStream[Status](ssc_) {
+  // Buffer of statuses received since the last batch. It is appended to by
+  // the twitter4j listener thread and drained by compute() on the scheduler
+  // thread, so every access must be synchronized on the buffer itself.
+  val statuses: ArrayBuffer[Status] = ArrayBuffer()
+  var twitterStream: TwitterStream = _
+
+  /** Opens the twitter4j stream and registers a listener that buffers statuses. */
+  override def start() = {
+    twitterStream = new TwitterStreamFactory()
+      .getInstance(new BasicAuthorization(username, password))
+    twitterStream.addListener(new StatusListener {
+      def onStatus(status: Status) = statuses.synchronized {
+        statuses += status
+      }
+      // Unimplemented
+      def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+      def onTrackLimitationNotice(i: Int) {}
+      def onScrubGeo(l: Long, l1: Long) {}
+      def onStallWarning(stallWarning: StallWarning) {}
+      def onException(e: Exception) {}
+    })
+
+    // With keywords, use the filtered stream; otherwise fall back to the
+    // public sample stream.
+    val query: FilterQuery = new FilterQuery
+    if (filters.size > 0) {
+      query.track(filters.toArray)
+      twitterStream.filter(query)
+    } else {
+      twitterStream.sample()
+    }
+  }
+
+  override def stop() = {
+    twitterStream.shutdown()
+  }
+
+  /** Returns an RDD of all statuses buffered since the previous batch. */
+  override def compute(validTime: Time): Option[RDD[Status]] = {
+    // Atomically snapshot and empty the buffer. The previous code removed
+    // elements one-by-one while iterating (statuses.foreach(statuses -= _)),
+    // which mutates the buffer during traversal — that can skip elements and
+    // silently drop tweets arriving concurrently from the listener thread.
+    val batch = statuses.synchronized {
+      val snapshot = statuses.toList
+      statuses.clear()
+      snapshot
+    }
+    Some(ssc.sc.parallelize(batch))
+  }
+}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
new file mode 100644
index 0000000000..19b3cad6ad
--- /dev/null
+++ b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
@@ -0,0 +1,46 @@
+package spark.streaming.examples
+
+import spark.streaming.StreamingContext._
+import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
+
+// Example streaming job: connects to the Twitter stream (filtered by optional
+// keywords) and periodically prints the most popular hashtags plus the number
+// of tweets seen in a 60-second window.
+object TwitterBasic {
+ // Entry point. Usage: <master> <twitter_username> <twitter_password> [filters...]
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ // First three args are positional; anything after them is a keyword filter.
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ // 2-second batch interval; the input stream is registered manually on the graph.
+ val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
+ val stream = new TwitterInputDStream(ssc, username, password, filters)
+ ssc.graph.addInputStream(stream)
+
+ // Extract whitespace-separated tokens beginning with '#' from each tweet.
+ val hashTags = stream.flatMap(
+ status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // Word count over hashtags
+ val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+ // TODO: Sorts on one node - should do with global sorting once streaming supports it
+ val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
+
+ // Print popular hashtags
+ topCounts.foreachRDD(rdd => {
+ if (rdd.count() != 0) {
+ // Each RDD holds a single element: the sorted (tag, count) list.
+ val topList = rdd.take(1)(0)
+ println("\nPopular topics in last 60 seconds:")
+ topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
+ }
+ })
+
+ // Print number of tweets in the window
+ stream.window(Seconds(60)).count().foreachRDD(rdd =>
+ if (rdd.count() != 0) {
+ println("Window size: %s tweets".format(rdd.take(1)(0)))
+ }
+ )
+ ssc.start()
+ }
+}