diff options
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala (renamed from streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala) | 15 | ||||
-rw-r--r-- | examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala) | 6 |
2 files changed, 11 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala index 19b3cad6ad..22a927e87f 100644 --- a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala @@ -1,7 +1,7 @@ -package spark.streaming.examples +package spark.streaming.examples.twitter +import spark.streaming.{Seconds, StreamingContext} import spark.streaming.StreamingContext._ -import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext} object TwitterBasic { def main(args: Array[String]) { @@ -24,19 +24,16 @@ object TwitterBasic { // Word count over hashtags val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)) // TODO: Sorts on one node - should do with global sorting once streaming supports it - val topCounts = counts.collect().map(_.sortBy(-_._2).take(5)) - - // Print popular hashtags - topCounts.foreachRDD(rdd => { - if (rdd.count() != 0) { - val topList = rdd.take(1)(0) + counts.foreach(rdd => { + val topList = rdd.collect().sortBy(-_._2).take(5) + if (!topList.isEmpty) { println("\nPopular topics in last 60 seconds:") topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))} } }) // Print number of tweets in the window - stream.window(Seconds(60)).count().foreachRDD(rdd => + stream.window(Seconds(60)).count().foreach(rdd => if (rdd.count() != 0) { println("Window size: %s tweets".format(rdd.take(1)(0))) } diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala index adf1ed15c9..1e842d2c8e 100644 --- a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala +++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala @@ -1,6 +1,10 @@ -package spark.streaming +package spark.streaming.examples.twitter import spark.RDD +import spark.streaming._ +import spark.streaming.dstream.InputDStream +import spark.streaming.StreamingContext._ + import twitter4j._ import twitter4j.auth.BasicAuthorization import collection.mutable.ArrayBuffer |