diff options
author | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 17:48:10 -0800 |
---|---|---|
committer | Tathagata Das <tathagata.das1565@gmail.com> | 2013-01-07 17:48:10 -0800 |
commit | 8c1b87251210bb5553e6a3b6f9648b178b221a3b (patch) | |
tree | 1891907d66e7b828c4822f36ee57200579b6c104 | |
parent | 64dceec2935e49ab7b5c18d9109d44b4b3fd06af (diff) | |
download | spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.gz spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.tar.bz2 spark-8c1b87251210bb5553e6a3b6f9648b178b221a3b.zip |
Moved Twitter example to where the other examples are.
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala (renamed from streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala) | 15 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala) | 6 |
2 files changed, 11 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala index 19b3cad6ad..22a927e87f 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala @@ -1,7 +1,7 @@ -package spark.streaming.examples +package spark.streaming.examples.twitter +import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext} object TwitterBasic { def main(args: Array[String]) { @@ -24,19 +24,16 @@ object TwitterBasic { // Word count over hashtags val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) // TODO: Sorts on one node - should do with global sorting once streaming supports it - val topCounts = counts.collect().map(_.sortBy(-_._2).take(5)) - - // Print popular hashtags - topCounts.foreachRDD(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(1)(0) + counts.foreach(rdd => { + val topList = rdd.collect().sortBy(-_._2).take(5) + if (!topList.isEmpty) { println("\nPopular topics in last 60 seconds:") topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))} } }) // Print number of tweets in the window - stream.window(Seconds(60)).count().foreachRDD(rdd => + stream.window(Seconds(60)).count().foreach(rdd => if (rdd.count() != 0) { println("Window size: %s tweets".format(rdd.take(1)(0))) } diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala index adf1ed15c9..1e842d2c8e 100644 --- a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala @@ -1,6 +1,10 @@ -package spark.streaming +package spark.streaming.examples.twitter 
import spark.RDD +import spark.streaming._ +import spark.streaming.dstream.InputDStream +import spark.streaming.StreamingContext._ + import twitter4j._ import twitter4j.auth.BasicAuthorization import collection.mutable.ArrayBuffer |