aboutsummaryrefslogtreecommitdiff
path: root/streaming/src/test/scala/spark/streaming/DStreamBasicSuite.scala
blob: 9b953d9dae02cea10448a04443c05f64989c491d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
package spark.streaming

import spark.streaming.StreamingContext._
import scala.runtime.RichInt

/**
 * Basic tests for [[DStream]] operations: element-wise transformations (map, flatMap),
 * shuffle-based aggregations (reduceByKey, reduce) and stateful operations
 * (updateStateByKey).
 *
 * Each case hands `testOperation` (inherited from DStreamSuiteBase) a sequence of input
 * batches, the operation under test, and the expected output for each batch.
 */
class DStreamBasicSuite extends DStreamSuiteBase {

  test("map-like operations") {
    val input = Seq(1 to 4, 5 to 8, 9 to 12)

    // map: every element is converted to its string form, batch by batch.
    testOperation(input, (r: DStream[Int]) => r.map(_.toString), input.map(_.map(_.toString)))

    // flatMap: every element x expands to (x, 2x), preserving order within a batch.
    testOperation(
      input,
      (r: DStream[Int]) => r.flatMap(x => Seq(x, x * 2)),
      input.map(_.flatMap(x => Array(x, x * 2)))
    )
  }

  test("shuffle-based operations") {
    // reduceByKey: per-batch word counts. The last batch is empty and must yield no output.
    // NOTE(review): the trailing `true` is presumably a flag telling testOperation to
    // ignore element order (shuffle output has no fixed order) -- confirm against
    // DStreamSuiteBase.testOperation.
    testOperation(
      Seq(Seq("a", "a", "b"), Seq("", ""), Seq()),
      (s: DStream[String]) => s.map(x => (x, 1)).reduceByKey(_ + _),
      Seq(Seq(("a", 2), ("b", 1)), Seq(("", 2)), Seq()),
      true
    )

    // reduce: each batch collapses to a single element, the sum of that batch.
    testOperation(
      Seq(1 to 4, 5 to 8, 9 to 12),
      (s: DStream[Int]) => s.reduce(_ + _),
      Seq(Seq(10), Seq(26), Seq(42))
    )
  }

  test("stateful operations") {
    val inputData =
      Seq(
        Seq("a", "b", "c"),
        Seq("a", "b", "c"),
        Seq("a", "b", "c")
      )

    // Every key occurs once per batch, so its running count equals the batch number.
    val outputData =
      Seq(
        Seq(("a", 1), ("b", 1), ("c", 1)),
        Seq(("a", 2), ("b", 2), ("c", 2)),
        Seq(("a", 3), ("b", 3), ("c", 3))
      )

    val updateStateOp = (s: DStream[String]) => {
      // Running-count update: this batch's values for a key are added to its previous state.
      // `sum` is used instead of `reduce` because `reduce` throws on an empty Seq, and the
      // update function may receive no new values for a key that already has state.
      // Both arguments are null-guarded; presumably `state` is null for a key seen for the
      // first time -- confirm against updateStateByKey's contract.
      val updateFunc = (values: Seq[Int], state: RichInt) => {
        val batchCount = if (values != null) values.sum else 0
        val previousCount = if (state != null) state.self else 0
        new RichInt(batchCount + previousCount)
      }
      // RichInt boxes the Int count (presumably because updateStateByKey needs a
      // reference-type state); unwrap it so the output can be compared as (String, Int).
      s.map(x => (x, 1)).updateStateByKey[RichInt](updateFunc).map(t => (t._1, t._2.self))
    }

    testOperation(inputData, updateStateOp, outputData, true)
  }
}