summaryrefslogtreecommitdiff
path: root/docs/examples/actors/Joins.scala
blob: 85105ec178eef1fe1f7816c543ef59de54d941a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package examples.actors

import scala.actors.Actor._

abstract class Producer[T] extends Iterator[T] {
  protected def produce(x: T): unit = coordinator ! HasValue(x)
  protected def produceValues: unit

  def hasNext = { setCurrent(); !current.isEmpty }

  def next: T = {
    setCurrent()
    val res = current.get
    current = (coordinator !? Next).asInstanceOf[Option[T]]
    res
  }

  private var current: Option[T] = null
  private def setCurrent() =
    if (current == null) current = (coordinator !? Next).asInstanceOf[Option[T]]

  private case class  HasValue(value: T)
  private case object Next
  private case object Done
  private case object Continue

  private val coordinator = actor {
    while (true)
      receive {
        case Next =>
          reply {
            receive {
              case HasValue(v) =>
                Some(v)
              case Done =>
                None
            }
          }
      }
  }

  actor {
    produceValues
    coordinator ! Done
  }
}

object Joins extends Application {
  def from(m: int, n: int) = new Producer[int] {
    def produceValues = for (val i <- m until n) produce(i)
  }

  // note that it works from the main thread
  val it = from(1, 10)
  while (it.hasNext) {
    Console.println(it.next)
  }
}