path: root/streaming/src/main/scala/spark/stream/WordCount1.scala
package spark.stream

import SparkStreamContext._

import scala.util.Sorting

import spark.SparkContext
import spark.storage.StorageLevel

// Streaming word count over a sliding window, reading sentences from a network source.
// Usage: WordCount1 <host> <HDFS> <Input file> <Ideal Partitions>
object WordCount1 {
  var inputFile: String = null
  var HDFS: String = null
  var idealPartitions: Int = 0

  def main(args: Array[String]) {

    if (args.length != 4) {
      println("Usage: WordCount1 <host> <HDFS> <Input file> <Ideal Partitions>")
      System.exit(1)
    }

    // Build the full input path from the HDFS URL prefix and the file path
    HDFS = args(1)
    inputFile = HDFS + args(2)
    idealPartitions = args(3).toInt
    println("Input file: " + inputFile)
    
    // Create the streaming context on the given host (args(0))
    val ssc = new SparkStreamContext(args(0), "WordCountWindow")
    
    // Hand the settings to SparkContext through its global fields
    SparkContext.idealPartitions = idealPartitions
    SparkContext.inputFile = inputFile
    
    // Create a stream of sentences from the network source at localhost:55119
    // (the last argument is presumably the batch interval in milliseconds)
    val sentences = ssc.readNetworkStream[String]("Sentences", Array("localhost:55119"), 1000)
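    // Uncomment to print the received sentences for debugging: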
    //sentences.print

    // Split each sentence into words
    val words = sentences.flatMap(_.split(" "))

    // "add" combines counts; "subtract" is its inverse, used to drop counts
    // for data that falls out of the window
    def add(v1: Int, v2: Int) = v1 + v2
    def subtract(v1: Int, v2: Int) = v1 - v2
    
    // Count each word over a 10-second window that slides forward every 1 second,
    // adding counts for new data and subtracting counts for data leaving the window
    // (the final argument is presumably the number of reduce partitions)
    val windowedCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(add _, subtract _, Seconds(10), Seconds(1), 10)
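    // Persist the windowed counts in deserialized in-memory form
    // (the Seconds(1) argument is presumably a checkpoint interval)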
    windowedCounts.persist(StorageLevel.MEMORY_ONLY_DESER, StorageLevel.MEMORY_ONLY_DESER, Seconds(1))
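    // Force evaluation of each generated RDD by collecting it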
    windowedCounts.foreachRDD(_.collect)

    // Start the streaming computation
    ssc.run
  }
}