blob: 3e3c76b32a8cf70c1070d04fa609b4b5e5f5dd29 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
|
import scala.annotation._
import scala.util.continuations._
import java.util.concurrent.atomic._
/**
 * Entry point that drives [[ContinuationizedParallelIterable]] from a `for`
 * comprehension placed inside a `reset` block.
 */
object Test {
  /**
   * Wraps a two-entry map, maps every entry to `23` through a suspendable
   * `yield` body, and prints the collected results as a list.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val map = Map("foo" -> 1, "bar" -> 2)
    reset {
      val mapped =
        for {
          // NOTE(review): neither bound name is used; the tuple pattern is
          // presumably kept on purpose so that the comprehension goes through
          // the wrapper's `filter` before `map` -- confirm before simplifying.
          (location, accessors) <- new ContinuationizedParallelIterable(map)
        } yield {
          // Lifts the constant into a CPS-typed value without actually suspending.
          shiftUnit0[Int, Unit](23)
        }
      println(mapped.toList)
    }
  }
}
/**
 * Wraps an `Iterable` and offers `filter`, `foreach` and `map` whose callbacks
 * are CPS-typed (`@suspendable`), so the wrapper can be used as a generator in
 * a `for` comprehension inside a `reset` block.
 *
 * Every operation captures the caller's continuation with `shift`, runs the
 * callback for each element inside its own `reset`, and resumes the captured
 * continuation when the count of outstanding callbacks drops to zero. The
 * count lives in an `AtomicInteger` that the anonymous function object itself
 * extends. It starts at 1 (a guard held by the launching loop) and the guard
 * is released after the loop, so the continuation is resumed even when the
 * wrapped collection is empty.
 *
 * @tparam A element type of the wrapped collection
 * @param underline the wrapped collection
 */
final class ContinuationizedParallelIterable[+A](protected val underline: Iterable[A]) {

  /**
   * Returns the wrapped elements as a list ordered by their `toString` value.
   */
  def toList: List[A] = underline.toList.sortBy(_.toString)

  /**
   * Keeps the elements for which the suspendable predicate yields `true`.
   *
   * Accepted elements are prepended to a shared list with a compare-and-set
   * retry loop, so the order of the result follows completion order rather
   * than the order of the wrapped collection.
   *
   * @param p suspendable predicate applied to each element
   * @return a new wrapper around the accepted elements
   */
  final def filter(p: A => Boolean @suspendable): ContinuationizedParallelIterable[A] @suspendable =
    shift(
      new AtomicInteger(1) with ((ContinuationizedParallelIterable[A] => Unit) => Unit) {
        private val results = new AtomicReference[List[A]](Nil)

        // Lock-free prepend: retry until no other update slipped in between.
        @tailrec
        private def add(element: A): Unit = {
          val old = results.get
          if (!results.compareAndSet(old, element :: old)) {
            add(element)
          }
        }

        override final def apply(continue: ContinuationizedParallelIterable[A] => Unit): Unit = {
          for (element <- underline) {
            // Register the callback before starting it.
            super.incrementAndGet()
            reset {
              val pass = p(element)
              if (pass) {
                add(element)
              }
              if (super.decrementAndGet() == 0) {
                continue(new ContinuationizedParallelIterable(results.get))
              }
            }
          }
          // Release the guard taken by the initial value of 1.
          if (super.decrementAndGet() == 0) {
            continue(new ContinuationizedParallelIterable(results.get))
          }
        }
      })

  /**
   * Runs the suspendable callback for every element and resumes the caller
   * once all callbacks have finished.
   *
   * @tparam U result type of the callback; the result is discarded
   * @param f suspendable callback applied to each element
   */
  final def foreach[U](f: A => U @suspendable): Unit @suspendable =
    shift(
      new AtomicInteger(1) with ((Unit => Unit) => Unit) {
        override final def apply(continue: Unit => Unit): Unit = {
          for (element <- underline) {
            super.incrementAndGet()
            reset {
              f(element)
              if (super.decrementAndGet() == 0) {
                // Pass the unit value explicitly instead of relying on
                // argument adaptation of an empty argument list.
                continue(())
              }
            }
          }
          // Release the guard taken by the initial value of 1.
          if (super.decrementAndGet() == 0) {
            continue(())
          }
        }
      })

  /**
   * Transforms every element with the suspendable function and collects the
   * results in an array, each at the position of its source element.
   *
   * The counter follows the same guard scheme as `filter` and `foreach`
   * (start at 1, release after the loop) rather than starting at the
   * collection size; with the latter an empty collection would never resume
   * the caller.
   *
   * @tparam B result element type; a `Manifest` is required to allocate the array
   * @param f suspendable transformation applied to each element
   * @return a new wrapper around the transformed elements
   */
  final def map[B: Manifest](f: A => B @suspendable): ContinuationizedParallelIterable[B] @suspendable =
    shift(
      new AtomicInteger(1) with ((ContinuationizedParallelIterable[B] => Unit) => Unit) {
        override final def apply(continue: ContinuationizedParallelIterable[B] => Unit): Unit = {
          val results = new Array[B](underline.size)
          for ((element, i) <- underline.view.zipWithIndex) {
            super.incrementAndGet()
            reset {
              val result = f(element)
              results(i) = result
              if (super.decrementAndGet() == 0) {
                continue(new ContinuationizedParallelIterable(results))
              }
            }
          }
          // Release the guard; this is the only path that fires for an empty collection.
          if (super.decrementAndGet() == 0) {
            continue(new ContinuationizedParallelIterable(results))
          }
        }
      })
}
|