blob: 7ea2a50e113572ddb02323f1af39902c0f59a8ab (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
package examples.actors
import scala.actors.Actor._
abstract class Producer[T] extends Iterator[T] {
protected def produce(x: T): unit = coordinator !? HasValue(x)
protected def produceValues: unit
def hasNext: boolean = { setCurrent(); !current.isEmpty }
def next: T = {
setCurrent()
val res = current.get
current = (coordinator !? Next()).asInstanceOf[Option[T]]
res
}
private var current: Option[T] = null
private def setCurrent() = if (current == null) current = (coordinator !? Next()).asInstanceOf[Option[T]]
private case class HasValue(value: T)
private case class Next
private case class Done
private case class Continue
/** A thread-based coordinator */
private val coordinator = actor {
while (true) {
receive {
case Next() =>
reply {
receive {
case HasValue(v) =>
reply()
Some(v)
case Done() =>
None
}
}
}
}
}
actor {
produceValues
coordinator !? Done()
}
}
object Joins extends Application {
def from(m: int, n: int) = new Producer[int] {
def produceValues = for (val i <- m until n) produce(i)
}
// note that it works from the main thread
val it = from(1, 10)
while (it.hasNext) {
Console.println(it.next)
}
}
|