From 8c1b87251210bb5553e6a3b6f9648b178b221a3b Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Mon, 7 Jan 2013 17:48:10 -0800 Subject: Moved Twitter example to the where the other examples are. --- .../spark/streaming/TwitterInputDStream.scala | 58 ---------------------- .../spark/streaming/examples/TwitterBasic.scala | 46 ----------------- 2 files changed, 104 deletions(-) delete mode 100644 streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala delete mode 100644 streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala (limited to 'streaming/src') diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala deleted file mode 100644 index adf1ed15c9..0000000000 --- a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala +++ /dev/null @@ -1,58 +0,0 @@ -package spark.streaming - -import spark.RDD -import twitter4j._ -import twitter4j.auth.BasicAuthorization -import collection.mutable.ArrayBuffer -import collection.JavaConversions._ - -/* A stream of Twitter statuses, potentially filtered by one or more keywords. -* -* @constructor create a new Twitter stream using the supplied username and password to authenticate. -* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is -* such that this may return a sampled subset of all tweets during each interval. -*/ -class TwitterInputDStream( - @transient ssc_ : StreamingContext, - username: String, - password: String, - filters: Seq[String] - ) extends InputDStream[Status](ssc_) { - val statuses: ArrayBuffer[Status] = ArrayBuffer() - var twitterStream: TwitterStream = _ - - override def start() = { - twitterStream = new TwitterStreamFactory() - .getInstance(new BasicAuthorization(username, password)) - twitterStream.addListener(new StatusListener { - def onStatus(status: Status) = { - statuses += status - } - // Unimplemented - def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {} - def onTrackLimitationNotice(i: Int) {} - def onScrubGeo(l: Long, l1: Long) {} - def onStallWarning(stallWarning: StallWarning) {} - def onException(e: Exception) {} - }) - - val query: FilterQuery = new FilterQuery - if (filters.size > 0) { - query.track(filters.toArray) - twitterStream.filter(query) - } else { - twitterStream.sample() - } - } - - override def stop() = { - twitterStream.shutdown() - } - - override def compute(validTime: Time): Option[RDD[Status]] = { - // Flush the current tweet buffer - val rdd = Some(ssc.sc.parallelize(statuses)) - statuses.foreach(x => statuses -= x) - rdd - } -} diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala deleted file mode 100644 index 19b3cad6ad..0000000000 --- a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala +++ /dev/null @@ -1,46 +0,0 @@ -package spark.streaming.examples - -import spark.streaming.StreamingContext._ -import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext} - -object TwitterBasic { - def main(args: Array[String]) { - if (args.length < 3) { - System.err.println("Usage: TwitterBasic " + - " [filter1] [filter2] ... [filter n]") - System.exit(1) - } - - val Array(master, username, password) = args.slice(0, 3) - val filters = args.slice(3, args.length) - - val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2)) - val stream = new TwitterInputDStream(ssc, username, password, filters) - ssc.graph.addInputStream(stream) - - val hashTags = stream.flatMap( - status => status.getText.split(" ").filter(_.startsWith("#"))) - - // Word count over hashtags - val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) - // TODO: Sorts on one node - should do with global sorting once streaming supports it - val topCounts = counts.collect().map(_.sortBy(-_._2).take(5)) - - // Print popular hashtags - topCounts.foreachRDD(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(1)(0) - println("\nPopular topics in last 60 seconds:") - topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))} - } - }) - - // Print number of tweets in the window - stream.window(Seconds(60)).count().foreachRDD(rdd => - if (rdd.count() != 0) { - println("Window size: %s tweets".format(rdd.take(1)(0))) - } - ) - ssc.start() - } -} -- cgit v1.2.3