summaryrefslogtreecommitdiff
path: root/test/files/continuations-run/t5472.scala
blob: 060d267544f8496c7ba321407bca586ea800961c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import scala.annotation._
import scala.util.continuations._
import java.util.concurrent.atomic._

@deprecated("Suppress warnings", since="2.11")
object Test {
  def main(args: Array[String]) {
    val map = Map("foo" -> 1, "bar" -> 2)
    reset {
      val mapped =
        for {
          (location, accessors) <- new ContinuationizedParallelIterable(map)
        } yield {
          shiftUnit0[Int, Unit](23)
        }
      println(mapped.toList)
    }
  }

final class ContinuationizedParallelIterable[+A](protected val underline: Iterable[A]) {
  def toList = underline.toList.sortBy(_.toString)

  final def filter(p: A => Boolean @suspendable): ContinuationizedParallelIterable[A] @suspendable =
    shift(
      new AtomicInteger(1) with ((ContinuationizedParallelIterable[A] => Unit) => Unit) {
        private val results = new AtomicReference[List[A]](Nil)

        @tailrec
        private def add(element: A) {
          val old = results.get
          if (!results.compareAndSet(old, element :: old)) {
            add(element)
          }
        }

        override final def apply(continue: ContinuationizedParallelIterable[A] => Unit) {
          for (element <- underline) {
            super.incrementAndGet()
            reset {
              val pass = p(element)
              if (pass) {
                add(element)
              }
              if (super.decrementAndGet() == 0) {
                continue(new ContinuationizedParallelIterable(results.get))
              }
            }
          }
          if (super.decrementAndGet() == 0) {
            continue(new ContinuationizedParallelIterable(results.get))
          }
        }
      })

  final def foreach[U](f: A => U @suspendable): Unit @suspendable =
    shift(
      new AtomicInteger(1) with ((Unit => Unit) => Unit) {
        override final def apply(continue: Unit => Unit) {
          for (element <- underline) {
            super.incrementAndGet()
            reset {
              f(element)
              if (super.decrementAndGet() == 0) {
                continue()
              }
            }
          }
          if (super.decrementAndGet() == 0) {
            continue()
          }
        }
      })

  final def map[B: Manifest](f: A => B @suspendable): ContinuationizedParallelIterable[B] @suspendable =
    shift(
      new AtomicInteger(underline.size) with ((ContinuationizedParallelIterable[B] => Unit) => Unit) {
        override final def apply(continue: ContinuationizedParallelIterable[B] => Unit) {
          val results = new Array[B](super.get)
          for ((element, i) <- underline.view.zipWithIndex) {
            reset {
              val result = f(element)
              results(i) = result
              if (super.decrementAndGet() == 0) {
                continue(new ContinuationizedParallelIterable(results))
              }
            }
          }
        }
      })
}
}