aboutsummaryrefslogtreecommitdiff
path: root/streaming/src
diff options
context:
space:
mode:
authorTathagata Das <tathagata.das1565@gmail.com>2013-01-07 17:48:10 -0800
committerTathagata Das <tathagata.das1565@gmail.com>2013-01-07 17:48:10 -0800
commit8c1b87251210bb5553e6a3b6f9648b178b221a3b (patch)
tree1891907d66e7b828c4822f36ee57200579b6c104 /streaming/src
parent64dceec2935e49ab7b5c18d9109d44b4b3fd06af (diff)
downloadspark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.gz
spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.bz2
spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.zip
Moved Twitter example to the where the other examples are.
Diffstat (limited to 'streaming/src')
-rw-r--r--streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala58
-rw-r--r--streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala46
2 files changed, 0 insertions, 104 deletions
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
deleted file mode 100644
index adf1ed15c9..0000000000
--- a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package spark.streaming
-
-import spark.RDD
-import twitter4j._
-import twitter4j.auth.BasicAuthorization
-import collection.mutable.ArrayBuffer
-import collection.JavaConversions._
-
-/* A stream of Twitter statuses, potentially filtered by one or more keywords.
-*
-* @constructor create a new Twitter stream using the supplied username and password to authenticate.
-* An optional set of string filters can be used to restrict the set of tweets. The Twitter API is
-* such that this may return a sampled subset of all tweets during each interval.
-*/
-class TwitterInputDStream(
- @transient ssc_ : StreamingContext,
- username: String,
- password: String,
- filters: Seq[String]
- ) extends InputDStream[Status](ssc_) {
- val statuses: ArrayBuffer[Status] = ArrayBuffer()
- var twitterStream: TwitterStream = _
-
- override def start() = {
- twitterStream = new TwitterStreamFactory()
- .getInstance(new BasicAuthorization(username, password))
- twitterStream.addListener(new StatusListener {
- def onStatus(status: Status) = {
- statuses += status
- }
- // Unimplemented
- def onDeletionNotice(statusDeletionNotice: StatusDeletionNotice) {}
- def onTrackLimitationNotice(i: Int) {}
- def onScrubGeo(l: Long, l1: Long) {}
- def onStallWarning(stallWarning: StallWarning) {}
- def onException(e: Exception) {}
- })
-
- val query: FilterQuery = new FilterQuery
- if (filters.size > 0) {
- query.track(filters.toArray)
- twitterStream.filter(query)
- } else {
- twitterStream.sample()
- }
- }
-
- override def stop() = {
- twitterStream.shutdown()
- }
-
- override def compute(validTime: Time): Option[RDD[Status]] = {
- // Flush the current tweet buffer
- val rdd = Some(ssc.sc.parallelize(statuses))
- statuses.foreach(x => statuses -= x)
- rdd
- }
-}
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
deleted file mode 100644
index 19b3cad6ad..0000000000
--- a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package spark.streaming.examples
-
-import spark.streaming.StreamingContext._
-import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
-
-object TwitterBasic {
- def main(args: Array[String]) {
- if (args.length < 3) {
- System.err.println("Usage: TwitterBasic <master> <twitter_username> <twitter_password>" +
- " [filter1] [filter2] ... [filter n]")
- System.exit(1)
- }
-
- val Array(master, username, password) = args.slice(0, 3)
- val filters = args.slice(3, args.length)
-
- val ssc = new StreamingContext(master, "TwitterBasic", Seconds(2))
- val stream = new TwitterInputDStream(ssc, username, password, filters)
- ssc.graph.addInputStream(stream)
-
- val hashTags = stream.flatMap(
- status => status.getText.split(" ").filter(_.startsWith("#")))
-
- // Word count over hashtags
- val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
- // TODO: Sorts on one node - should do with global sorting once streaming supports it
- val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
-
- // Print popular hashtags
- topCounts.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(1)(0)
- println("\nPopular topics in last 60 seconds:")
- topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
- }
- })
-
- // Print number of tweets in the window
- stream.window(Seconds(60)).count().foreachRDD(rdd =>
- if (rdd.count() != 0) {
- println("Window size: %s tweets".format(rdd.take(1)(0)))
- }
- )
- ssc.start()
- }
-}