aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala (renamed from streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala)15
-rw-r--r--examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala (renamed from streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala)6
2 files changed, 11 insertions, 10 deletions
diff --git a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
index 19b3cad6ad..22a927e87f 100644
--- a/streaming/src/main/scala/spark/streaming/examples/TwitterBasic.scala
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterBasic.scala
@@ -1,7 +1,7 @@
-package spark.streaming.examples
+package spark.streaming.examples.twitter
+import spark.streaming.{Seconds, StreamingContext}
import spark.streaming.StreamingContext._
-import spark.streaming.{TwitterInputDStream, Seconds, StreamingContext}
object TwitterBasic {
def main(args: Array[String]) {
@@ -24,19 +24,16 @@ object TwitterBasic {
// Word count over hashtags
val counts = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
// TODO: Sorts on one node - should do with global sorting once streaming supports it
- val topCounts = counts.collect().map(_.sortBy(-_._2).take(5))
-
- // Print popular hashtags
- topCounts.foreachRDD(rdd => {
- if (rdd.count() != 0) {
- val topList = rdd.take(1)(0)
+ counts.foreach(rdd => {
+ val topList = rdd.collect().sortBy(-_._2).take(5)
+ if (!topList.isEmpty) {
println("\nPopular topics in last 60 seconds:")
topList.foreach{case (tag, count) => println("%s (%s tweets)".format(tag, count))}
}
})
// Print number of tweets in the window
- stream.window(Seconds(60)).count().foreachRDD(rdd =>
+ stream.window(Seconds(60)).count().foreach(rdd =>
if (rdd.count() != 0) {
println("Window size: %s tweets".format(rdd.take(1)(0)))
}
diff --git a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
index adf1ed15c9..1e842d2c8e 100644
--- a/streaming/src/main/scala/spark/streaming/TwitterInputDStream.scala
+++ b/examples/src/main/scala/spark/streaming/examples/twitter/TwitterInputDStream.scala
@@ -1,6 +1,10 @@
-package spark.streaming
+package spark.streaming.examples.twitter
import spark.RDD
+import spark.streaming._
+import spark.streaming.dstream.InputDStream
+import spark.streaming.StreamingContext._
+
import twitter4j._
import twitter4j.auth.BasicAuthorization
import collection.mutable.ArrayBuffer