From def8126d7788a8bd991ac6f9f9403de701a39dc5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 14 Feb 2013 17:49:43 -0800 Subject: Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags. --- .../streaming/examples/TwitterPopularTags.scala | 53 ++++++++++++++++ .../streaming/examples/twitter/TwitterBasic.scala | 60 ------------------ .../examples/twitter/TwitterInputDStream.scala | 71 ---------------------- 3 files changed, 53 insertions(+), 131 deletions(-) create mode 100644 examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala delete mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala delete mode 100644 examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala (limited to 'examples') diff --git a/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala new file mode 100644 index 0000000000..fdb3a4c73c --- /dev/null +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -0,0 +1,53 @@ +package spark.streaming.examples + +import spark.streaming.{Seconds, StreamingContext} +import StreamingContext._ +import spark.SparkContext._ + +/** + * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter + * stream. The stream is instantiated with credentials and optionally filters supplied by the + * command line arguments. + * + */ +object TwitterPopularTags { + def main(args: Array[String]) { + if (args.length < 3) { + System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" + + " [filter1] [filter2] ... 
[filter n]") + System.exit(1) + } + + val Array(master, username, password) = args.slice(0, 3) + val filters = args.slice(3, args.length) + + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val stream = ssc.twitterStream(username, password, filters) + + val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) + + val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) + .map{case (topic, count) => (count, topic)} + .transform(_.sortByKey(false)) + + + // Print popular hashtags + topCounts60.foreach(rdd => { + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + }) + + topCounts10.foreach(rdd => { + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} + }) + + ssc.start() + } +} diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala deleted file mode 100644 index 377bc0c98e..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala +++ /dev/null @@ -1,60 +0,0 @@ -package spark.streaming.examples.twitter - -import spark.streaming.StreamingContext._ -import spark.streaming.{Seconds, StreamingContext} -import spark.SparkContext._ -import spark.storage.StorageLevel - -/** - * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter - * stream. The stream is instantiated with credentials and optionally filters supplied by the - * command line arguments. 
- */ -object TwitterBasic { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - val Array(master, username, password) = args.slice(0, 3) - val filters = args.slice(3, args.length) - - val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) - - val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) - - val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - val topCounts10 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(10)) - .map{case (topic, count) => (count, topic)} - .transform(_.sortByKey(false)) - - - // Print popular hashtags - topCounts60.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } - }) - - topCounts10.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } - }) - - ssc.start() - } - -} diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala deleted file mode 100644 index 99ed4cdc1c..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala +++ /dev/null @@ -1,71 +0,0 @@ -package spark.streaming.examples.twitter - -import spark._ -import spark.streaming._ -import dstream.{NetworkReceiver, 
NetworkInputDStream} -import storage.StorageLevel -import twitter4j._ -import twitter4j.auth.BasicAuthorization -import collection.JavaConversions._ - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied username and password to authenticate. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -*/ -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { - - override def createReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) - } -} - -class TwitterReceiver( - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { - - var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - blockGenerator += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) {} - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - logInfo("Twitter receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - twitterStream.shutdown() - logInfo("Twitter receiver 
stopped") - } -} -- cgit v1.2.3