diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 17:49:43 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-02-14 17:49:43 -0800 |
commit | def8126d7788a8bd991ac6f9f9403de701a39dc5 (patch) | |
tree | aedd6ef98806069248149d0fdfd08e95dbc11e85 /examples | |
parent | 2eacf22401f75b956036fb0c32eb38baa16b224e (diff) | |
download | spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.gz spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.tar.bz2 spark-def8126d7788a8bd991ac6f9f9403de701a39dc5.zip |
Added TwitterInputDStream from example to StreamingContext. Renamed example TwitterBasic to TwitterPopularTags.
Diffstat (limited to 'examples')
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala (renamed from examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala) | 33 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala | 71 |
2 files changed, 13 insertions, 91 deletions
diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala index 377bc0c98e..fdb3a4c73c 100644 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala +++ b/examples/src/main/scala/spark/streaming/examples/TwitterPopularTags.scala @@ -1,19 +1,19 @@ -package spark.streaming.examples.twitter +package spark.streaming.examples -import spark.streaming.StreamingContext._ import spark.streaming.{Seconds, StreamingContext} +import StreamingContext._ import spark.SparkContext._ -import spark.storage.StorageLevel /** * Calculates popular hashtags (topics) over sliding 10 and 60 second windows from a Twitter * stream. The stream is instantiated with credentials and optionally filters supplied by the * command line arguments. + * */ -object TwitterBasic { +object TwitterPopularTags { def main(args: Array[String]) { if (args.length < 3) { - System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" + + System.err.println("Usage: TwitterPopularTags <master> <twitter_username> <twitter_password>" + " [filter1] [filter2] ... [filter n]") System.exit(1) } @@ -21,10 +21,8 @@ object TwitterBasic { val Array(master, username, password) = args.slice(0, 3) val filters = args.slice(3, args.length) - val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters, - StorageLevel.MEMORY_ONLY_SER) - ssc.registerInputStream(stream) + val ssc = new StreamingContext(master, "TwitterPopularTags", Seconds(2)) + val stream = ssc.twitterStream(username, password, filters) val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#"))) @@ -39,22 +37,17 @@ object TwitterBasic { // Print popular hashtags topCounts60.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) topCounts10.foreach(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(5) - println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) - topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} - } + val topList = rdd.take(5) + println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count())) + topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))} }) ssc.start() } - } diff --git a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala deleted file mode 100644 index 99ed4cdc1c..0000000000 --- a/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala +++ /dev/null @@ -1,71 +0,0 @@ -package spark.streaming.examples.twitter - -import spark._ -import spark.streaming._ -import dstream.{NetworkReceiver, NetworkInputDStream} -import storage.StorageLevel -import twitter4j._ -import twitter4j.auth.BasicAuthorization -import collection.JavaConversions._ - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied username and password to authenticate. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -*/ -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkInputDStream[Status](ssc_) { - - override def createReceiver(): NetworkReceiver[Status] = { - new TwitterReceiver(username, password, filters, storageLevel) - } -} - -class TwitterReceiver( - username: String, - password: String, - filters: Seq[String], - storageLevel: StorageLevel - ) extends NetworkReceiver[Status] { - - var twitterStream: TwitterStream = _ - lazy val blockGenerator = new BlockGenerator(storageLevel) - - protected override def onStart() { - blockGenerator.start() - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - blockGenerator += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) {} - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - logInfo("Twitter receiver started") - } - - protected override def onStop() { - blockGenerator.stop() - twitterStream.shutdown() - logInfo("Twitter receiver stopped") - } -} |