From 264cef8f9677c59395166da9be0af0bfe83abfa5 Mon Sep 17 00:00:00 2001 From: Paul Phillips Date: Thu, 3 May 2012 12:05:58 -0700 Subject: Test cases for SI-5472, SI-5399, SI-5685. --- test/files/continuations-run/t5472.scala | 90 ++++++++++++++++++++++++++++++++ 1 file changed, 90 insertions(+) create mode 100644 test/files/continuations-run/t5472.scala (limited to 'test/files/continuations-run/t5472.scala') diff --git a/test/files/continuations-run/t5472.scala b/test/files/continuations-run/t5472.scala new file mode 100644 index 0000000000..3e3c76b32a --- /dev/null +++ b/test/files/continuations-run/t5472.scala @@ -0,0 +1,90 @@ +import scala.annotation._ +import scala.util.continuations._ +import java.util.concurrent.atomic._ + +object Test { + def main(args: Array[String]) { + val map = Map("foo" -> 1, "bar" -> 2) + reset { + val mapped = + for { + (location, accessors) <- new ContinuationizedParallelIterable(map) + } yield { + shiftUnit0[Int, Unit](23) + } + println(mapped.toList) + } + } +} + +final class ContinuationizedParallelIterable[+A](protected val underline: Iterable[A]) { + def toList = underline.toList.sortBy(_.toString) + + final def filter(p: A => Boolean @suspendable): ContinuationizedParallelIterable[A] @suspendable = + shift( + new AtomicInteger(1) with ((ContinuationizedParallelIterable[A] => Unit) => Unit) { + private val results = new AtomicReference[List[A]](Nil) + + @tailrec + private def add(element: A) { + val old = results.get + if (!results.compareAndSet(old, element :: old)) { + add(element) + } + } + + override final def apply(continue: ContinuationizedParallelIterable[A] => Unit) { + for (element <- underline) { + super.incrementAndGet() + reset { + val pass = p(element) + if (pass) { + add(element) + } + if (super.decrementAndGet() == 0) { + continue(new ContinuationizedParallelIterable(results.get)) + } + } + } + if (super.decrementAndGet() == 0) { + continue(new ContinuationizedParallelIterable(results.get)) + } + } + }) + + final def foreach[U](f: A => U @suspendable): Unit @suspendable = + shift( + new AtomicInteger(1) with ((Unit => Unit) => Unit) { + override final def apply(continue: Unit => Unit) { + for (element <- underline) { + super.incrementAndGet() + reset { + f(element) + if (super.decrementAndGet() == 0) { + continue() + } + } + } + if (super.decrementAndGet() == 0) { + continue() + } + } + }) + + final def map[B: Manifest](f: A => B @suspendable): ContinuationizedParallelIterable[B] @suspendable = + shift( + new AtomicInteger(underline.size) with ((ContinuationizedParallelIterable[B] => Unit) => Unit) { + override final def apply(continue: ContinuationizedParallelIterable[B] => Unit) { + val results = new Array[B](super.get) + for ((element, i) <- underline.view zipWithIndex) { + reset { + val result = f(element) + results(i) = result + if (super.decrementAndGet() == 0) { + continue(new ContinuationizedParallelIterable(results)) + } + } + } + } + }) +} -- cgit v1.2.3