summaryrefslogtreecommitdiff
path: root/docs/examples/actors/Joins.scala
blob: 637cb5d09244366e660af3e18285951afa668f65 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package examples.actors

import scala.actors.Actor._

abstract class Producer[T] extends Iterator[T] {

  protected def produce(x: T): unit = coordinator !? HasValue(x)
  protected def produceValues: unit

  def hasNext: boolean = { setCurrent(); !current.isEmpty }
  def next: T = {
    setCurrent()
    val res = current.get
    current = (coordinator !? Next).asInstanceOf[Option[T]]
    res
  }

  private var current: Option[T] = null
  private def setCurrent() = if (current == null) current = (coordinator !? Next).asInstanceOf[Option[T]]

  private case class  HasValue(value: T)
  private case object Next
  private case object Done
  private case object Continue

  /** A thread-based coordinator */
  private val coordinator = actor {
    while (true) {
      receive {
        case Next =>
          reply {
            receive {
              case HasValue(v) =>
                reply()
                Some(v)
              case Done =>
                None
            }
          }
      }
    }
  }

  actor {
    produceValues
    coordinator !? Done
    ()
  }
}

object Joins extends Application {
  def from(m: int, n: int) = new Producer[int] {
    def produceValues = for (val i <- m until n) produce(i)
  }

  // note that it works from the main thread
  val it = from(1, 10)
  while (it.hasNext) {
    Console.println(it.next)
  }
}