diff options
Diffstat (limited to 'streaming/src/main/scala/spark/stream/WordCount3.scala')
-rw-r--r-- | streaming/src/main/scala/spark/stream/WordCount3.scala | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/streaming/src/main/scala/spark/stream/WordCount3.scala b/streaming/src/main/scala/spark/stream/WordCount3.scala new file mode 100644 index 0000000000..455a8c9dbf --- /dev/null +++ b/streaming/src/main/scala/spark/stream/WordCount3.scala @@ -0,0 +1,49 @@ +package spark.stream + +import SparkStreamContext._ + +import scala.util.Sorting + +object WordCount3 { + + def main (args: Array[String]) { + + if (args.length < 1) { + println ("Usage: SparkStreamContext <host> [<temp directory>]") + System.exit(1) + } + + val ssc = new SparkStreamContext(args(0), "WordCount") + if (args.length > 1) { + ssc.setTempDir(args(1)) + } + val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000) + /*sentences.print*/ + + val words = sentences.flatMap(_.split(" ")) + + def add(v1: Int, v2: Int) = (v1 + v2) + def subtract(v1: Int, v2: Int) = (v1 - v2) + + /*val windowedCounts = words.map(x => (x, 1)).window(Seconds(5), Seconds(1)).reduceByKey(add _, 1)*/ + val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(5), Seconds(1), 1) + /*windowedCounts.print */ + + def topK(data: Seq[(String, Int)], k: Int): Array[(String, Int)] = { + implicit val countOrdering = new Ordering[(String, Int)] { + override def compare(count1: (String, Int), count2: (String, Int)): Int = { + count2._2 - count1._2 + } + } + val array = data.toArray + Sorting.quickSort(array) + array.take(k) + } + + val k = 10 + val topKWindowedCounts = windowedCounts.glom.flatMap(topK(_, k)).collect.flatMap(topK(_, k)) + topKWindowedCounts.print + + ssc.run + } +} |