Diffstat (limited to 'streaming/src/main/scala/spark/stream/WordCount3.scala')
-rw-r--r--  streaming/src/main/scala/spark/stream/WordCount3.scala | 57
1 file changed, 57 insertions(+), 0 deletions(-)
diff --git a/streaming/src/main/scala/spark/stream/WordCount3.scala b/streaming/src/main/scala/spark/stream/WordCount3.scala
new file mode 100644
index 0000000000..455a8c9dbf
--- /dev/null
+++ b/streaming/src/main/scala/spark/stream/WordCount3.scala
@@ -0,0 +1,57 @@
+package spark.stream
+
+import SparkStreamContext._
+
+import scala.util.Sorting
+
+object WordCount3 {
+
+ def main(args: Array[String]) {
+
+ if (args.length < 1) {
+ println("Usage: WordCount3 <host> [<temp directory>]")
+ System.exit(1)
+ }
+
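+ // Create the streaming context; args(0) is the Spark master host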
+ val ssc = new SparkStreamContext(args(0), "WordCount")
+ if (args.length > 1) {
+ ssc.setTempDir(args(1))
+ }
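+ // Read a stream of sentences from the network source at localhost:55119
+ // (the trailing 1000 is presumably the batch interval in milliseconds)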
+ val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
+ /*sentences.print*/
+
+ val words = sentences.flatMap(_.split(" "))
+
+ def add(v1: Int, v2: Int) = v1 + v2
+ def subtract(v1: Int, v2: Int) = v1 - v2
+
+ /*val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1)*/
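+ // Count words incrementally over a 5-second window sliding every second:
+ // new batches are added and expired batches are subtracted from the counts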
+ val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), 1)
+ /*windowedCounts.print */
+
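+ // Return the k pairs with the highest counts, in descending order of count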
+ def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = {
+ implicit val countOrdering = new Ordering[(String, Int)] {
+ override def compare(count1: (String, Int), count2: (String, Int)): Int = {
+ count2._2 - count1._2
+ }
+ }
+ val array = data.toArray
+ Sorting.quickSort(array)
+ array.take(k)
+ }
+
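+ // Take the top k within each glom'ed partition, then the top k of those,
+ // so only k pairs per partition need to be gathered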
+ val k = 10
+ val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k))
+ topKWindowedCounts.print
+
+ ssc.run
+ }
+}