aboutsummaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-07 17:48:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-07 17:48:10 -0800
commit8c1b87251210bb5553e6a3b6f9648b178b221a3b (patch)
tree1891907d66e7b828c4822f36ee57200579b6c104 /examples
parent64dceec2935e49ab7b5c18d9109d44b4b3fd06af (diff)
downloadspark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.gz
spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.bz2
spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.zip
Moved Twitter example to the where the other examples are.
Diffstat (limited to 'examples')
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala43
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala62
2 files changed, 105 insertions, 0 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
new file mode 100644
index 0000000000..22a927e87f
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
@@ -0,0 +1,43 @@
+package spark.streaming.examples.twitter
+
+import spark.streaming.{Seconds, StreamingContext}
+import spark.streaming.StreamingContext._
+
+object TwitterBasic {
+ def main(args: Array[String]) {
+ if (args.length < 3) {
+ System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
+ " [filter1] [filter2] ... [filter n]")
+ System.exit(1)
+ }
+
+ val Array(master, username, password) = args.slice(0, 3)
+ val filters = args.slice(3, args.length)
+
+ val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
+ val stream = new TwitterInputDStream(ssc, username, password, filters)
+ ssc.graph.addInputStream(stream)
+
+ val hashTags = stream.flatMap(
+ status => status.getText.split(" ").filter(_.startsWith("#")))
+
+ // Word count over hashtags
+ val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
+ // TODO: Sorts on one node - should do with global sorting once streaming supports it
+ counts.foreach(rdd => {
+ val topList = rdd.collect().sortBy(-_._2).take(5)
+ if (!topList.isEmpty) {
+ println("\nPopular topics in last 60 seconds:")
+ topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
+ }
+ })
+
+ // Print number of tweets in the window
+ stream.window(Seconds(60)).count().foreach(rdd =>
+ if (rdd.count() != 0) {
+ println("Window size: %s tweets".format(rdd.take(1)(0)))
+ }
+ )
+ ssc.start()
+ }
+}
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
new file mode 100644
index 0000000000..1e842d2c8e
--- /dev/null
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
@@ -0,0 +1,62 @@
+package spark.streaming.examples.twitter
+
+import spark.RDD
+import spark.streaming._
+import spark.streaming.dstream.InputDStream
+import spark.streaming.StreamingContext._
+
+import twitter4j._
+import twitter4j.auth.BasicAuthorization
+import collection.mutable.ArrayBuffer
+import collection.JavaConversions._
+
+/* A stream of Twitter statuses, potentially filtered by one or more keywords.
+*
+* @constructor create a new Twitter stream using the supplied username and password to authenticate.
+* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
+* such that this may return a sampled subset of all tweets during each interval.
+*/
+class TwitterInputDStream(
+ @transient ssc_ : StreamingContext,
+ username: String,
+ password: String,
+ filters: Seq[String]
+ ) extends InputDStream[Status](ssc_) {
+ val statuses: ArrayBuffer[Status] = ArrayBuffer()
+ var twitterStream: TwitterStream = _
+
+ override def start() = {
+ twitterStream = new TwitterStreamFactory()
+ .getInstance(new BasicAuthorization(username, password))
+ twitterStream.addListener(new StatusListener {
+ def onStatus(status: Status) = {
+ statuses += status
+ }
+ // Unimplemented
+ def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
+ def onTrackLimitationNotice(i: Int) {}
+ def onScrubGeo(l: Long, l1: Long) {}
+ def onStallWarning(stallWarning: StallWarning) {}
+ def onException(e: Exception) {}
+ })
+
+ val query: FilterQuery = new FilterQuery
+ if (filters.size > 0) {
+ query.track(filters.toArray)
+ twitterStream.filter(query)
+ } else {
+ twitterStream.sample()
+ }
+ }
+
+ override def stop() = {
+ twitterStream.shutdown()
+ }
+
+ override def compute(validTime: Time): Option[RDD[Status]] = {
+ // Flush the current tweet buffer
+ val rdd = Some(ssc.sc.parallelize(statuses))
+ statuses.foreach(x => statuses -= x)
+ rdd
+ }
+}