summaryrefslogtreecommitdiff
path: root/test/files/jvm/reactor-producer-consumer.scala
blob: ec34febe01a8d328e3d4f2fdc9e70c6c9de7a8de (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
@deprecated("Suppress warnings", since="2.11")
object Test {
  import scala.actors.Reactor
  // Completion signal: Producer and Consumer send it to their `parent` when they are
  // done, and the parent reactor in `main` sends it to the buffer after receiving two.
  case class Stop()
  // Request sent by Consumer to the buffer; `from` is the reactor that the buffer
  // replies to with the payload of the next `Put` it handles.
  case class Get(from: Reactor[Any])
  // Value message sent by Producer to the buffer; `x` is the payload forwarded to
  // the reactor named in the pending `Get`.
  case class Put(x: Int)

  /**
   * Reactor that pairs each `Get` request with the next `Put` it handles.
   *
   * While idle it reacts to `Get(from)` or `Stop()`. After a `Get` it reacts only to
   * `Put(x)`, sends `x` to the requester and calls `act()` again to wait for the next
   * request. The `Stop()` handler is empty, so `act()` is not re-entered after it.
   */
  class UnboundedBuffer extends Reactor[Any] {
    def act(): Unit = {
      try {
        react {
          case Get(requester) =>
            // A request is pending: only a Put is handled until it has been served.
            react {
              case Put(payload) =>
                requester ! payload
                act()
            }
          case Stop() =>
            ()
        }
      } catch {
        // Control-flow throwables pass through untouched (presumably the actors
        // runtime relies on them to leave `react` -- confirm against the library).
        case control: scala.util.control.ControlThrowable => throw control
        case failure: Throwable => failure.printStackTrace()
      }
    }
  }

  /**
   * Reactor that sends `n` `Put(42)` messages to `buf` and then sends `Stop()` to `parent`.
   *
   * @param buf    buffer reactor that receives the `Put` messages
   * @param n      number of messages to send; nothing is sent when `n <= 0`
   * @param delay  milliseconds to sleep before each send; no sleep when `delay <= 0`
   * @param parent reactor that is sent `Stop()` once every message has been sent
   */
  class Producer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor[Any]) extends Reactor[Any] {
    def act(): Unit = {
      try {
        (0 until n).foreach { _ =>
          if (delay > 0) Thread.sleep(delay)
          buf ! Put(42)
        }
        parent ! Stop()
      } catch {
        // Control-flow throwables are rethrown unchanged; anything else is only reported.
        case control: scala.util.control.ControlThrowable => throw control
        case failure: Throwable => failure.printStackTrace()
      }
    }
  }

  /**
   * Reactor that requests `n` values from `buf`, one at a time, and then sends
   * `Stop()` to `parent`.
   *
   * Each round increments the request counter `i`, sends `Get(this)` to the buffer and
   * reacts to whatever message comes back. The reply is printed when the counter is a
   * multiple of `step`, i.e. roughly ten times over the whole run.
   *
   * @param buf    buffer reactor that the `Get` requests are sent to
   * @param n      number of values to request; `Stop()` is sent at once when `n <= 0`
   * @param delay  milliseconds to sleep before each request; no sleep when `delay <= 0`
   * @param parent reactor that is sent `Stop()` after the last reply has been handled
   */
  class Consumer(buf: UnboundedBuffer, n: Int, delay: Long, parent: Reactor[Any]) extends Reactor[Any] {
    // `n / 10` is 0 for n < 10, and `i % 0` throws ArithmeticException in the reply
    // handler. Clamping to 1 makes small runs print every reply instead of failing.
    val step: Int = math.max(1, n / 10)
    // Number of requests issued so far.
    var i = 0
    def act(): Unit = {
      try {
        if (i < n) {
          i += 1
          if (delay > 0) Thread.sleep(delay)
          buf ! Get(this)
          react {
            case res =>
              if (i % step == 0)
                println(res)
              // Start the next request/reply round.
              act()
          }
        } else {
          parent ! Stop()
        }
      } catch {
        // Control-flow throwables are rethrown unchanged; anything else is only reported.
        case control: scala.util.control.ControlThrowable => throw control
        case failure: Throwable => failure.printStackTrace()
      }
    }
  }

  /**
   * Entry point: starts a coordinating reactor that launches one buffer, one producer
   * and one consumer (10000 messages each, no delay), waits for two `Stop()` messages
   * and then sends `Stop()` to the buffer.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val coordinator = new Reactor[Any] {
      def act(): Unit = {
        try {
          val messageCount = 10000
          val buffer = new UnboundedBuffer
          buffer.start()
          new Producer(buffer, messageCount, 0, this).start()
          new Consumer(buffer, messageCount, 0, this).start()
          // Two Stop() messages are expected, one each from the producer and the
          // consumer; only after the second one is the buffer told to stop.
          react {
            case Stop() =>
              react {
                case Stop() =>
                  buffer ! Stop()
              }
          }
        } catch {
          // Control-flow throwables are rethrown unchanged; anything else is only reported.
          case control: scala.util.control.ControlThrowable => throw control
          case failure: Throwable => failure.printStackTrace()
        }
      }
    }
    coordinator.start()
  }
}