From 5ccc928f7ec2e79c793ee1b31ca8e91321688749 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 7 Dec 2011 16:53:42 +0100 Subject: Add tests for future callbacks --- test/files/jvm/concurrent-future.check | 16 +++++ test/files/jvm/concurrent-future.scala | 118 +++++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+) create mode 100644 test/files/jvm/concurrent-future.check create mode 100644 test/files/jvm/concurrent-future.scala (limited to 'test/files/jvm') diff --git a/test/files/jvm/concurrent-future.check b/test/files/jvm/concurrent-future.check new file mode 100644 index 0000000000..c55e824818 --- /dev/null +++ b/test/files/jvm/concurrent-future.check @@ -0,0 +1,16 @@ +test1: hai world +test1: kthxbye +test2: hai world +test2: awsum thx +test2: kthxbye +test3: hai world +test4: hai world +test4: kthxbye +test5: hai world +test5: kthxbye +test6: hai world +test6: kthxbye +test7: hai world +test7: kthxbye +test8: hai world +test8: im in yr loop diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala new file mode 100644 index 0000000000..9c2f04fb07 --- /dev/null +++ b/test/files/jvm/concurrent-future.scala @@ -0,0 +1,118 @@ +import scala.concurrent.{ executionContext, FutureTimeoutException, ExecutionException, SyncVar } +import executionContext._ + +object Test extends App { + + def once(body: (() => Unit) => Unit) { + val sv = new SyncVar[Boolean] + body(() => sv put true) + sv.take() + } + + def output(num: Int, msg: String) { + println("test" + num + ": " + msg) + } + + def testOnSuccess(): Unit = once { + done => + val f = future { + output(1, "hai world") + } + f onSuccess { _ => + output(1, "kthxbye") + done() + } + } + + def testOnSuccessWhenCompleted(): Unit = once { + done => + val f = future { + output(2, "hai world") + } + f onSuccess { _ => + output(2, "awsum thx") + f onSuccess { _ => + output(2, "kthxbye") + done() + } + } + } + + def testOnSuccessWhenFailed(): Unit = once { + done => + val f = future { + output(3, "hai world") + done() + throw new Exception + } + f onSuccess { _ => + output(3, "onoes") + } + } + + def testOnFailure(): Unit = once { + done => + val f = future { + output(4, "hai world") + throw new Exception + } + f onSuccess { _ => + output(4, "onoes") + done() + } + f onFailure { _ => + output(4, "kthxbye") + done() + } + } + + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { + done => + val f = future { + output(num, "hai world") + throw cause + } + f onSuccess { _ => + output(num, "onoes") + done() + } + f onFailure { + case e: ExecutionException if (e.getCause == cause) => + output(num, "kthxbye") + done() + case _ => + output(num, "onoes") + done() + } + } + + def testOnFailureWhenFutureTimeoutException(): Unit = once { + done => + val f = future { + output(8, "hai world") + throw new FutureTimeoutException(null) + } + f onSuccess { _ => + output(8, "onoes") + done() + } + f onFailure { + case e: FutureTimeoutException => + output(8, "im in yr loop") + done() + case other => + output(8, "onoes: " + other) + done() + } + } + + testOnSuccess() + testOnSuccessWhenCompleted() + testOnSuccessWhenFailed() + testOnFailure() + testOnFailureWhenSpecialThrowable(5, new Error) + testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) + testOnFailureWhenSpecialThrowable(7, new InterruptedException) + testOnFailureWhenFutureTimeoutException() + +} -- cgit v1.2.3 From 6f5338c2aa423a5d2aa49f6fbdbe6ae3f3724a48 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Fri, 9 Dec 2011 16:18:38 +0100 Subject: Fix future tests --- test/files/jvm/concurrent-future.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala index 9c2f04fb07..30f604d5c1 100644 --- a/test/files/jvm/concurrent-future.scala +++ b/test/files/jvm/concurrent-future.scala @@ -40,7 +40,7 @@ object Test extends App { def testOnSuccessWhenFailed(): Unit = once { done => - val f = future { + val f = future[Unit] { output(3, "hai world") done() throw new Exception @@ -52,7 +52,7 @@ object Test extends App { def testOnFailure(): Unit = once { done => - val f = future { + val f = future[Unit] { output(4, "hai world") throw new Exception } @@ -60,7 +60,7 @@ object Test extends App { output(4, "onoes") done() } - f onFailure { _ => + f onFailure { case _ => output(4, "kthxbye") done() } @@ -68,7 +68,7 @@ object Test extends App { def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { done => - val f = future { + val f = future[Unit] { output(num, "hai world") throw cause } @@ -88,7 +88,7 @@ object Test extends App { def testOnFailureWhenFutureTimeoutException(): Unit = once { done => - val f = future { + val f = future[Unit] { output(8, "hai world") throw new FutureTimeoutException(null) } -- cgit v1.2.3 From 7021aef3fd8a20c8f730af36f229e7bb2cfe8fb5 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Mon, 12 Dec 2011 18:15:41 +0100 Subject: Syntax changes for the scala.concurrent package and some cleanup. block on { } is now changed to: block(timeout) { } --- .../scala/concurrent/ExecutionContext.scala | 4 +- src/library/scala/concurrent/Future.scala | 19 +++----- .../scala/concurrent/default/TaskImpl.scala | 10 +++- src/library/scala/concurrent/package.scala | 56 +++++++++++----------- test/files/jvm/concurrent-future.scala | 15 ++++-- 5 files changed, 55 insertions(+), 49 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 207e190e95..5d802d71b2 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -12,6 +12,8 @@ trait ExecutionContext { def execute(runnable: Runnable): Unit + def execute[U](body: () => U): Unit + def promise[T]: Promise[T] def future[T](body: Callable[T]): Future[T] = future(body.call()) @@ -19,7 +21,7 @@ trait ExecutionContext { def future[T](body: => T): Future[T] /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](body: Blockable[T]): T + def blockingCall[T](timeout: Timeout, body: Blockable[T]): T } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index dc2a92ebd4..1f44b50018 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -26,11 +26,6 @@ import scala.collection.generic.CanBuildFrom /** The trait that represents futures. * - * @define futureTimeout - * The timeout of the future is: - * - if this future was obtained from a task (i.e. by calling `task.future`), the timeout associated with that task - * - if this future was obtained from a promise (i.e. by calling `promise.future`), the timeout associated with that promise - * * @define multipleCallbacks * Multiple callbacks may be registered; there is no guarantee that they will be * executed in a particular order. @@ -75,8 +70,6 @@ self => * If the future has already been completed with a value, * this will either be applied immediately or be scheduled asynchronously. * - * Will not be called in case of an exception (this includes the FutureTimeoutException). - * * $multipleCallbacks */ def onSuccess[U](func: T => U): this.type = onComplete { @@ -94,15 +87,14 @@ self => * * Will not be called in case that the future is completed with a value. * - * Will be called if the future is completed with a FutureTimeoutException. - * * $multipleCallbacks */ def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete { - case Left(t) if t.isInstanceOf[FutureTimeoutException] || isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t) + case Left(t) if isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t) case Right(v) => // do nothing } + /* To be removed /** When this future times out, apply the provided function. * * If the future has already timed out, @@ -114,6 +106,7 @@ self => case Left(te: FutureTimeoutException) => callback(te) case Right(v) => // do nothing } + */ /** When this future is completed, either through an exception, a timeout, or a value, * apply the provided function. @@ -136,11 +129,13 @@ self => */ def newPromise[S]: Promise[S] = executionContext promise + /* /** Tests whether this `Future`'s timeout has expired. * * $futureTimeout */ def isTimedout: Boolean + */ /* Projections */ @@ -166,7 +161,6 @@ self => } this } - def isTimedout = self.isTimedout def block()(implicit canblock: CanBlock) = try { val res = self.block() throw noSuchElem(res) @@ -177,6 +171,7 @@ self => new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) } + /* /** A timed out projection of this future. * * The timed out projection is a future holding a value of type `FutureTimeoutException`. @@ -215,7 +210,7 @@ self => private def noSuchElemThrowable(v: Throwable) = new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v) } - + */ /* Monadic operations */ diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index ca88735266..b4c08ba710 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -244,6 +244,11 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { executeTask(action) } + def execute[U](body: () => U) { + val action = new RecursiveAction { def compute() { body() } } + executeTask(action) + } + def task[T](body: => T): Task[T] = { new TaskImpl(this, body) } @@ -255,9 +260,10 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { } def promise[T]: Promise[T] = - null + null // TODO - def blockingCall[T](b: Blockable[T]): T = b match { + // TODO fix the timeout + def blockingCall[T](timeout: Timeout, b: Blockable[T]): T = b match { case fj: TaskImpl[_] if fj.executionContext.pool eq pool => fj.block() case _ => diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index fed7f7caf8..dbe2f90f18 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -12,6 +12,9 @@ package scala +import scala.util.{ Timeout, Duration } + + /** This package object contains primitives for parallel programming. */ @@ -42,43 +45,38 @@ package object concurrent { /* concurrency constructs */ - def future[T](body: =>T): Future[T] = - executionContext future body + def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = + execCtx future body - def promise[T]: Promise[T] = - executionContext promise + def promise[T](implicit execCtx: ExecutionContext = executionContext): Promise[T] = + execCtx promise - /** The keyword used to block on a piece of code which potentially blocks. + /** Used to block on a piece of code which potentially blocks. * - * @define mayThrow + * @param body A piece of code which contains potentially blocking or long running calls. + * + * Calling this method may throw the following exceptions: + * - CancellationException - if the computation was cancelled + * - InterruptedException - in the case that a wait within the blockable object was interrupted + * - TimeoutException - in the case that the blockable object timed out + */ + def block[T](timeout: Timeout)(body: =>T): T = block(timeout, new Blockable[T] { + def block()(implicit cb: CanBlock) = body + }) + + /** Blocks on a blockable object. + * + * @param blockable An object with a `block` method which runs potentially blocking or long running calls. + * * Calling this method may throw the following exceptions: * - CancellationException - if the computation was cancelled * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - object block { - - /** Blocks on a piece of code. - * - * @param body A piece of code which contains potentially blocking or long running calls. - * - * $mayThrow - */ - def on[T](body: =>T): T = on(new Blockable[T] { - def block()(implicit cb: CanBlock) = body - }) - - /** Blocks on a blockable object. - * - * @param blockable An object with a `block` method which runs potentially blocking or long running calls. - * - * $mayThrow - */ - def on[T](blockable: Blockable[T]): T = { - currentExecutionContext.get match { - case null => blockable.block()(null) // outside - case x => x.blockingCall(blockable) // inside an execution context thread - } + def block[T](timeout: Timeout, blockable: Blockable[T]): T = { + currentExecutionContext.get match { + case null => blockable.block()(null) // outside - TODO - fix timeout case + case x => x.blockingCall(timeout, blockable) // inside an execution context thread } } diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala index 9c2f04fb07..8fb237eb0a 100644 --- a/test/files/jvm/concurrent-future.scala +++ b/test/files/jvm/concurrent-future.scala @@ -1,5 +1,9 @@ -import scala.concurrent.{ executionContext, FutureTimeoutException, ExecutionException, SyncVar } -import executionContext._ + + + +import scala.concurrent._ + + object Test extends App { @@ -60,9 +64,10 @@ object Test extends App { output(4, "onoes") done() } - f onFailure { _ => - output(4, "kthxbye") - done() + f onFailure { + case _ => + output(4, "kthxbye") + done() } } -- cgit v1.2.3 From 72424e397135bb9701e2fd4e36b2184b03586bd5 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Mon, 12 Dec 2011 18:57:33 +0100 Subject: Adding the tck test file. --- test/files/jvm/scala-concurrent-tck.scala | 166 ++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) create mode 100644 test/files/jvm/scala-concurrent-tck.scala (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala new file mode 100644 index 0000000000..e1b8d2763c --- /dev/null +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -0,0 +1,166 @@ + + + +import scala.concurrent.{ + Future, + TimeoutException, + SyncVar, + ExecutionException +} + + + +trait TestBase { + + def future[T](body: =>T): Future[T] + + def once(body: (() => Unit) => Unit) { + val sv = new SyncVar[Boolean] + body(() => sv put true) + sv.take() + } + +} + + +trait FutureCallbacks extends TestBase { + + def testOnSuccess(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { _ => + done() + assert(x == 1) + } + } + + def testOnSuccessWhenCompleted(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { _ => + assert(x == 1) + x = 2 + f onSuccess { _ => + assert(x == 2) + done() + } + } + } + + def testOnSuccessWhenFailed(): Unit = once { + done => + val f = future[Unit] { + done() + throw new Exception + } + f onSuccess { _ => + assert(false) + } + } + + def testOnFailure(): Unit = once { + done => + var x = 0 + val f = future[Unit] { + x = 1 + throw new Exception + } + f onSuccess { _ => + done() + assert(false) + } + f onFailure { + case _ => + done() + assert(x == 1) + } + } + + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { + done => + val f = future[Unit] { + throw cause + } + f onSuccess { _ => + done() + assert(false) + } + f onFailure { + case e: ExecutionException if (e.getCause == cause) => + done() + case _ => + done() + assert(false) + } + } + + def testOnFailureWhenTimeoutException(): Unit = once { + done => + val f = future[Unit] { + throw new TimeoutException() + } + f onSuccess { _ => + done() + assert(false) + } + f onFailure { + case e: TimeoutException => + done() + case other => + done() + assert(false) + } + } + + testOnSuccess() + testOnSuccessWhenCompleted() + testOnSuccessWhenFailed() + testOnFailure() + testOnFailureWhenSpecialThrowable(5, new Error) + testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) + testOnFailureWhenSpecialThrowable(7, new InterruptedException) + testOnFailureWhenTimeoutException() + +} + + +trait FutureCombinators extends TestBase { + +} + + +trait FutureProjections extends TestBase { + +} + + +trait Promises extends TestBase { + +} + + +trait Exceptions extends TestBase { + +} + + +object Test +extends App +with FutureCallbacks +with FutureCombinators +with FutureProjections +with Promises +with Exceptions +{ + + def future[T](body: =>T) = scala.concurrent.future(body) + +} + + -- cgit v1.2.3 From 5fcd1e067767b56ae323b0a37b169bf4e0a937b6 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 09:03:30 +0100 Subject: Fix default.ExecutionContextImpl.promise. Add promise test. --- .../scala/concurrent/default/TaskImpl.scala | 6 +++--- test/files/jvm/scala-concurrent-tck.scala | 25 +++++++++++++++++++++- 2 files changed, 27 insertions(+), 4 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index b4c08ba710..ea465225c8 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -91,7 +91,7 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) * * $promiseCompletion */ - def fulfill(value: T): Unit = { + def success(value: T): Unit = { val cbs = tryCompleteState(Success(value)) if (cbs == null) throw new IllegalStateException @@ -109,7 +109,7 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) * * $promiseCompletion */ - def break(t: Throwable): Unit = { + def failure(t: Throwable): Unit = { val wrapped = wrap(t) val cbs = tryCompleteState(Failure(wrapped)) if (cbs == null) @@ -260,7 +260,7 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { } def promise[T]: Promise[T] = - null // TODO + new PromiseImpl[T](this) // TODO fix the timeout def blockingCall[T](timeout: Timeout, b: Blockable[T]): T = b match { diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index e1b8d2763c..705675ae43 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -3,6 +3,7 @@ import scala.concurrent.{ Future, + Promise, TimeoutException, SyncVar, ExecutionException @@ -14,6 +15,8 @@ trait TestBase { def future[T](body: =>T): Future[T] + def promise[T]: Promise[T] + def once(body: (() => Unit) => Unit) { val sv = new SyncVar[Boolean] body(() => sv put true) @@ -141,7 +144,25 @@ trait FutureProjections extends TestBase { trait Promises extends TestBase { - + + def testSuccess(): Unit = once { + done => + val p = promise[Int] + val f = p.future + + f.onSuccess { x => + done() + assert(x == 5) + } onFailure { case any => + done() + assert(false) + } + + p.success(5) + } + + testSuccess() + } @@ -161,6 +182,8 @@ with Exceptions def future[T](body: =>T) = scala.concurrent.future(body) + def promise[T] = scala.concurrent.promise[T] + } -- cgit v1.2.3 From d9981460533949182691e2d83079af364d2dfcde Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 09:14:39 +0100 Subject: Add stubs for future combinator tests. --- test/files/jvm/scala-concurrent-tck.scala | 68 ++++++++++++++++++++++++++++++- 1 file changed, 67 insertions(+), 1 deletion(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 705675ae43..6e291c396b 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -134,7 +134,73 @@ trait FutureCallbacks extends TestBase { trait FutureCombinators extends TestBase { - + + // map: stub + def testMapSuccess(): Unit = once { + done => + done() + } + + def testMapFailure(): Unit = once { + done => + done() + } + + // flatMap: stub + def testFlatMapSuccess(): Unit = once { + done => + done() + } + + def testFlatMapFailure(): Unit = once { + done => + done() + } + + // filter: stub + def testFilterSuccess(): Unit = once { + done => + done() + } + + def testFilterFailure(): Unit = once { + done => + done() + } + + // foreach: stub + def testForeachSuccess(): Unit = once { + done => + done() + } + + def testForeachFailure(): Unit = once { + done => + done() + } + + // recover: stub + def testRecoverSuccess(): Unit = once { + done => + done() + } + + def testRecoverFailure(): Unit = once { + done => + done() + } + + testMapSuccess() + testMapFailure() + testFlatMapSuccess() + testFlatMapFailure() + testFilterSuccess() + testFilterFailure() + testForeachSuccess() + testForeachFailure() + testRecoverSuccess() + testRecoverFailure() + } -- cgit v1.2.3 From 65b27aa3ab58b39a3b68999f4c6b9d3af7a81f85 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Tue, 13 Dec 2011 13:31:06 +0100 Subject: Rename block->await. Add more tck test cases. --- src/library/scala/concurrent/Awaitable.scala | 25 ++++++ src/library/scala/concurrent/Blockable.scala | 24 ------ .../scala/concurrent/ExecutionContext.scala | 2 +- src/library/scala/concurrent/Future.scala | 6 +- .../scala/concurrent/default/TaskImpl.scala | 12 +-- src/library/scala/concurrent/package.scala | 14 ++-- test/files/jvm/scala-concurrent-tck.scala | 95 ++++++++++++++++++++-- 7 files changed, 129 insertions(+), 49 deletions(-) create mode 100644 src/library/scala/concurrent/Awaitable.scala delete mode 100644 src/library/scala/concurrent/Blockable.scala (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala new file mode 100644 index 0000000000..85546718d2 --- /dev/null +++ b/src/library/scala/concurrent/Awaitable.scala @@ -0,0 +1,25 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import scala.annotation.implicitNotFound +import scala.util.Timeout + + + +trait Awaitable[+T] { + @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.") + def await(timeout: Timeout)(implicit canblock: CanBlock): T +} + + + + diff --git a/src/library/scala/concurrent/Blockable.scala b/src/library/scala/concurrent/Blockable.scala deleted file mode 100644 index d5c45a9e4e..0000000000 --- a/src/library/scala/concurrent/Blockable.scala +++ /dev/null @@ -1,24 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - - - -import scala.annotation.implicitNotFound - - - -trait Blockable[+T] { - @implicitNotFound(msg = "Blocking must be done by calling `block on b`, where `b` is the Blockable object.") - def block()(implicit canblock: CanBlock): T -} - - - - diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 77d805b19c..b7b3e901e6 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -23,7 +23,7 @@ trait ExecutionContext { def future[T](body: => T): Future[T] /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](timeout: Timeout, body: Blockable[T]): T + def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index f653a8a47d..9937d43b23 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -59,7 +59,7 @@ import scala.collection.generic.CanBuildFrom * f flatMap { (x: Int) => g map { (y: Int) => x + y } } * }}} */ -trait Future[+T] extends Blockable[T] { +trait Future[+T] extends Awaitable[T] { self => /* Callbacks */ @@ -161,8 +161,8 @@ self => } this } - def block()(implicit canblock: CanBlock) = try { - val res = self.block() + def await(timeout: Timeout)(implicit canblock: CanBlock) = try { + val res = self.await(timeout) throw noSuchElem(res) } catch { case t: Throwable => t diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index ea465225c8..dac6400b45 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -122,7 +122,7 @@ private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) } } - def block()(implicit canblock: scala.concurrent.CanBlock): T = getState match { + def await(timeout: Timeout)(implicit canblock: scala.concurrent.CanBlock): T = getState match { case Success(res) => res case Failure(t) => throw t case _ => @@ -196,8 +196,8 @@ private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) def tryCancel(): Unit = tryUnfork() - def block()(implicit canblock: CanBlock): T = { - join() + def await(timeout: Timeout)(implicit canblock: CanBlock): T = { + join() // TODO handle timeout also (updater.get(this): @unchecked) match { case Success(r) => r case Failure(t) => throw t @@ -263,16 +263,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { new PromiseImpl[T](this) // TODO fix the timeout - def blockingCall[T](timeout: Timeout, b: Blockable[T]): T = b match { + def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match { case fj: TaskImpl[_] if fj.executionContext.pool eq pool => - fj.block() + fj.await(timeout) case _ => var res: T = null.asInstanceOf[T] @volatile var blockingDone = false // TODO add exception handling here! val mb = new ForkJoinPool.ManagedBlocker { def block() = { - res = b.block()(CanBlockEvidence) + res = b.await(timeout)(CanBlockEvidence) blockingDone = true true } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index d93d5b04ba..d9923d6d56 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -59,7 +59,7 @@ package object concurrent { execCtx promise /** Used to block on a piece of code which potentially blocks. - * + * * @param body A piece of code which contains potentially blocking or long running calls. * * Calling this method may throw the following exceptions: @@ -67,23 +67,23 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def block[T](timeout: Timeout)(body: =>T): T = block(timeout, new Blockable[T] { - def block()(implicit cb: CanBlock) = body + def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] { + def await(timeout: Timeout)(implicit cb: CanBlock) = body }) /** Blocks on a blockable object. * - * @param blockable An object with a `block` method which runs potentially blocking or long running calls. + * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. * * Calling this method may throw the following exceptions: * - CancellationException - if the computation was cancelled * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def block[T](timeout: Timeout, blockable: Blockable[T]): T = { + def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = { currentExecutionContext.get match { - case null => blockable.block()(null) // outside - TODO - fix timeout case - case x => x.blockingCall(timeout, blockable) // inside an execution context thread + case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(timeout, awaitable) // inside an execution context thread } } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 6e291c396b..774d4236b7 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -8,15 +8,14 @@ import scala.concurrent.{ SyncVar, ExecutionException } +import scala.concurrent.future +import scala.concurrent.promise +import scala.concurrent.await trait TestBase { - def future[T](body: =>T): Future[T] - - def promise[T]: Promise[T] - def once(body: (() => Unit) => Unit) { val sv = new SyncVar[Boolean] body(() => sv put true) @@ -206,6 +205,90 @@ trait FutureCombinators extends TestBase { trait FutureProjections extends TestBase { + def testFailedFailureOnComplete(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onComplete { + case Right(t) => + assert(t == cause) + done() + case Left(t) => + assert(false) + } + } + + def testFailedFailureOnSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onSuccess { + t => + assert(t == cause) + done() + } + } + + def testFailedSuccessOnComplete(): Unit = once { + done => + val f = future { 0 } + f.failed onComplete { + case Right(t) => + assert(false) + case Left(t) => + assert(t.isInstanceOf[NoSuchElementException]) + done() + } + } + + def testFailedSuccessOnFailure(): Unit = once { + done => + val f = future { 0 } + f.failed onFailure { + case nsee: NoSuchElementException => + done() + } + } + + def testFailedFailureAwait(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + assert(await(0, f.failed) == cause) + done() + } + + def testFailedSuccessAwait(): Unit = once { + done => + val f = future { 0 } + try { + println(await(0, f.failed)) + assert(false) + } catch { + case nsee: NoSuchElementException => done() + } + } + + testFailedFailureOnComplete() + testFailedFailureOnSuccess() + testFailedSuccessOnComplete() + testFailedSuccessOnFailure() + testFailedFailureAwait() + //testFailedSuccessAwait() + +} + + +trait Blocking extends TestBase { + + // TODO + } @@ -246,10 +329,6 @@ with Promises with Exceptions { - def future[T](body: =>T) = scala.concurrent.future(body) - - def promise[T] = scala.concurrent.promise[T] - } -- cgit v1.2.3 From c3477895c08397234a4a103911a4b55517a440b6 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Tue, 13 Dec 2011 15:26:12 +0100 Subject: Add test cases for blocking. Fix in the failed projection. --- src/library/scala/concurrent/Future.scala | 14 +++++++++----- src/library/scala/concurrent/package.scala | 2 +- test/files/jvm/scala-concurrent-tck.scala | 30 +++++++++++++++++++++++++++--- 3 files changed, 37 insertions(+), 9 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 9937d43b23..36126056c9 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -161,11 +161,15 @@ self => } this } - def await(timeout: Timeout)(implicit canblock: CanBlock) = try { - val res = self.await(timeout) - throw noSuchElem(res) - } catch { - case t: Throwable => t + def await(timeout: Timeout)(implicit canblock: CanBlock): Throwable = { + var t: Throwable = null + try { + val res = self.await(timeout) + t = noSuchElem(res) + } catch { + case t: Throwable => return t + } + throw t } private def noSuchElem(v: T) = new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index d9923d6d56..0cdb52fb69 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -33,7 +33,7 @@ package object concurrent { */ lazy val scheduler = new default.SchedulerImpl - + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { override protected def initialValue = null } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 774d4236b7..ffe23de756 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -268,7 +268,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - println(await(0, f.failed)) + await(0, f.failed) assert(false) } catch { case nsee: NoSuchElementException => done() @@ -280,14 +280,38 @@ trait FutureProjections extends TestBase { testFailedSuccessOnComplete() testFailedSuccessOnFailure() testFailedFailureAwait() - //testFailedSuccessAwait() + testFailedSuccessAwait() } trait Blocking extends TestBase { - // TODO + def testAwaitSuccess(): Unit = once { + done => + val f = future { 0 } + await(0, f) + done() + } + + def testAwaitFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + try { + await(0, f) + assert(false) + } catch { + case t => + assert(t == cause) + done() + } + } + + testAwaitSuccess() + testAwaitFailure() } -- cgit v1.2.3 From 24a681ccfa11447ded29653c115b223ba0d5f253 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 15:53:35 +0100 Subject: Add Akka version of Futures TCK. --- test/files/jvm/scala-concurrent-tck-akka.scala | 338 +++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100644 test/files/jvm/scala-concurrent-tck-akka.scala (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck-akka.scala b/test/files/jvm/scala-concurrent-tck-akka.scala new file mode 100644 index 0000000000..9fd709d05e --- /dev/null +++ b/test/files/jvm/scala-concurrent-tck-akka.scala @@ -0,0 +1,338 @@ + + +import akka.dispatch.{ + Future => future, + Promise => promise +} + +import scala.concurrent.{ + TimeoutException, + SyncVar, + ExecutionException +} +//import scala.concurrent.future +//import scala.concurrent.promise +//import scala.concurrent.await + + + +trait TestBase { + + implicit val disp = akka.actor.ActorSystem().dispatcher + + def once(body: (() => Unit) => Unit) { + val sv = new SyncVar[Boolean] + body(() => sv put true) + sv.take() + } + +} + + +trait FutureCallbacks extends TestBase { + + def testOnSuccess(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { case any => + done() + assert(x == 1) + } + } + + def testOnSuccessWhenCompleted(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { case any => + assert(x == 1) + x = 2 + f onSuccess { case any => + assert(x == 2) + done() + } + } + } + + def testOnSuccessWhenFailed(): Unit = once { + done => + val f = future[Unit] { + done() + throw new Exception + } + f onSuccess { case any => + assert(false) + } + } + + def testOnFailure(): Unit = once { + done => + var x = 0 + val f = future[Unit] { + x = 1 + throw new Exception + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case _ => + done() + assert(x == 1) + } + } + + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { + done => + val f = future[Unit] { + throw cause + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case e: ExecutionException if (e.getCause == cause) => + done() + case _ => + done() + assert(false) + } + } + + def testOnFailureWhenTimeoutException(): Unit = once { + done => + val f = future[Unit] { + throw new TimeoutException() + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case e: TimeoutException => + done() + case other => + done() + assert(false) + } + } + + testOnSuccess() + testOnSuccessWhenCompleted() + testOnSuccessWhenFailed() + testOnFailure() + testOnFailureWhenSpecialThrowable(5, new Error) + testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) + testOnFailureWhenSpecialThrowable(7, new InterruptedException) + testOnFailureWhenTimeoutException() + +} + + +trait FutureCombinators extends TestBase { + + // map: stub + def testMapSuccess(): Unit = once { + done => + done() + } + + def testMapFailure(): Unit = once { + done => + done() + } + + // flatMap: stub + def testFlatMapSuccess(): Unit = once { + done => + done() + } + + def testFlatMapFailure(): Unit = once { + done => + done() + } + + // filter: stub + def testFilterSuccess(): Unit = once { + done => + done() + } + + def testFilterFailure(): Unit = once { + done => + done() + } + + // foreach: stub + def testForeachSuccess(): Unit = once { + done => + done() + } + + def testForeachFailure(): Unit = once { + done => + done() + } + + // recover: stub + def testRecoverSuccess(): Unit = once { + done => + done() + } + + def testRecoverFailure(): Unit = once { + done => + done() + } + + testMapSuccess() + testMapFailure() + testFlatMapSuccess() + testFlatMapFailure() + testFilterSuccess() + testFilterFailure() + testForeachSuccess() + testForeachFailure() + testRecoverSuccess() + testRecoverFailure() + +} + +/* +trait FutureProjections extends TestBase { + + def testFailedFailureOnComplete(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onComplete { + case Right(t) => + assert(t == cause) + done() + case Left(t) => + assert(false) + } + } + + def testFailedFailureOnSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onSuccess { + t => + assert(t == cause) + done() + } + } + + def testFailedSuccessOnComplete(): Unit = once { + done => + val f = future { 0 } + f.failed onComplete { + case Right(t) => + assert(false) + case Left(t) => + assert(t.isInstanceOf[NoSuchElementException]) + done() + } + } + + def testFailedSuccessOnFailure(): Unit = once { + done => + val f = future { 0 } + f.failed onFailure { + case nsee: NoSuchElementException => + done() + } + } + + def testFailedFailureAwait(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + assert(await(0, f.failed) == cause) + done() + } + + def testFailedSuccessAwait(): Unit = once { + done => + val f = future { 0 } + try { + println(await(0, f.failed)) + assert(false) + } catch { + case nsee: NoSuchElementException => done() + } + } + + testFailedFailureOnComplete() + testFailedFailureOnSuccess() + testFailedSuccessOnComplete() + testFailedSuccessOnFailure() + testFailedFailureAwait() + //testFailedSuccessAwait() + +} +*/ + +trait Blocking extends TestBase { + + // TODO + +} + +/* +trait Promises extends TestBase { + + def testSuccess(): Unit = once { + done => + val p = promise[Int]() + val f = p.future + + f.onSuccess { x => + done() + assert(x == 5) + } onFailure { case any => + done() + assert(false) + } + + p.success(5) + } + + testSuccess() + +} +*/ + +trait Exceptions extends TestBase { + +} + + +object Test +extends App +with FutureCallbacks +with FutureCombinators +/*with FutureProjections*/ +/*with Promises*/ +with Exceptions +{ + +} + + -- cgit v1.2.3 From a458396b461669129d28f45e92265560a584619b Mon Sep 17 00:00:00 2001 From: aleksandar Date: Tue, 13 Dec 2011 16:53:33 +0100 Subject: Change promise method signature. --- src/library/scala/concurrent/package.scala | 2 +- test/files/jvm/scala-concurrent-tck.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 0cdb52fb69..33e1b65993 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -55,7 +55,7 @@ package object concurrent { def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = execCtx future body - def promise[T](implicit execCtx: ExecutionContext = executionContext): Promise[T] = + def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] = execCtx promise /** Used to block on a piece of code which potentially blocks. diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index ffe23de756..7d73e6cf7b 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -320,7 +320,7 @@ trait Promises extends TestBase { def testSuccess(): Unit = once { done => - val p = promise[Int] + val p = promise[Int]() val f = p.future f.onSuccess { x => -- cgit v1.2.3 From 07aa03f9425afa4c91fdfc51070f3162f876abd1 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 18:24:31 +0100 Subject: Add tests for blocking/await to Akka version of the TCK --- test/files/jvm/scala-concurrent-tck-akka.scala | 43 ++++++++++++++++++++++---- 1 file changed, 37 insertions(+), 6 deletions(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck-akka.scala b/test/files/jvm/scala-concurrent-tck-akka.scala index 9fd709d05e..1ef248bda3 100644 --- a/test/files/jvm/scala-concurrent-tck-akka.scala +++ b/test/files/jvm/scala-concurrent-tck-akka.scala @@ -4,6 +4,12 @@ import akka.dispatch.{ Future => future, Promise => promise } +import akka.dispatch.Await.{result => await} + +// Duration required for await +import akka.util.Duration +import java.util.concurrent.TimeUnit +import TimeUnit._ import scala.concurrent.{ TimeoutException, @@ -128,10 +134,10 @@ trait FutureCallbacks extends TestBase { testOnSuccessWhenCompleted() testOnSuccessWhenFailed() testOnFailure() - testOnFailureWhenSpecialThrowable(5, new Error) - testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) - testOnFailureWhenSpecialThrowable(7, new InterruptedException) - testOnFailureWhenTimeoutException() +// testOnFailureWhenSpecialThrowable(5, new Error) +// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) +// testOnFailureWhenSpecialThrowable(7, new InterruptedException) +// testOnFailureWhenTimeoutException() } @@ -291,7 +297,31 @@ trait FutureProjections extends TestBase { trait Blocking extends TestBase { - // TODO + def testAwaitSuccess(): Unit = once { + done => + val f = future { 0 } + await(f, Duration(500, "ms")) + done() + } + + def testAwaitFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + try { + await(f, Duration(500, "ms")) + assert(false) + } catch { + case t => + assert(t == cause) + done() + } + } + + testAwaitSuccess() + testAwaitFailure() } @@ -330,9 +360,10 @@ with FutureCallbacks with FutureCombinators /*with FutureProjections*/ /*with Promises*/ +with Blocking with Exceptions { - + System.exit(0) } -- cgit v1.2.3 From 2d3731c69b69e6a3c2d68c0b18ffa803709d36cd Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 18:26:54 +0100 Subject: Force output of exception in assert. Add tests for recover. --- test/files/jvm/scala-concurrent-tck.scala | 35 +++++++++++++++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 7d73e6cf7b..bf3fa57da9 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -22,6 +22,14 @@ trait TestBase { sv.take() } + def assert(cond: => Boolean) { + try { + Predef.assert(cond) + } catch { + case e => e.printStackTrace() + } + } + } @@ -181,12 +189,35 @@ trait FutureCombinators extends TestBase { // recover: stub def testRecoverSuccess(): Unit = once { done => - done() + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case re: RuntimeException => + "recovered" + } onSuccess { x => + done() + assert(x == "recovered") + } onFailure { case any => + done() + assert(false) + } } def testRecoverFailure(): Unit = once { done => - done() + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case te: TimeoutException => "timeout" + } onSuccess { x => + done() + assert(false) + } onFailure { case any => + done() + assert(any == cause) + } } testMapSuccess() -- cgit v1.2.3 From 500f451ecef9d64814025b260ae1ab7adc541d75 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 18:33:12 +0100 Subject: Add test for recover to Akka TCK --- test/files/jvm/scala-concurrent-tck-akka.scala | 28 +++++++++++++++++++++++--- test/files/jvm/scala-concurrent-tck.scala | 1 - 2 files changed, 25 insertions(+), 4 deletions(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck-akka.scala b/test/files/jvm/scala-concurrent-tck-akka.scala index 1ef248bda3..dfd906e59e 100644 --- a/test/files/jvm/scala-concurrent-tck-akka.scala +++ b/test/files/jvm/scala-concurrent-tck-akka.scala @@ -188,15 +188,37 @@ trait FutureCombinators extends TestBase { done() } - // recover: stub def testRecoverSuccess(): Unit = once { done => - done() + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case re: RuntimeException => + "recovered" + } onSuccess { case x => + done() + assert(x == "recovered") + } onFailure { case any => + done() + assert(false) + } } def testRecoverFailure(): Unit = once { done => - done() + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case te: TimeoutException => "timeout" + } onSuccess { case x => + done() + assert(false) + } onFailure { case any => + done() + assert(any == cause) + } } testMapSuccess() diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index bf3fa57da9..4261638e1f 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -186,7 +186,6 @@ trait FutureCombinators extends TestBase { done() } - // recover: stub def testRecoverSuccess(): Unit = once { done => val cause = new RuntimeException -- cgit v1.2.3 From 5a4b555c378d79d57b59dfd6edafd2b9a59866bb Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 13 Dec 2011 18:37:31 +0100 Subject: Make base TCK more uniform with Akka TCK --- test/files/jvm/scala-concurrent-tck.scala | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 4261638e1f..ccf1162e19 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -12,6 +12,7 @@ import scala.concurrent.future import scala.concurrent.promise import scala.concurrent.await +import scala.util.Duration trait TestBase { @@ -320,7 +321,7 @@ trait Blocking extends TestBase { def testAwaitSuccess(): Unit = once { done => val f = future { 0 } - await(0, f) + await(Duration(500, "ms"), f) done() } @@ -331,7 +332,7 @@ trait Blocking extends TestBase { throw cause } try { - await(0, f) + await(Duration(500, "ms"), f) assert(false) } catch { case t => @@ -382,7 +383,7 @@ with FutureProjections with Promises with Exceptions { - + System.exit(0) } -- cgit v1.2.3 From d595324efe2be1c552bad8201aaef9ce383e5c95 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 12 Jan 2012 16:20:25 +0100 Subject: Refactor await calls for awaitable objects to ready and result calls. --- src/library/scala/concurrent/Future.scala | 123 ++++++++++--------------- src/library/scala/concurrent/akka/Future.scala | 16 ++++ src/library/scala/concurrent/package.scala | 15 ++- test/files/jvm/scala-concurrent-tck.scala | 86 +++++++++-------- 4 files changed, 123 insertions(+), 117 deletions(-) create mode 100644 src/library/scala/concurrent/akka/Future.scala (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index ada6736132..748d08be9f 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -1,29 +1,32 @@ - -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ package scala.concurrent -//import akka.AkkaException (replaced with Exception) -//import akka.event.Logging.Error (removed all logging) + + import scala.util.{ Timeout, Duration } import scala.Option -//import akka.japi.{ Procedure, Function ⇒ JFunc, Option ⇒ JOption } (commented methods) import java.util.concurrent.{ ConcurrentLinkedQueue, TimeUnit, Callable } -import java.util.concurrent.TimeUnit.{ NANOSECONDS ⇒ NANOS, MILLISECONDS ⇒ MILLIS } -import java.lang.{ Iterable ⇒ JIterable } -import java.util.{ LinkedList ⇒ JLinkedList } +import java.util.concurrent.TimeUnit.{ NANOSECONDS => NANOS, MILLISECONDS ⇒ MILLIS } +import java.lang.{ Iterable => JIterable } +import java.util.{ LinkedList => JLinkedList } import scala.annotation.tailrec import scala.collection.mutable.Stack -//import akka.util.Switch (commented method) -import java.{ lang ⇒ jl } +import java.{ lang => jl } import java.util.concurrent.atomic.{ AtomicReferenceFieldUpdater, AtomicInteger, AtomicBoolean } import scala.collection.mutable.Builder import scala.collection.generic.CanBuildFrom + + /** The trait that represents futures. * * @define multipleCallbacks @@ -95,20 +98,6 @@ self => case Right(v) => // do nothing } - /* To be removed - /** When this future times out, apply the provided function. - * - * If the future has already timed out, - * this will either be applied immediately or be scheduled asynchronously. - * - * $multipleCallbacks - */ - def onTimeout[U](callback: FutureTimeoutException => U): this.type = onComplete { - case Left(te: FutureTimeoutException) => callback(te) - case Right(v) => // do nothing - } - */ - /** When this future is completed, either through an exception, a timeout, or a value, * apply the provided function. * @@ -130,14 +119,6 @@ self => */ def newPromise[S]: Promise[S] = executionContext promise - /* - /** Tests whether this `Future`'s timeout has expired. - * - * $futureTimeout - */ - def isTimedout: Boolean - */ - /* Projections */ @@ -176,46 +157,6 @@ self => new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v) } - /* - /** A timed out projection of this future. - * - * The timed out projection is a future holding a value of type `FutureTimeoutException`. - * - * It is completed with a value which is a `FutureTimeoutException` of the original future - * in case the original future is timed out. - * - * It is failed with a `NoSuchElementException` if the original future is completed successfully. - * It is failed with the original exception otherwise. - * - * Blocking on this future returns a value only if the original future timed out, and a - * corresponding exception otherwise. - */ - def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] { - def executionContext = self.executionContext - def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = { - self.onComplete { - case Left(te: FutureTimeoutException) => func(Right(te)) - case Left(t) => func(Left(noSuchElemThrowable(t))) - case Right(v) => func(Left(noSuchElemValue(v))) - } - this - } - def isTimedout = self.isTimedout - def block()(implicit canblock: CanBlock) = try { - val res = self.block() - throw noSuchElemValue(res) - } catch { - case ft: FutureTimeoutException => - ft - case t: Throwable => - throw noSuchElemThrowable(t) - } - private def noSuchElemValue(v: T) = - new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v) - private def noSuchElemThrowable(v: Throwable) = - new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v) - } - */ /* Monadic operations */ @@ -299,7 +240,7 @@ self => * Example: * {{{ * val f = future { 5 } - * val g = g filter { _ % 2 == 1 } + * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } * block on g // evaluates to 5 * block on h // throw a NoSuchElementException @@ -316,6 +257,38 @@ self => p.future } + /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. + * + * + * If the current future contains a value for which the partial function is defined, the new future will also hold that value. + * Otherwise, the resulting future will fail with a `NoSuchElementException`. + * + * If the current future fails or times out, the resulting future also fails or times out, respectively. + * + * Example: + * {{{ + * val f = future { -5 } + * val g = f collect { + * case x if x < 0 => -x + * } + * val h = f collect { + * case x if x > 0 => x * 2 + * } + * block on g // evaluates to 5 + * block on h // throw a NoSuchElementException + * }}} + */ + def collect[S](pf: PartialFunction[T, S]): Future[S] = { + val p = newPromise[S] + + onComplete { + case Left(t) => p failure t + case Right(v) => if (pf.isDefinedAt(v)) p success pf(v) else p failure new NoSuchElementException("Future.collect partial function is not defined at: " + v) + } + + p.future + } + } diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala new file mode 100644 index 0000000000..e359456736 --- /dev/null +++ b/src/library/scala/concurrent/akka/Future.scala @@ -0,0 +1,16 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package akka + + + + + + diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 61137fbc6e..666e12456d 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -67,9 +67,9 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](timeout: Timeout)(body: =>T): T = await(timeout, new Awaitable[T] { + def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] { def await(timeout: Timeout)(implicit cb: CanAwait) = body - }) + }, atMost) /** Blocks on a blockable object. * @@ -80,13 +80,18 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](timeout: Timeout, awaitable: Awaitable[T]): T = { + def result[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { - case null => awaitable.await(timeout)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(timeout, awaitable) // inside an execution context thread + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(atMost, awaitable) // inside an execution context thread } } + def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { + result(awaitable, atMost) + awaitable + } + } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index ccf1162e19..d62561c92d 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -11,6 +11,8 @@ import scala.concurrent.{ import scala.concurrent.future import scala.concurrent.promise import scala.concurrent.await +import scala.concurrent.result +import scala.concurrent.ready import scala.util.Duration @@ -42,9 +44,10 @@ trait FutureCallbacks extends TestBase { val f = future { x = 1 } - f onSuccess { _ => - done() - assert(x == 1) + f onSuccess { + case _ => + done() + assert(x == 1) } } @@ -54,12 +57,14 @@ trait FutureCallbacks extends TestBase { val f = future { x = 1 } - f onSuccess { _ => + f onSuccess { + case _ => assert(x == 1) x = 2 - f onSuccess { _ => - assert(x == 2) - done() + f onSuccess { + case _ => + assert(x == 2) + done() } } } @@ -70,8 +75,8 @@ trait FutureCallbacks extends TestBase { done() throw new Exception } - f onSuccess { _ => - assert(false) + f onSuccess { + case _ => assert(false) } } @@ -82,9 +87,10 @@ trait FutureCallbacks extends TestBase { x = 1 throw new Exception } - f onSuccess { _ => - done() - assert(false) + f onSuccess { + case _ => + done() + assert(false) } f onFailure { case _ => @@ -98,9 +104,10 @@ trait FutureCallbacks extends TestBase { val f = future[Unit] { throw cause } - f onSuccess { _ => - done() - assert(false) + f onSuccess { + case _ => + done() + assert(false) } f onFailure { case e: ExecutionException if (e.getCause == cause) => @@ -116,9 +123,10 @@ trait FutureCallbacks extends TestBase { val f = future[Unit] { throw new TimeoutException() } - f onSuccess { _ => - done() - assert(false) + f onSuccess { + case _ => + done() + assert(false) } f onFailure { case e: TimeoutException => @@ -195,9 +203,10 @@ trait FutureCombinators extends TestBase { } recover { case re: RuntimeException => "recovered" - } onSuccess { x => - done() - assert(x == "recovered") + } onSuccess { + case x => + done() + assert(x == "recovered") } onFailure { case any => done() assert(false) @@ -211,9 +220,10 @@ trait FutureCombinators extends TestBase { throw cause } recover { case te: TimeoutException => "timeout" - } onSuccess { x => - done() - assert(false) + } onSuccess { + case x => + done() + assert(false) } onFailure { case any => done() assert(any == cause) @@ -258,9 +268,9 @@ trait FutureProjections extends TestBase { throw cause } f.failed onSuccess { - t => - assert(t == cause) - done() + case t => + assert(t == cause) + done() } } @@ -291,7 +301,7 @@ trait FutureProjections extends TestBase { val f = future { throw cause } - assert(await(0, f.failed) == cause) + assert(result(f.failed, Duration(500, "ms")) == cause) done() } @@ -299,7 +309,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - await(0, f.failed) + ready(f.failed, Duration(0, "ms")) assert(false) } catch { case nsee: NoSuchElementException => done() @@ -321,7 +331,7 @@ trait Blocking extends TestBase { def testAwaitSuccess(): Unit = once { done => val f = future { 0 } - await(Duration(500, "ms"), f) + ready(f, Duration(500, "ms")) done() } @@ -332,7 +342,7 @@ trait Blocking extends TestBase { throw cause } try { - await(Duration(500, "ms"), f) + ready(f, Duration(500, "ms")) assert(false) } catch { case t => @@ -354,12 +364,14 @@ trait Promises extends TestBase { val p = promise[Int]() val f = p.future - f.onSuccess { x => - done() - assert(x == 5) - } onFailure { case any => - done() - assert(false) + f.onSuccess { + case x => + done() + assert(x == 5) + } onFailure { + case any => + done() + assert(false) } p.success(5) -- cgit v1.2.3 From 5d2acb2b3d6b2880ba36f039bbf98c583ce85a21 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Thu, 12 Jan 2012 19:55:50 +0100 Subject: Port of akka Future implementation in progress. --- src/library/scala/concurrent/Future.scala | 75 +++++++++-- src/library/scala/concurrent/Promise.scala | 27 +++- src/library/scala/concurrent/akka/Future.scala | 177 ++++++++++++++++++++++++- src/library/scala/concurrent/package.scala | 6 + test/files/jvm/scala-concurrent-tck.scala | 15 ++- 5 files changed, 283 insertions(+), 17 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 748d08be9f..d074dbfaaa 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -61,6 +61,9 @@ import scala.collection.generic.CanBuildFrom * {{{ * f flatMap { (x: Int) => g map { (y: Int) => x + y } } * }}} + * + * @define nonDeterministic + * Note: using this method yields nondeterministic dataflow programs. */ trait Future[+T] extends Awaitable[T] { self => @@ -113,11 +116,11 @@ self => /** The execution context of the future. */ - def executionContext: ExecutionContext + def executor: ExecutionContext /** Creates a new promise. */ - def newPromise[S]: Promise[S] = executionContext promise + def newPromise[S]: Promise[S] = executor promise /* Projections */ @@ -135,7 +138,7 @@ self => * and throws a corresponding exception if the original future fails. */ def failed: Future[Throwable] = new Future[Throwable] { - def executionContext = self.executionContext + def executor = self.executor def onComplete[U](func: Either[Throwable, Throwable] => U) = { self.onComplete { case Left(t) => func(Right(t)) @@ -242,8 +245,8 @@ self => * val f = future { 5 } * val g = f filter { _ % 2 == 1 } * val h = f filter { _ % 2 == 0 } - * block on g // evaluates to 5 - * block on h // throw a NoSuchElementException + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException * }}} */ def filter(pred: T => Boolean): Future[T] = { @@ -258,7 +261,6 @@ self => } /** Creates a new future by mapping the value of the current future if the given partial function is defined at that value. - * * * If the current future contains a value for which the partial function is defined, the new future will also hold that value. * Otherwise, the resulting future will fail with a `NoSuchElementException`. @@ -274,8 +276,8 @@ self => * val h = f collect { * case x if x > 0 => x * 2 * } - * block on g // evaluates to 5 - * block on h // throw a NoSuchElementException + * await(0) g // evaluates to 5 + * await(0) h // throw a NoSuchElementException * }}} */ def collect[S](pf: PartialFunction[T, S]): Future[S] = { @@ -289,14 +291,68 @@ self => p.future } + /** Creates a new future which holds the result of this future if it was completed successfully, or, if not, + * the result of the `that` future if `that` is completed successfully. + * If both futures are failed, the resulting future holds the throwable object of the first future. + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to 5 + * }}} + */ + def orElse[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + onComplete { + case Left(t) => that onComplete { + case Left(_) => p failure t + case Right(v) => p success v + } + case Right(v) => p success v + } + + p.future + } + + /** Creates a new future which holds the result of either this future or `that` future, depending on + * which future was completed first. + * + * $nonDeterministic + * + * Example: + * {{{ + * val f = future { sys.error("failed") } + * val g = future { 5 } + * val h = f orElse g + * await(0) h // evaluates to either 5 or throws a runtime exception + * }}} + */ + def or[U >: T](that: Future[U]): Future[U] = { + val p = newPromise[U] + + val completePromise: PartialFunction[Either[Throwable, T], _] = { + case Left(t) => p tryFailure t + case Right(v) => p trySuccess v + } + this onComplete completePromise + this onComplete completePromise + + p.future + } + } object Future { + /* + // TODO make more modular by encoding this within the execution context def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = { val builder = cbf(futures) - val p: Promise[Coll[T]] = executionContext.promise[Coll[T]] + val p: Promise[Coll[T]] = executor.promise[Coll[T]] if (futures.size == 1) futures.head onComplete { case Left(t) => p failure t @@ -317,6 +373,7 @@ object Future { p.future } + */ @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body) diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index aae0135af4..f6ea252f73 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -25,6 +25,9 @@ import scala.util.Timeout * If the throwable used to fail this promise is an error, a control exception * or an interrupted exception, it will be wrapped as a cause within an * `ExecutionException` which will fail the promise. + * + * @define nonDeterministic + * Note: Using this method may result in non-deterministic concurrent programs. */ trait Promise[T] { @@ -38,7 +41,15 @@ trait Promise[T] { * * $promiseCompletion */ - def success(value: T): Unit + def success(v: T): this.type = if (trySuccess(v)) this else throw new IllegalStateException("Promise already completed.") + + /** Tries to complete the promise with a value. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def trySuccess(value: T): Boolean /** Completes the promise with an exception. * @@ -48,8 +59,16 @@ trait Promise[T] { * * $promiseCompletion */ - def failure(t: Throwable): Unit - + def failure(t: Throwable): this.type = if (tryFailure(t)) this else throw new IllegalStateException("Promise already completed.") + + /** Tries to complete the promise with an exception. + * + * $nonDeterministic + * + * @return If the promise has already been completed returns `false`, or `true` otherwise. + */ + def tryFailure(t: Throwable): Boolean + /** Wraps a `Throwable` in an `ExecutionException` if necessary. * * $allowedThrowables @@ -58,7 +77,7 @@ trait Promise[T] { case t: Throwable if isFutureThrowable(t) => t case _ => new ExecutionException(t) } - + } diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala index e359456736..2b41c0c62e 100644 --- a/src/library/scala/concurrent/akka/Future.scala +++ b/src/library/scala/concurrent/akka/Future.scala @@ -1,4 +1,4 @@ -/* __ *\ +/*/* __ *\ ** ________ ___ / / ___ Scala API ** ** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** ** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** @@ -6,11 +6,182 @@ ** |/ ** \* */ -package scala.concurrent -package akka +package scala.concurrent.akka +sealed trait Future[+T] extends scala.concurrent.Future with Awaitable[T] { + + implicit def executor: ExecutionContext + + /** + * For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + */ + def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T ⇒ Future[Any])) + + /** + * Tests whether this Future has been completed. + */ + final def isCompleted: Boolean = value.isDefined + + /** + * The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. + */ + def value: Option[Either[Throwable, T]] + + def onComplete(func: Either[Throwable, T] => Unit): this.type + + /** + * Creates a Future that will be the result of the first completed Future of this and the Future that was passed into this. + * This is semantically the same as: Future.firstCompletedOf(Seq(this, that)) + */ + //FIXME implement as the result of any of the Futures, or if both failed, the first failure + def orElse[A >: T](that: Future[A]): Future[A] = Future.firstCompletedOf(List(this, that)) //TODO Optimize + + final def recover[A >: T](pf: PartialFunction[Throwable, A]): Future[A] = { + val future = Promise[A]() + onComplete { + case Left(e) if pf isDefinedAt e ⇒ future.complete(try { Right(pf(e)) } catch { case x: Exception ⇒ Left(x) }) + case otherwise ⇒ future complete otherwise + } + future + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future. If this Future is completed with an exception then the new + * Future will also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor ? "Hello" // returns 5
+   *   b: String <- actor ? a       // returns "10"
+   *   c: String <- actor ? 7       // returns "14"
+   * } yield b + "-" + c
+   * 
+ */ + final def map[A](f: T ⇒ A): Future[A] = { + val future = Promise[A]() + onComplete { + case l: Left[_, _] ⇒ future complete l.asInstanceOf[Either[Throwable, A]] + case Right(res) ⇒ + future complete (try { + Right(f(res)) + } catch { + case e ⇒ + logError("Future.map", e) + Left(e) + }) + } + future + } + + /** + * Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[A](implicit m: Manifest[A]): Future[A] = { + val fa = Promise[A]() + onComplete { + case l: Left[_, _] ⇒ fa complete l.asInstanceOf[Either[Throwable, A]] + case Right(t) ⇒ + fa complete (try { + Right(BoxedType(m.erasure).cast(t).asInstanceOf[A]) + } catch { + case e: ClassCastException ⇒ Left(e) + }) + } + fa + } + + /** + * Creates a new Future by applying a function to the successful result of + * this Future, and returns the result of the function as the new Future. + * If this Future is completed with an exception then the new Future will + * also contain this exception. + * Example: + *
+   * val future1 = for {
+   *   a: Int    <- actor ? "Hello" // returns 5
+   *   b: String <- actor ? a       // returns "10"
+   *   c: String <- actor ? 7       // returns "14"
+   * } yield b + "-" + c
+   * 
+ */ + final def flatMap[A](f: T ⇒ Future[A]): Future[A] = { + val p = Promise[A]() + + onComplete { + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, A]] + case Right(r) ⇒ + try { + p completeWith f(r) + } catch { + case e ⇒ + p complete Left(e) + logError("Future.flatMap", e) + } + } + p + } + + /** + * Same as onSuccess { case r => f(r) } but is also used in for-comprehensions + */ + final def foreach(f: T ⇒ Unit): Unit = onComplete { + case Right(r) ⇒ f(r) + case _ ⇒ + } + + /** + * Used by for-comprehensions + */ + final def withFilter(p: T ⇒ Boolean) = new FutureWithFilter[T](this, p) + + final class FutureWithFilter[+A](self: Future[A], p: A ⇒ Boolean) { + def foreach(f: A ⇒ Unit): Unit = self filter p foreach f + def map[B](f: A ⇒ B): Future[B] = self filter p map f + def flatMap[B](f: A ⇒ Future[B]): Future[B] = self filter p flatMap f + def withFilter(q: A ⇒ Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) + } + + /** + * Returns a new Future that will hold the successful result of this Future if it matches + * the given predicate, if it doesn't match, the resulting Future will be a failed Future + * with a MatchError, of if this Future fails, that failure will be propagated to the returned Future + */ + final def filter(pred: T ⇒ Boolean): Future[T] = { + val p = Promise[T]() + onComplete { + case l: Left[_, _] ⇒ p complete l.asInstanceOf[Either[Throwable, T]] + case r @ Right(res) ⇒ p complete (try { + if (pred(res)) r else Left(new MatchError(res)) + } catch { + case e ⇒ + logError("Future.filter", e) + Left(e) + }) + } + p + } + + protected def logError(msg: String, problem: Throwable): Unit = { + executor match { + case m: MessageDispatcher ⇒ m.prerequisites.eventStream.publish(Error(problem, msg, problem.getMessage)) + case other ⇒ problem.printStackTrace() + } + } +} + + + +*/ diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 666e12456d..c35ece5668 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -50,6 +50,12 @@ package object concurrent { case _ => true } + private[concurrent] def resolveThrowable[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { + case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) + case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) + case _ => source + } + /* concurrency constructs */ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] = diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index d62561c92d..abd363cedf 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -184,6 +184,17 @@ trait FutureCombinators extends TestBase { done() } + // collect: stub + def testCollectSuccess(): Unit = once { + done => + done() + } + + def testCollectFailure(): Unit = once { + done => + done() + } + // foreach: stub def testForeachSuccess(): Unit = once { done => @@ -229,13 +240,15 @@ trait FutureCombinators extends TestBase { assert(any == cause) } } - + testMapSuccess() testMapFailure() testFlatMapSuccess() testFlatMapFailure() testFilterSuccess() testFilterFailure() + testCollectSuccess() + testCollectFailure() testForeachSuccess() testForeachFailure() testRecoverSuccess() -- cgit v1.2.3 From 7993ec04baf28cd12009d15979c2c904afad89d3 Mon Sep 17 00:00:00 2001 From: aleksandar Date: Fri, 13 Jan 2012 19:32:48 +0100 Subject: Migrate akka promises. Changes to some of the interfaces. --- src/library/scala/concurrent/Awaitable.scala | 7 +- .../scala/concurrent/ExecutionContext.scala | 2 +- src/library/scala/concurrent/Future.scala | 24 ++- src/library/scala/concurrent/Promise.scala | 18 +-- .../scala/concurrent/akka/AbstractPromise.java | 21 +++ src/library/scala/concurrent/akka/Promise.scala | 165 ++++++++++++++++++++- src/library/scala/concurrent/akka/package.scala | 3 + .../scala/concurrent/default/TaskImpl.scala | 17 +-- src/library/scala/concurrent/package.scala | 32 ++-- test/files/jvm/scala-concurrent-tck.scala | 12 +- 10 files changed, 250 insertions(+), 51 deletions(-) create mode 100644 src/library/scala/concurrent/akka/AbstractPromise.java (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala index 52fd3b9516..c38e668f30 100644 --- a/src/library/scala/concurrent/Awaitable.scala +++ b/src/library/scala/concurrent/Awaitable.scala @@ -11,15 +11,14 @@ package scala.concurrent import scala.annotation.implicitNotFound -import scala.util.Timeout +import scala.util.Duration trait Awaitable[+T] { - @implicitNotFound(msg = "Waiting must be done by calling `await(timeout) b`, where `b` is the `Awaitable` object.") - def await(timeout: Timeout)(implicit canawait: CanAwait): T + @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.") + def await(atMost: Duration)(implicit canawait: CanAwait): T } - diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index ebeeca995e..38a28044e1 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -23,7 +23,7 @@ trait ExecutionContext { def future[T](body: => T): Future[T] /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](timeout: Timeout, body: Awaitable[T]): T + def blockingCall[T](body: Awaitable[T]): T } diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index 4002239fc4..e6edaea87a 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -28,6 +28,18 @@ import scala.collection.generic.CanBuildFrom /** The trait that represents futures. + * + * Asynchronous computations that yield futures are created with the `future` call: + * + * {{{ + * val s = "Hello" + * val f: Future[String] = future { + * s + " future!" + * } + * f onSuccess { + * case msg => println(msg) + * } + * }}} * * @define multipleCallbacks * Multiple callbacks may be registered; there is no guarantee that they will be @@ -37,12 +49,14 @@ import scala.collection.generic.CanBuildFrom * The future may contain a throwable object and this means that the future failed. * Futures obtained through combinators have the same exception as the future they were obtained from. * The following throwable objects are not contained in the future: - * - Error - errors are not contained within futures - * - scala.util.control.ControlThrowable - not contained within futures - * - InterruptedException - not contained within futures + * - `Error` - errors are not contained within futures + * - `InterruptedException` - not contained within futures + * - all `scala.util.control.ControlThrowable` except `NonLocalReturnControl` - not contained within futures * * Instead, the future is completed with a ExecutionException with one of the exceptions above * as the cause. + * If a future is failed with a `scala.runtime.NonLocalReturnControl`, + * it is completed with a value instead from that throwable instead instead. * * @define forComprehensionExamples * Example: @@ -146,10 +160,10 @@ self => } this } - def await(timeout: Timeout)(implicit canawait: CanAwait): Throwable = { + def await(atMost: Duration)(implicit canawait: CanAwait): Throwable = { var t: Throwable = null try { - val res = self.await(timeout) + val res = self.await(atMost) t = noSuchElem(res) } catch { case t: Throwable => return t diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala index e5557ae1c3..c3fa92053b 100644 --- a/src/library/scala/concurrent/Promise.scala +++ b/src/library/scala/concurrent/Promise.scala @@ -78,7 +78,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def trySuccess(value: T): Boolean + def trySuccess(value: T): Boolean = tryComplete(Right(value)) /** Completes the promise with an exception. * @@ -96,7 +96,7 @@ trait Promise[T] { * * @return If the promise has already been completed returns `false`, or `true` otherwise. */ - def tryFailure(t: Throwable): Boolean + def tryFailure(t: Throwable): Boolean = tryComplete(Left(t)) /** Wraps a `Throwable` in an `ExecutionException` if necessary. * @@ -112,15 +112,7 @@ trait Promise[T] { object Promise { - /* - /** - * Creates a non-completed, new, Promise with the supplied timeout in milliseconds - */ - def apply[A](timeout: Timeout)(implicit dispatcher: MessageDispatcher): Promise[A] = DefaultPromise[A](timeout) - - /** - * Creates a non-completed, new, Promise with the default timeout (akka.actor.timeout in conf) - */ - def apply[A]()(implicit dispatcher: MessageDispatcher, timeout: Timeout): Promise[A] = apply(timeout) - */ + + + } diff --git a/src/library/scala/concurrent/akka/AbstractPromise.java b/src/library/scala/concurrent/akka/AbstractPromise.java new file mode 100644 index 0000000000..38c74edf2f --- /dev/null +++ b/src/library/scala/concurrent/akka/AbstractPromise.java @@ -0,0 +1,21 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.akka; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class AbstractPromise { + private volatile Object _ref = null; + protected final static AtomicReferenceFieldUpdater updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index a47dee48e2..d3b93b9573 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -10,14 +10,19 @@ package scala.concurrent.akka -import scala.concurrent.{ExecutionContext, resolver} +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.{Awaitable, ExecutionContext, resolver, blocking, CanAwait, TimeoutException} import scala.util.continuations._ +import scala.util.Duration +import scala.annotation.tailrec trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { // TODO refine answer and return types here from Any to type parameters + // then move this up in the hierarchy final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => @@ -61,3 +66,161 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { } + +object Promise { + + def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] + + /** Represents the internal state. + */ + sealed trait FState[+T] { def value: Option[Either[Throwable, T]] } + + case class Pending[T](listeners: List[Either[Throwable, T] ⇒ Unit] = Nil) extends FState[T] { + def value: Option[Either[Throwable, T]] = None + } + + case class Success[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def result: T = value.get.right.get + } + + case class Failure[T](value: Option[Either[Throwable, T]] = None) extends FState[T] { + def exception: Throwable = value.get.left.get + } + + private val emptyPendingValue = Pending[Nothing](Nil) + + /* default promise implementation */ + abstract class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] { + self => + + updater.set(this, Promise.EmptyPending()) + + protected final def tryAwait(atMost: Duration): Boolean = { + @tailrec + def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOSECONDS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec + val start = System.nanoTime() + try { synchronized { if (value.isEmpty) wait(ms, ns) } } catch { case e: InterruptedException ⇒ } + + awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + } else + value.isDefined + } + + executor.blockingCall(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost)))) + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + if (value.isDefined || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + + def await(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { + case Left(e) => throw e + case Right(r) => r + } + + def value: Option[Either[Throwable, T]] = getState.value + + @inline + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + + @inline + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + + @inline + protected final def getState: FState[T] = updater.get(this) + + /* + def tryComplete(value: Either[Throwable, T]): Boolean = { + val callbacks: List[Either[Throwable, T] => Unit] = { + try { + @tailrec + def tryComplete(v: Either[Throwable, T]): List[Either[Throwable, T] => Unit] = { + getState match { + case cur @ Pending(listeners) => + if (updateState(cur, if (v.isLeft) Failure(Some(v)) else Success(Some(v)))) listeners + else tryComplete(v) + case _ => null + } + } + tryComplete(resolve(value)) + } finally { + synchronized { notifyAll() } //Notify any evil blockers + } + } + + callbacks match { + case null => false + case cs if cs.isEmpty => true + case cs => Future.dispatchTask(() => cs.foreach(f => notifyCompleted(f, value))); true + } + } + + def onComplete(func: Either[Throwable, T] => Unit): this.type = { + @tailrec //Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = getState + cur match { + case _: Success[_] | _: Failure[_] => true + case p: Pending[_] => + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + } + } + + if (tryAddCallback()) { + val result = value.get + Future.dispatchTask(() => notifyCompleted(func, result)) + } + + this + } + + private final def notifyCompleted(func: Either[Throwable, T] => Unit, result: Either[Throwable, T]) { + try { func(result) } catch { case e => logError("Future onComplete-callback raised an exception", e) } + } + */ + } + + /* + /** + * An already completed Future is seeded with it's result at creation, is useful for when you are participating in + * a Future-composition but you already have a value to contribute. + */ + final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] { + val value = Some(resolve(suppliedValue)) + + def tryComplete(value: Either[Throwable, T]): Boolean = false + def onComplete(func: Either[Throwable, T] => Unit): this.type = { + val completedAs = value.get + Future dispatchTask (() => func(completedAs)) + this + } + + def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + case Left(e) => throw e + case Right(r) => r + } + } + */ +} + + + + + + + + + + + + + + + + diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala index 59eda5a3b4..8c059b8e71 100644 --- a/src/library/scala/concurrent/akka/package.scala +++ b/src/library/scala/concurrent/akka/package.scala @@ -11,6 +11,7 @@ package scala.concurrent import java.{lang => jl} +import scala.util.Duration @@ -31,6 +32,8 @@ package object akka { if (c.isPrimitive) toBoxed(c) else c } + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue + } diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index 716b9c02f1..a38541df5d 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -5,7 +5,7 @@ package default import java.util.concurrent.atomic.AtomicReferenceFieldUpdater import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } -import scala.util.{ Timeout, Duration } +import scala.util.Duration import scala.annotation.tailrec @@ -90,7 +90,7 @@ extends Promise[T] with Future[T] with Completable[T] { case Right(v) => trySuccess(v) } - def trySuccess(value: T): Boolean = { + override def trySuccess(value: T): Boolean = { val cbs = tryCompleteState(Success(value)) if (cbs == null) false @@ -103,7 +103,7 @@ extends Promise[T] with Future[T] with Completable[T] { } } - def tryFailure(t: Throwable): Boolean = { + override def tryFailure(t: Throwable): Boolean = { val wrapped = wrap(t) val cbs = tryCompleteState(Failure(wrapped)) if (cbs == null) @@ -117,7 +117,7 @@ extends Promise[T] with Future[T] with Completable[T] { } } - def await(timeout: Timeout)(implicit canawait: scala.concurrent.CanAwait): T = getState match { + def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { case Success(res) => res case Failure(t) => throw t case _ => @@ -191,7 +191,7 @@ extends RecursiveAction with Task[T] with Future[T] with Completable[T] { def tryCancel(): Unit = tryUnfork() - def await(timeout: Timeout)(implicit canawait: CanAwait): T = { + def await(atMost: Duration)(implicit canawait: CanAwait): T = { join() // TODO handle timeout also (updater.get(this): @unchecked) match { case Success(r) => r @@ -257,17 +257,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { def promise[T]: Promise[T] = new PromiseImpl[T](this) - // TODO fix the timeout - def blockingCall[T](timeout: Timeout, b: Awaitable[T]): T = b match { + def blockingCall[T](b: Awaitable[T]): T = b match { case fj: TaskImpl[_] if fj.executor.pool eq pool => - fj.await(timeout) + fj.await(Duration.fromNanos(0)) case _ => var res: T = null.asInstanceOf[T] @volatile var blockingDone = false // TODO add exception handling here! val mb = new ForkJoinPool.ManagedBlocker { def block() = { - res = b.await(timeout)(CanAwaitEvidence) + res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) blockingDone = true true } diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index ce22c53c72..7552100af2 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -52,15 +52,17 @@ package object concurrent { private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match { case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T]) + case Left(t: scala.util.control.ControlThrowable) => Left(new ExecutionException("Boxed ControlThrowable", t)) case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t)) - case Left(e: Error) => throw e + case Left(e: Error) => Left(new ExecutionException("Boxed Error", e)) case _ => source } private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = { case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value) + case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t)) case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t)) - case e: Error => throw e + case e: Error => Left(new ExecutionException("Boxed Error", e)) case t => Left(t) } @@ -83,9 +85,12 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def await[T](atMost: Duration)(body: =>T): T = result(new Awaitable[T] { - def await(timeout: Timeout)(implicit cb: CanAwait) = body - }, atMost) + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + /** Wraps a block of code into an awaitable object. */ + def body2awaitable[T](body: =>T) = new Awaitable[T] { + def await(atMost: Duration)(implicit cb: CanAwait) = body + } /** Blocks on a blockable object. * @@ -96,18 +101,23 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def result[T](awaitable: Awaitable[T], atMost: Duration): T = { + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(atMost, awaitable) // inside an execution context thread + case x => x.blockingCall(awaitable) // inside an execution context thread } } - def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { - result(awaitable, atMost) - awaitable + object await { + def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { + try blocking(awaitable, atMost) + catch { case _ => } + awaitable + } + + def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable, atMost) } - + } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index abd363cedf..a951c09da2 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -10,9 +10,7 @@ import scala.concurrent.{ } import scala.concurrent.future import scala.concurrent.promise -import scala.concurrent.await -import scala.concurrent.result -import scala.concurrent.ready +import scala.concurrent.blocking import scala.util.Duration @@ -314,7 +312,7 @@ trait FutureProjections extends TestBase { val f = future { throw cause } - assert(result(f.failed, Duration(500, "ms")) == cause) + assert(blocking(f.failed, Duration(500, "ms")) == cause) done() } @@ -322,7 +320,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - ready(f.failed, Duration(0, "ms")) + blocking(f.failed, Duration(0, "ms")) assert(false) } catch { case nsee: NoSuchElementException => done() @@ -344,7 +342,7 @@ trait Blocking extends TestBase { def testAwaitSuccess(): Unit = once { done => val f = future { 0 } - ready(f, Duration(500, "ms")) + blocking(f, Duration(500, "ms")) done() } @@ -355,7 +353,7 @@ trait Blocking extends TestBase { throw cause } try { - ready(f, Duration(500, "ms")) + blocking(f, Duration(500, "ms")) assert(false) } catch { case t => -- cgit v1.2.3 From c3d19c58d8a94b7232718321f6994c001257cc96 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Thu, 2 Feb 2012 14:05:26 +0100 Subject: Incorporate Ctrie into standard library. Implemented Ctrie serialization. Improved hashcode computation. --- src/library/scala/collection/mutable/Ctrie.scala | 103 ++++++++++++++++++++--- test/files/jvm/serialization.check | 4 + test/files/jvm/serialization.scala | 7 +- test/files/run/ctries/lnode.scala | 5 +- 4 files changed, 106 insertions(+), 13 deletions(-) (limited to 'test/files/jvm') diff --git a/src/library/scala/collection/mutable/Ctrie.scala b/src/library/scala/collection/mutable/Ctrie.scala index d02e0ce178..84cceb44eb 100644 --- a/src/library/scala/collection/mutable/Ctrie.scala +++ b/src/library/scala/collection/mutable/Ctrie.scala @@ -6,12 +6,14 @@ ** |/ ** \* */ -package scala.collection.mutable +package scala.collection +package mutable import java.util.concurrent.atomic._ import collection.immutable.{ ListMap => ImmutableListMap } +import generic._ import annotation.tailrec import annotation.switch @@ -425,7 +427,7 @@ extends MainNode[K, V] { if (updmap.size > 1) new LNode(updmap) else { val (k, v) = updmap.iterator.next - new TNode(k, v, k.hashCode) // create it tombed so that it gets compressed on subsequent accesses + new TNode(k, v, Ctrie.computeHash(k)) // create it tombed so that it gets compressed on subsequent accesses } } def get(k: K) = listmap.get(k) @@ -568,10 +570,26 @@ private[mutable] case class RDCSS_Descriptor[K, V](old: INode[K, V], expectedmai } -class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef]) +/** A concurrent hash-trie or Ctrie is a concurrent thread-safe lock-free + * implementation of a hash array mapped trie. It is used to implement the + * concurrent map abstraction. It has particularly scalable concurrent insert + * and remove operations and is memory-efficient. It supports O(1), atomic, + * lock-free snapshots which are used to implement linearizable lock-free size, + * iterator and clear operations. The cost of evaluating the (lazy) snapshot is + * distributed across subsequent updates, thus making snapshot evaluation horizontally scalable. + * + * @author Aleksandar Prokopec + * @since 2.10 + */ +@SerialVersionUID(0L - 6402774413839597105L) +final class Ctrie[K, V] private (r: AnyRef, rtupd: AtomicReferenceFieldUpdater[Ctrie[K, V], AnyRef]) extends ConcurrentMap[K, V] + with MapLike[K, V, Ctrie[K, V]] + with Serializable { - private val rootupdater = rtupd + import Ctrie.computeHash + + private var rootupdater = rtupd @volatile var root = r def this() = this( @@ -581,6 +599,31 @@ extends ConcurrentMap[K, V] /* internal methods */ + private def writeObject(out: java.io.ObjectOutputStream) { + val it = iterator + while (it.hasNext) { + val (k, v) = it.next() + out.writeObject(k) + out.writeObject(v) + } + out.writeObject(CtrieSerializationEnd) + } + + private def readObject(in: java.io.ObjectInputStream) { + root = INode.newRootNode + rootupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[Ctrie[K, V]], classOf[AnyRef], "root") + + var obj: AnyRef = null + do { + obj = in.readObject() + if (obj != CtrieSerializationEnd) { + val k = obj.asInstanceOf[K] + val v = in.readObject().asInstanceOf[V] + update(k, v) + } + } while (obj != CtrieSerializationEnd) + } + @inline final def CAS_ROOT(ov: AnyRef, nv: AnyRef) = rootupdater.compareAndSet(this, ov, nv) @inline final def RDCSS_READ_ROOT(abort: Boolean = false): INode[K, V] = { @@ -623,10 +666,6 @@ extends ConcurrentMap[K, V] } else false } - @inline private def computeHash(k: K): Int = { - k.hashCode - } - @tailrec private def inserthc(k: K, hc: Int, v: V) { val r = RDCSS_READ_ROOT() if (!r.rec_insert(k, v, hc, 0, null, r.gen, this)) inserthc(k, hc, v) @@ -647,7 +686,7 @@ extends ConcurrentMap[K, V] else res } - /* + /* slower: //@tailrec private def lookuphc(k: K, hc: Int): AnyRef = { val r = RDCSS_READ_ROOT() @@ -671,10 +710,21 @@ extends ConcurrentMap[K, V] /* public methods */ + override def empty: Ctrie[K, V] = new Ctrie[K, V] + @inline final def isReadOnly = rootupdater eq null @inline final def nonReadOnly = rootupdater ne null + /** Returns a snapshot of this Ctrie. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * in the snapshot or this Ctrie are accessed, they are rewritten. + * This means that the work of rebuilding both the snapshot and this + * Ctrie is distributed across all the threads doing updates or accesses + * subsequent to the snapshot creation. + */ @tailrec final def snapshot(): Ctrie[K, V] = { val r = RDCSS_READ_ROOT() val expmain = r.GCAS_READ(this) @@ -682,6 +732,18 @@ extends ConcurrentMap[K, V] else snapshot() } + /** Returns a read-only snapshot of this Ctrie. + * This operation is lock-free and linearizable. + * + * The snapshot is lazily updated - the first time some branch + * of this Ctrie are accessed, it is rewritten. The work of creating + * the snapshot is thus distributed across subsequent updates + * and accesses on this Ctrie by all threads. + * Note that the snapshot itself is never rewritten unlike when calling + * the `snapshot` method, but the obtained snapshot cannot be modified. + * + * This method is used by other methods such as `size` and `iterator`. + */ @tailrec final def readOnlySnapshot(): collection.Map[K, V] = { val r = RDCSS_READ_ROOT() val expmain = r.GCAS_READ(this) @@ -760,11 +822,25 @@ extends ConcurrentMap[K, V] if (nonReadOnly) readOnlySnapshot().iterator else new CtrieIterator(this) + override def stringPrefix = "Ctrie" + } -object Ctrie { - val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[AnyRef], "mainnode") +object Ctrie extends MutableMapFactory[Ctrie] { + val inodeupdater = AtomicReferenceFieldUpdater.newUpdater(classOf[INodeBase[_, _]], classOf[MainNode[_, _]], "mainnode") + + implicit def canBuildFrom[K, V]: CanBuildFrom[Coll, (K, V), Ctrie[K, V]] = new MapCanBuildFrom[K, V] + + def empty[K, V]: Ctrie[K, V] = new Ctrie[K, V] + + @inline final def computeHash[K](k: K): Int = { + var hcode = k.hashCode + hcode = hcode * 0x9e3775cd + hcode = java.lang.Integer.reverseBytes(hcode) + hcode * 0x9e3775cd + } + } @@ -877,6 +953,11 @@ private[mutable] class CtrieIterator[K, V](ct: Ctrie[K, V], mustInit: Boolean = private[mutable] object RestartException extends util.control.ControlThrowable +/** Only used for ctrie serialization. */ +@SerialVersionUID(0L - 7237891413820527142L) +private[mutable] case object CtrieSerializationEnd + + private[mutable] object Debug { import collection._ diff --git a/test/files/jvm/serialization.check b/test/files/jvm/serialization.check index f58f763a76..cdfc100e0d 100644 --- a/test/files/jvm/serialization.check +++ b/test/files/jvm/serialization.check @@ -192,6 +192,10 @@ x = TreeSet(1, 2, 3) y = TreeSet(1, 2, 3) x equals y: true, y equals x: true +x = Ctrie(1 -> one, 2 -> two, 3 -> three) +y = Ctrie(1 -> one, 2 -> two, 3 -> three) +x equals y: true, y equals x: true + x = xml:src="hello" y = xml:src="hello" x equals y: true, y equals x: true diff --git a/test/files/jvm/serialization.scala b/test/files/jvm/serialization.scala index 73bed2d46b..4e1ff368ab 100644 --- a/test/files/jvm/serialization.scala +++ b/test/files/jvm/serialization.scala @@ -286,7 +286,7 @@ object Test3_mutable { import scala.collection.mutable.{ ArrayBuffer, ArrayBuilder, ArraySeq, ArrayStack, BitSet, DoubleLinkedList, HashMap, HashSet, History, LinkedList, ListBuffer, Publisher, Queue, - Stack, StringBuilder, WrappedArray, TreeSet} + Stack, StringBuilder, WrappedArray, TreeSet, Ctrie} // in alphabetic order try { @@ -385,6 +385,11 @@ object Test3_mutable { val ts1 = TreeSet[Int]() ++= Array(1, 2, 3) val _ts1: TreeSet[Int] = read(write(ts1)) check(ts1, _ts1) + + // Ctrie + val ct1 = Ctrie[Int, String]() ++= Array(1 -> "one", 2 -> "two", 3 -> "three") + val _ct1: Ctrie[Int, String] = read(write(ct1)) + check(ct1, _ct1) } catch { case e: Exception => diff --git a/test/files/run/ctries/lnode.scala b/test/files/run/ctries/lnode.scala index 28da4cc62f..88cbeed1f6 100644 --- a/test/files/run/ctries/lnode.scala +++ b/test/files/run/ctries/lnode.scala @@ -25,7 +25,10 @@ object LNodeSpec extends Spec { "remove elements with the same hash codes" in { val ct = new Ctrie[DumbHash, Int] for (i <- 0 until initsz) ct.update(new DumbHash(i), i) - for (i <- 0 until initsz) assert(ct.remove(new DumbHash(i)) == Some(i)) + for (i <- 0 until initsz) { + val remelem = ct.remove(new DumbHash(i)) + assert(remelem == Some(i), "removing " + i + " yields " + remelem) + } for (i <- 0 until initsz) assert(ct.get(new DumbHash(i)) == None) } -- cgit v1.2.3 From a015c08fda8b8556345a802d60557a3ecd627ccc Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Fri, 3 Feb 2012 16:58:54 +0100 Subject: Add tests for parallel Ctrie. Changed parameters in some tests to speed them up. --- .../scala/collection/parallel/Combiner.scala | 1 - test/files/jvm/serialization.check | 4 + test/files/jvm/serialization.scala | 5 ++ test/files/run/ctries/iterator.scala | 14 ++-- test/files/scalacheck/avl.scala | 18 ++-- .../parallel-collections/ParallelCtrieCheck.scala | 98 ++++++++++++++++++++++ .../files/scalacheck/parallel-collections/pc.scala | 3 + 7 files changed, 126 insertions(+), 17 deletions(-) create mode 100644 test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala (limited to 'test/files/jvm') diff --git a/src/library/scala/collection/parallel/Combiner.scala b/src/library/scala/collection/parallel/Combiner.scala index a2cab7eb5d..e304be92ae 100644 --- a/src/library/scala/collection/parallel/Combiner.scala +++ b/src/library/scala/collection/parallel/Combiner.scala @@ -34,7 +34,6 @@ import scala.collection.generic.Sizing */ trait Combiner[-Elem, +To] extends Builder[Elem, To] with Sizing with Parallel { //self: EnvironmentPassingCombiner[Elem, To] => - private[collection] final val tasksupport = getTaskSupport /** Combines the contents of the receiver builder and the `other` builder, * producing a new builder containing both their elements. diff --git a/test/files/jvm/serialization.check b/test/files/jvm/serialization.check index cdfc100e0d..67b77639a2 100644 --- a/test/files/jvm/serialization.check +++ b/test/files/jvm/serialization.check @@ -287,6 +287,10 @@ x = ParHashMap(1 -> 2, 2 -> 4) y = ParHashMap(1 -> 2, 2 -> 4) x equals y: true, y equals x: true +x = ParCtrie(1 -> 2, 2 -> 4) +y = ParCtrie(1 -> 2, 2 -> 4) +x equals y: true, y equals x: true + x = ParHashSet(1, 2, 3) y = ParHashSet(1, 2, 3) x equals y: true, y equals x: true diff --git a/test/files/jvm/serialization.scala b/test/files/jvm/serialization.scala index 4e1ff368ab..75daa8903d 100644 --- a/test/files/jvm/serialization.scala +++ b/test/files/jvm/serialization.scala @@ -613,6 +613,11 @@ object Test9_parallel { val _mpm: mutable.ParHashMap[Int, Int] = read(write(mpm)) check(mpm, _mpm) + // mutable.ParCtrie + val mpc = mutable.ParCtrie(1 -> 2, 2 -> 4) + val _mpc: mutable.ParCtrie[Int, Int] = read(write(mpc)) + check(mpc, _mpc) + // mutable.ParHashSet val mps = mutable.ParHashSet(1, 2, 3) val _mps: mutable.ParHashSet[Int] = read(write(mps)) diff --git a/test/files/run/ctries/iterator.scala b/test/files/run/ctries/iterator.scala index 1cef4f66ea..4bbf9009f0 100644 --- a/test/files/run/ctries/iterator.scala +++ b/test/files/run/ctries/iterator.scala @@ -141,8 +141,8 @@ object IteratorSpec extends Spec { "be consistent when taken with concurrent modifications" in { val sz = 25000 - val W = 25 - val S = 10 + val W = 15 + val S = 5 val checks = 5 val ct = new Ctrie[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) @@ -182,8 +182,8 @@ object IteratorSpec extends Spec { "be consistent with a concurrent removal with a well defined order" in { val sz = 150000 - val sgroupsize = 40 - val sgroupnum = 20 + val sgroupsize = 10 + val sgroupnum = 5 val removerslowdown = 50 val ct = new Ctrie[Wrap, Int] for (i <- 0 until sz) ct.put(new Wrap(i), i) @@ -201,7 +201,7 @@ object IteratorSpec extends Spec { def consistentIteration(it: Iterator[(Wrap, Int)]) = { class Iter extends Thread { override def run() { - val elems = it.toSeq + val elems = it.toBuffer if (elems.nonEmpty) { val minelem = elems.minBy((x: (Wrap, Int)) => x._1.i)._1.i assert(elems.forall(_._1.i >= minelem)) @@ -224,8 +224,8 @@ object IteratorSpec extends Spec { "be consistent with a concurrent insertion with a well defined order" in { val sz = 150000 - val sgroupsize = 30 - val sgroupnum = 30 + val sgroupsize = 10 + val sgroupnum = 10 val inserterslowdown = 50 val ct = new Ctrie[Wrap, Int] diff --git a/test/files/scalacheck/avl.scala b/test/files/scalacheck/avl.scala index 51fb1fe8c3..af79ad49e3 100644 --- a/test/files/scalacheck/avl.scala +++ b/test/files/scalacheck/avl.scala @@ -47,21 +47,21 @@ package scala.collection.mutable { } } - def genInput: Gen[(Int, List[AVLTree[Int]])] = for { - size <- Gen.choose(20, 25) - elements <- Gen.listOfN(size, Gen.choose(0, 1000)) - selected <- Gen.choose(0, 1000) + def genInput: org.scalacheck.Gen[(Int, List[AVLTree[Int]])] = for { + size <- org.scalacheck.Gen.choose(20, 25) + elements <- org.scalacheck.Gen.listOfN(size, org.scalacheck.Gen.choose(0, 1000)) + selected <- org.scalacheck.Gen.choose(0, 1000) } yield { // selected mustn't be in elements already val list = makeAllBalancedTree(elements.sorted.distinct.map(_*2)) (selected*2+1, list) } - def genInputDelete: Gen[(Int, List[AVLTree[Int]])] = for { - size <- Gen.choose(20, 25) - elements <- Gen.listOfN(size, Gen.choose(0, 1000)) + def genInputDelete: org.scalacheck.Gen[(Int, List[AVLTree[Int]])] = for { + size <- org.scalacheck.Gen.choose(20, 25) + elements <- org.scalacheck.Gen.listOfN(size, org.scalacheck.Gen.choose(0, 1000)) e = elements.sorted.distinct - selected <- Gen.choose(0, e.size-1) + selected <- org.scalacheck.Gen.choose(0, e.size-1) } yield { // selected must be in elements already val list = makeAllBalancedTree(e) @@ -111,4 +111,4 @@ package scala.collection.mutable { object Test extends Properties("AVL") { include(scala.collection.mutable.TestInsert) include(scala.collection.mutable.TestRemove) -} \ No newline at end of file +} diff --git a/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala b/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala new file mode 100644 index 0000000000..d1924f0ada --- /dev/null +++ b/test/files/scalacheck/parallel-collections/ParallelCtrieCheck.scala @@ -0,0 +1,98 @@ +package scala.collection.parallel +package mutable + + + +import org.scalacheck._ +import org.scalacheck.Gen +import org.scalacheck.Gen._ +import org.scalacheck.Prop._ +import org.scalacheck.Properties +import org.scalacheck.Arbitrary._ + +import scala.collection._ +import scala.collection.parallel.ops._ + + + +abstract class ParallelCtrieCheck[K, V](tp: String) extends ParallelMapCheck[K, V]("mutable.ParCtrie[" + tp + "]") { + // ForkJoinTasks.defaultForkJoinPool.setMaximumPoolSize(Runtime.getRuntime.availableProcessors * 2) + // ForkJoinTasks.defaultForkJoinPool.setParallelism(Runtime.getRuntime.availableProcessors * 2) + + type CollType = ParCtrie[K, V] + + def isCheckingViews = false + + def hasStrictOrder = false + + def ofSize(vals: Seq[Gen[(K, V)]], sz: Int) = { + val ct = new mutable.Ctrie[K, V] + val gen = vals(rnd.nextInt(vals.size)) + for (i <- 0 until sz) ct += sample(gen) + ct + } + + def fromTraversable(t: Traversable[(K, V)]) = { + val pct = new ParCtrie[K, V] + var i = 0 + for (kv <- t.toList) { + pct += kv + i += 1 + } + pct + } + +} + + +object IntIntParallelCtrieCheck extends ParallelCtrieCheck[Int, Int]("Int, Int") +with PairOperators[Int, Int] +with PairValues[Int, Int] +{ + def intvalues = new IntValues {} + def kvalues = intvalues.values + def vvalues = intvalues.values + + val intoperators = new IntOperators {} + def voperators = intoperators + def koperators = intoperators + + override def printDataStructureDebugInfo(ds: AnyRef) = ds match { + case pm: ParCtrie[k, v] => + println("Mutable parallel ctrie") + case _ => + println("could not match data structure type: " + ds.getClass) + } + + override def checkDataStructureInvariants(orig: Traversable[(Int, Int)], ds: AnyRef) = ds match { + // case pm: ParHashMap[k, v] if 1 == 0 => // disabled this to make tests faster + // val invs = pm.brokenInvariants + + // val containsall = (for ((k, v) <- orig) yield { + // if (pm.asInstanceOf[ParHashMap[Int, Int]].get(k) == Some(v)) true + // else { + // println("Does not contain original element: " + (k, v)) + // false + // } + // }).foldLeft(true)(_ && _) + + + // if (invs.isEmpty) containsall + // else { + // println("Invariants broken:\n" + invs.mkString("\n")) + // false + // } + case _ => true + } + +} + + + + + + + + + + diff --git a/test/files/scalacheck/parallel-collections/pc.scala b/test/files/scalacheck/parallel-collections/pc.scala index cc0382303a..8a0dba3c25 100644 --- a/test/files/scalacheck/parallel-collections/pc.scala +++ b/test/files/scalacheck/parallel-collections/pc.scala @@ -25,6 +25,9 @@ class ParCollProperties extends Properties("Parallel collections") { // parallel mutable hash maps (tables) include(mutable.IntIntParallelHashMapCheck) + // parallel ctrie + include(mutable.IntIntParallelCtrieCheck) + // parallel mutable hash sets (tables) include(mutable.IntParallelHashSetCheck) -- cgit v1.2.3 From f0f5ad3c81431eba27d590f80872306f60d01505 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Wed, 15 Feb 2012 20:50:25 +0100 Subject: Apply the fix for si-5293 to hash maps. This fix was previously only applied to hash sets. --- .../scala/collection/mutable/HashTable.scala | 32 ++++++-- .../collection/parallel/mutable/ParHashMap.scala | 10 ++- test/files/jvm/serialization.check | 8 +- test/files/run/t5293-map.scala | 88 ++++++++++++++++++++++ 4 files changed, 122 insertions(+), 16 deletions(-) create mode 100644 test/files/run/t5293-map.scala (limited to 'test/files/jvm') diff --git a/src/library/scala/collection/mutable/HashTable.scala b/src/library/scala/collection/mutable/HashTable.scala index cdf1b78f29..5b3e07b826 100644 --- a/src/library/scala/collection/mutable/HashTable.scala +++ b/src/library/scala/collection/mutable/HashTable.scala @@ -52,6 +52,10 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU */ @transient protected var sizemap: Array[Int] = null + @transient var seedvalue: Int = tableSizeSeed + + protected def tableSizeSeed = Integer.bitCount(table.length - 1) + protected def initialSize: Int = HashTable.initialSize private def lastPopulatedIndex = { @@ -70,14 +74,16 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU private[collection] def init[B](in: java.io.ObjectInputStream, f: (A, B) => Entry) { in.defaultReadObject - _loadFactor = in.readInt + _loadFactor = in.readInt() assert(_loadFactor > 0) - val size = in.readInt + val size = in.readInt() tableSize = 0 assert(size >= 0) - - val smDefined = in.readBoolean + + seedvalue = in.readInt() + + val smDefined = in.readBoolean() table = new Array(capacity(sizeForThreshold(_loadFactor, size))) threshold = newThreshold(_loadFactor, table.size) @@ -86,7 +92,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU var index = 0 while (index < size) { - addEntry(f(in.readObject.asInstanceOf[A], in.readObject.asInstanceOf[B])) + addEntry(f(in.readObject().asInstanceOf[A], in.readObject().asInstanceOf[B])) index += 1 } } @@ -103,6 +109,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU out.defaultWriteObject out.writeInt(_loadFactor) out.writeInt(tableSize) + out.writeInt(seedvalue) out.writeBoolean(isSizeMapDefined) foreachEntry { entry => out.writeObject(entry.key) @@ -314,7 +321,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU // this is of crucial importance when populating the table in parallel protected final def index(hcode: Int) = { val ones = table.length - 1 - val improved = improve(hcode) + val improved = improve(hcode, seedvalue) val shifted = (improved >> (32 - java.lang.Integer.bitCount(ones))) & ones shifted } @@ -325,6 +332,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU table = c.table tableSize = c.tableSize threshold = c.threshold + seedvalue = c.seedvalue sizemap = c.sizemap } if (alwaysInitSizeMap && sizemap == null) sizeMapInitAndRebuild @@ -335,6 +343,7 @@ trait HashTable[A, Entry >: Null <: HashEntry[A, Entry]] extends HashTable.HashU table, tableSize, threshold, + seedvalue, sizemap ) } @@ -368,7 +377,7 @@ private[collection] object HashTable { protected def elemHashCode(key: KeyType) = key.## - protected final def improve(hcode: Int) = { + protected final def improve(hcode: Int, seed: Int) = { /* Murmur hash * m = 0x5bd1e995 * r = 24 @@ -396,7 +405,7 @@ private[collection] object HashTable { * */ var i = hcode * 0x9e3775cd i = java.lang.Integer.reverseBytes(i) - i * 0x9e3775cd + i = i * 0x9e3775cd // a slower alternative for byte reversal: // i = (i << 16) | (i >> 16) // i = ((i >> 8) & 0x00ff00ff) | ((i << 8) & 0xff00ff00) @@ -420,6 +429,11 @@ private[collection] object HashTable { // h = h ^ (h >>> 14) // h = h + (h << 4) // h ^ (h >>> 10) + + // the rest of the computation is due to SI-5293 + val rotation = seed % 32 + val rotated = (i >>> rotation) | (i << (32 - rotation)) + rotated } } @@ -442,6 +456,7 @@ private[collection] object HashTable { val table: Array[HashEntry[A, Entry]], val tableSize: Int, val threshold: Int, + val seedvalue: Int, val sizemap: Array[Int] ) { import collection.DebugUtils._ @@ -452,6 +467,7 @@ private[collection] object HashTable { append("Table: [" + arrayString(table, 0, table.length) + "]") append("Table size: " + tableSize) append("Load factor: " + loadFactor) + append("Seedvalue: " + seedvalue) append("Threshold: " + threshold) append("Sizemap: [" + arrayString(sizemap, 0, sizemap.length) + "]") } diff --git a/src/library/scala/collection/parallel/mutable/ParHashMap.scala b/src/library/scala/collection/parallel/mutable/ParHashMap.scala index 15ffd3fdd2..21a5b05749 100644 --- a/src/library/scala/collection/parallel/mutable/ParHashMap.scala +++ b/src/library/scala/collection/parallel/mutable/ParHashMap.scala @@ -160,10 +160,11 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr import collection.parallel.tasksupport._ private var mask = ParHashMapCombiner.discriminantmask private var nonmasklen = ParHashMapCombiner.nonmasklength + private var seedvalue = 27 def +=(elem: (K, V)) = { sz += 1 - val hc = improve(elemHashCode(elem._1)) + val hc = improve(elemHashCode(elem._1), seedvalue) val pos = (hc >>> nonmasklen) if (buckets(pos) eq null) { // initialize bucket @@ -176,7 +177,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr def result: ParHashMap[K, V] = if (size >= (ParHashMapCombiner.numblocks * sizeMapBucketSize)) { // 1024 // construct table - val table = new AddingHashTable(size, tableLoadFactor) + val table = new AddingHashTable(size, tableLoadFactor, seedvalue) val bucks = buckets.map(b => if (b ne null) b.headPtr else null) val insertcount = executeAndWaitResult(new FillBlocks(bucks, table, 0, bucks.length)) table.setSize(insertcount) @@ -210,11 +211,12 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr * and true if the key was successfully inserted. It does not update the number of elements * in the table. */ - private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int) extends HashTable[K, DefaultEntry[K, V]] { + private[ParHashMapCombiner] class AddingHashTable(numelems: Int, lf: Int, _seedvalue: Int) extends HashTable[K, DefaultEntry[K, V]] { import HashTable._ _loadFactor = lf table = new Array[HashEntry[K, DefaultEntry[K, V]]](capacity(sizeForThreshold(_loadFactor, numelems))) tableSize = 0 + seedvalue = _seedvalue threshold = newThreshold(_loadFactor, table.length) sizeMapInit(table.length) def setSize(sz: Int) = tableSize = sz @@ -285,7 +287,7 @@ extends collection.parallel.BucketCombiner[(K, V), ParHashMap[K, V], DefaultEntr insertcount } private def assertCorrectBlock(block: Int, k: K) { - val hc = improve(elemHashCode(k)) + val hc = improve(elemHashCode(k), seedvalue) if ((hc >>> nonmasklen) != block) { println(hc + " goes to " + (hc >>> nonmasklen) + ", while expected block is " + block) assert((hc >>> nonmasklen) == block) diff --git a/test/files/jvm/serialization.check b/test/files/jvm/serialization.check index 67b77639a2..81b68f0f5d 100644 --- a/test/files/jvm/serialization.check +++ b/test/files/jvm/serialization.check @@ -156,8 +156,8 @@ x = BitSet(0, 8, 9) y = BitSet(0, 8, 9) x equals y: true, y equals x: true -x = Map(C -> 3, B -> 2, A -> 1) -y = Map(C -> 3, A -> 1, B -> 2) +x = Map(A -> 1, C -> 3, B -> 2) +y = Map(A -> 1, C -> 3, B -> 2) x equals y: true, y equals x: true x = Set(buffers, title, layers) @@ -283,8 +283,8 @@ x = ParArray(abc, def, etc) y = ParArray(abc, def, etc) x equals y: true, y equals x: true -x = ParHashMap(1 -> 2, 2 -> 4) -y = ParHashMap(1 -> 2, 2 -> 4) +x = ParHashMap(2 -> 4, 1 -> 2) +y = ParHashMap(2 -> 4, 1 -> 2) x equals y: true, y equals x: true x = ParCtrie(1 -> 2, 2 -> 4) diff --git a/test/files/run/t5293-map.scala b/test/files/run/t5293-map.scala new file mode 100644 index 0000000000..9e186894fc --- /dev/null +++ b/test/files/run/t5293-map.scala @@ -0,0 +1,88 @@ + + + +import scala.collection.JavaConverters._ + + + +object Test extends App { + + def bench(label: String)(body: => Unit): Long = { + val start = System.nanoTime + + 0.until(10).foreach(_ => body) + + val end = System.nanoTime + + //println("%s: %s ms".format(label, (end - start) / 1000.0 / 1000.0)) + + end - start + } + + def benchJava(values: java.util.Map[Int, Int]) = { + bench("Java Map") { + val m = new java.util.HashMap[Int, Int] + + m.putAll(values) + } + } + + def benchScala(values: Iterable[(Int, Int)]) = { + bench("Scala Map") { + val m = new scala.collection.mutable.HashMap[Int, Int] + + m ++= values + } + } + + def benchScalaSorted(values: Iterable[(Int, Int)]) = { + bench("Scala Map sorted") { + val m = new scala.collection.mutable.HashMap[Int, Int] + + m ++= values.toArray.sorted + } + } + + def benchScalaPar(values: Iterable[(Int, Int)]) = { + bench("Scala ParMap") { + val m = new scala.collection.parallel.mutable.ParHashMap[Int, Int] map { x => x } + + m ++= values + } + } + + val total = 50000 + val values = (0 until total) zip (0 until total) + val map = scala.collection.mutable.HashMap.empty[Int, Int] + + map ++= values + + // warmup + for (x <- 0 until 5) { + benchJava(map.asJava) + benchScala(map) + benchScalaPar(map) + benchJava(map.asJava) + benchScala(map) + benchScalaPar(map) + } + + val javamap = benchJava(map.asJava) + val scalamap = benchScala(map) + val scalaparmap = benchScalaPar(map) + + // println(javamap) + // println(scalamap) + // println(scalaparmap) + + assert(scalamap < (javamap * 4)) + assert(scalaparmap < (javamap * 4)) +} + + + + + + + + -- cgit v1.2.3 From ab84c8d9a97b41728e77f7808eda2748d052ca06 Mon Sep 17 00:00:00 2001 From: Aleksandar Prokopec Date: Thu, 16 Feb 2012 13:47:59 +0100 Subject: Disable execution context and futures implementation in the default package. Fixed some tests so that they work now. The Transactions.scala test was failing when defined in scala.concurrent package, reporting that type `_$1` is defined twice. Until we figure out the reason for this, the package name in that test is renamed. --- .../scala/concurrent/akka/AbstractPromise.java | 21 -- .../concurrent/akka/ExecutionContextImpl.scala | 134 ------- src/library/scala/concurrent/akka/Future.scala | 77 ---- src/library/scala/concurrent/akka/Promise.scala | 250 ------------- src/library/scala/concurrent/akka/package.scala | 39 -- .../scala/concurrent/default/SchedulerImpl.scala | 44 --- .../default/SchedulerImpl.scala.disabled | 44 +++ .../scala/concurrent/default/TaskImpl.scala | 313 ----------------- .../concurrent/default/TaskImpl.scala.disabled | 313 +++++++++++++++++ .../scala/concurrent/impl/AbstractPromise.java | 21 ++ .../concurrent/impl/ExecutionContextImpl.scala | 134 +++++++ src/library/scala/concurrent/impl/Future.scala | 77 ++++ src/library/scala/concurrent/impl/Promise.scala | 251 +++++++++++++ src/library/scala/concurrent/impl/package.scala | 39 ++ src/library/scala/concurrent/package.scala | 6 +- test/disabled/jvm/scala-concurrent-tck-akka.scala | 391 +++++++++++++++++++++ test/files/jvm/concurrent-future.scala | 14 +- test/files/jvm/scala-concurrent-tck-akka.scala | 391 --------------------- test/files/jvm/scala-concurrent-tck.scala | 23 +- test/files/pos/Transactions.scala | 2 +- 20 files changed, 1293 insertions(+), 1291 deletions(-) delete mode 100644 src/library/scala/concurrent/akka/AbstractPromise.java delete mode 100644 src/library/scala/concurrent/akka/ExecutionContextImpl.scala delete mode 100644 src/library/scala/concurrent/akka/Future.scala delete mode 100644 src/library/scala/concurrent/akka/Promise.scala delete mode 100644 src/library/scala/concurrent/akka/package.scala delete mode 100644 src/library/scala/concurrent/default/SchedulerImpl.scala create mode 100644 src/library/scala/concurrent/default/SchedulerImpl.scala.disabled delete mode 100644 src/library/scala/concurrent/default/TaskImpl.scala create mode 100644 src/library/scala/concurrent/default/TaskImpl.scala.disabled create mode 100644 src/library/scala/concurrent/impl/AbstractPromise.java create mode 100644 src/library/scala/concurrent/impl/ExecutionContextImpl.scala create mode 100644 src/library/scala/concurrent/impl/Future.scala create mode 100644 src/library/scala/concurrent/impl/Promise.scala create mode 100644 src/library/scala/concurrent/impl/package.scala create mode 100644 test/disabled/jvm/scala-concurrent-tck-akka.scala delete mode 100644 test/files/jvm/scala-concurrent-tck-akka.scala (limited to 'test/files/jvm') diff --git a/src/library/scala/concurrent/akka/AbstractPromise.java b/src/library/scala/concurrent/akka/AbstractPromise.java deleted file mode 100644 index 38c74edf2f..0000000000 --- a/src/library/scala/concurrent/akka/AbstractPromise.java +++ /dev/null @@ -1,21 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.akka; - - - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; - - - -abstract class AbstractPromise { - private volatile Object _ref = null; - protected final static AtomicReferenceFieldUpdater updater = - AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); -} diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala deleted file mode 100644 index 2bc846ba3c..0000000000 --- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala +++ /dev/null @@ -1,134 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.akka - - - -import java.util.concurrent.{Callable, ExecutorService} -import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} -import scala.util.{ Duration, Try, Success, Failure } -import scala.collection.mutable.Stack - - - -class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { - import ExecutionContextImpl._ - - def execute(runnable: Runnable): Unit = executorService match { - // case fj: ForkJoinPool => - // TODO fork if more applicable - // executorService execute runnable - case _ => - executorService execute runnable - } - - def execute[U](body: () => U): Unit = execute(new Runnable { - def run() = body() - }) - - def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) - - def future[T](body: =>T): Future[T] = { - val p = promise[T] - - dispatchFuture { - () => - p complete { - try { - Success(body) - } catch { - case e => resolver(e) - } - } - } - - p.future - } - - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x => x.blockingCall(awaitable) // inside an execution context thread - } - } - - def reportFailure(t: Throwable) = t match { - case e: Error => throw e // rethrow serious errors - case t => t.printStackTrace() - } - - /** Only callable from the tasks running on the same execution context. */ - private def blockingCall[T](body: Awaitable[T]): T = { - releaseStack() - - // TODO see what to do with timeout - body.await(Duration.fromNanos(0))(CanAwaitEvidence) - } - - // an optimization for batching futures - // TODO we should replace this with a public queue, - // so that it can be stolen from - // OR: a push to the local task queue should be so cheap that this is - // not even needed, but stealing is still possible - private val _taskStack = new ThreadLocal[Stack[() => Unit]]() - - private def releaseStack(): Unit = - _taskStack.get match { - case stack if (stack ne null) && stack.nonEmpty => - val tasks = stack.elems - stack.clear() - _taskStack.remove() - dispatchFuture(() => _taskStack.get.elems = tasks, true) - case null => - // do nothing - there is no local batching stack anymore - case _ => - _taskStack.remove() - } - - private[akka] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = - _taskStack.get match { - case stack if (stack ne null) && !force => stack push task - case _ => this.execute( - new Runnable { - def run() { - try { - val taskStack = Stack[() => Unit](task) - _taskStack set taskStack - while (taskStack.nonEmpty) { - val next = taskStack.pop() - try { - next.apply() - } catch { - case e => - // TODO catching all and continue isn't good for OOME - reportFailure(e) - } - } - } finally { - _taskStack.remove() - } - } - } - ) - } - -} - - -object ExecutionContextImpl { - - private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] { - override protected def initialValue = null - } - -} - - diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala deleted file mode 100644 index 2633e751bd..0000000000 --- a/src/library/scala/concurrent/akka/Future.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.akka - - - -import scala.concurrent.{Awaitable, ExecutionContext} -import scala.util.{ Try, Success, Failure } -//import scala.util.continuations._ - - - -trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - - implicit def executor: ExecutionContextImpl - - /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. - * - * Returns the result of this Future without blocking, by suspending execution and storing it as a - * continuation until the result is available. - */ - //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) - - /** Tests whether this Future has been completed. - */ - final def isCompleted: Boolean = value.isDefined - - /** The contained value of this Future. Before this Future is completed - * the value will be None. After completion the value will be Some(Right(t)) - * if it contains a valid result, or Some(Left(error)) if it contains - * an exception. - */ - def value: Option[Try[T]] - - def onComplete[U](func: Try[T] => U): this.type - - /** Creates a new Future[A] which is completed with this Future's result if - * that conforms to A's erased type or a ClassCastException otherwise. - */ - final def mapTo[T](implicit m: Manifest[T]) = { - val p = executor.promise[T] - - onComplete { - case f @ Failure(t) => p complete f.asInstanceOf[Try[T]] - case Success(v) => - p complete (try { - Success(boxedType(m.erasure).cast(v).asInstanceOf[T]) - } catch { - case e: ClassCastException ⇒ Failure(e) - }) - } - - p.future - } - - /** Used by for-comprehensions. - */ - final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) - - final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) { - def foreach(f: A => Unit): Unit = self filter p foreach f - def map[B](f: A => B) = self filter p map f - def flatMap[B](f: A => Future[B]) = self filter p flatMap f - def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) - } - -} - - - - diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala deleted file mode 100644 index 8ecffec2aa..0000000000 --- a/src/library/scala/concurrent/akka/Promise.scala +++ /dev/null @@ -1,250 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent.akka - - - -import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater -import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} -//import scala.util.continuations._ -import scala.util.Duration -import scala.util.Try -import scala.util -import scala.annotation.tailrec -//import scala.concurrent.NonDeterministic - - -trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { - - def future = this - - def newPromise[S]: Promise[S] = executor promise - - // TODO refine answer and return types here from Any to type parameters - // then move this up in the hierarchy - /* - final def <<(value: T): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] => Future[Any]) => - cont(complete(Right(value))) - } - - final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { - cont: (Future[T] => Future[Any]) => - val p = executor.promise[Any] - val thisPromise = this - - thisPromise completeWith other - thisPromise onComplete { v => - try { - p completeWith cont(thisPromise) - } catch { - case e => p complete resolver(e) - } - } - - p.future - } - */ - // TODO finish this once we introduce something like dataflow streams - - /* - final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => - val fr = executor.promise[Any] - val f = stream.dequeue(this) - f.onComplete { _ => - try { - fr completeWith cont(f) - } catch { - case e => - fr failure e - } - } - fr - } - */ - -} - - -object Promise { - - def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] - - /** Represents the internal state. - */ - sealed trait FState[+T] { def value: Option[Try[T]] } - - case class Pending[T](listeners: List[Try[T] => Any] = Nil) extends FState[T] { - def value: Option[Try[T]] = None - } - - case class Success[T](value: Option[util.Success[T]] = None) extends FState[T] { - def result: T = value.get.get - } - - case class Failure[T](value: Option[util.Failure[T]] = None) extends FState[T] { - def exception: Throwable = value.get.exception - } - - private val emptyPendingValue = Pending[Nothing](Nil) - - /** Default promise implementation. - */ - class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { - self => - - updater.set(this, Promise.EmptyPending()) - - protected final def tryAwait(atMost: Duration): Boolean = { - @tailrec - def awaitUnsafe(waitTimeNanos: Long): Boolean = { - if (value.isEmpty && waitTimeNanos > 0) { - val ms = NANOSECONDS.toMillis(waitTimeNanos) - val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec - val start = System.nanoTime() - try { - synchronized { - while (value.isEmpty) wait(ms, ns) - } - } catch { - case e: InterruptedException => - } - - awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) - } else - value.isDefined - } - - executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) - } - - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = - if (value.isDefined || tryAwait(atMost)) this - else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") - - def await(atMost: Duration)(implicit permit: CanAwait): T = - ready(atMost).value.get match { - case util.Failure(e) => throw e - case util.Success(r) => r - } - - def value: Option[Try[T]] = getState.value - - @inline - private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] - - @inline - protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) - - @inline - protected final def getState: FState[T] = updater.get(this) - - def tryComplete(value: Try[T]): Boolean = { - val callbacks: List[Try[T] => Any] = { - try { - @tailrec - def tryComplete(v: Try[T]): List[Try[T] => Any] = { - getState match { - case cur @ Pending(listeners) => - if (updateState(cur, if (v.isFailure) Failure(Some(v.asInstanceOf[util.Failure[T]])) else Success(Some(v.asInstanceOf[util.Success[T]])))) listeners - else tryComplete(v) - case _ => null - } - } - tryComplete(resolve(value)) - } finally { - synchronized { notifyAll() } // notify any blockers from `tryAwait` - } - } - - callbacks match { - case null => false - case cs if cs.isEmpty => true - case cs => - executor dispatchFuture { - () => cs.foreach(f => notifyCompleted(f, value)) - } - true - } - } - - def onComplete[U](func: Try[T] => U): this.type = { - @tailrec // Returns whether the future has already been completed or not - def tryAddCallback(): Boolean = { - val cur = getState - cur match { - case _: Success[_] | _: Failure[_] => true - case p: Pending[_] => - val pt = p.asInstanceOf[Pending[T]] - if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() - } - } - - if (tryAddCallback()) { - val result = value.get - executor dispatchFuture { - () => notifyCompleted(func, result) - } - } - - this - } - - private final def notifyCompleted(func: Try[T] => Any, result: Try[T]) { - try { - func(result) - } catch { - case e => executor.reportFailure(e) - } - } - } - - /** An already completed Future is given its result at creation. - * - * Useful in Future-composition when a value to contribute is already available. - */ - final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { - val value = Some(resolve(suppliedValue)) - - def tryComplete(value: Try[T]): Boolean = false - - def onComplete[U](func: Try[T] => U): this.type = { - val completedAs = value.get - executor dispatchFuture { - () => func(completedAs) - } - this - } - - private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this - - def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { - case util.Failure(e) => throw e - case util.Success(r) => r - } - } - -} - - - - - - - - - - - - - - - - diff --git a/src/library/scala/concurrent/akka/package.scala b/src/library/scala/concurrent/akka/package.scala deleted file mode 100644 index 8c059b8e71..0000000000 --- a/src/library/scala/concurrent/akka/package.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent - - - -import java.{lang => jl} -import scala.util.Duration - - - -package object akka { - - private val toBoxed = Map[Class[_], Class[_]]( - classOf[Boolean] -> classOf[jl.Boolean], - classOf[Byte] -> classOf[jl.Byte], - classOf[Char] -> classOf[jl.Character], - classOf[Short] -> classOf[jl.Short], - classOf[Int] -> classOf[jl.Integer], - classOf[Long] -> classOf[jl.Long], - classOf[Float] -> classOf[jl.Float], - classOf[Double] -> classOf[jl.Double], - classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) - - def boxedType(c: Class[_]): Class[_] = { - if (c.isPrimitive) toBoxed(c) else c - } - - def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue - -} - - diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala b/src/library/scala/concurrent/default/SchedulerImpl.scala deleted file mode 100644 index 745d2d1a15..0000000000 --- a/src/library/scala/concurrent/default/SchedulerImpl.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent -package default - -import scala.util.Duration - -private[concurrent] final class SchedulerImpl extends Scheduler { - private val timer = - new java.util.Timer(true) // the associated thread runs as a daemon - - def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ??? - - def scheduleOnce(delay: Duration, task: Runnable): Cancellable = { - val timerTask = new java.util.TimerTask { - def run(): Unit = - task.run() - } - timer.schedule(timerTask, delay.toMillis) - new Cancellable { - def cancel(): Unit = - timerTask.cancel() - } - } - - def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = { - val timerTask = new java.util.TimerTask { - def run(): Unit = - task - } - timer.schedule(timerTask, delay.toMillis) - new Cancellable { - def cancel(): Unit = - timerTask.cancel() - } - } - -} diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled new file mode 100644 index 0000000000..745d2d1a15 --- /dev/null +++ b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled @@ -0,0 +1,44 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent +package default + +import scala.util.Duration + +private[concurrent] final class SchedulerImpl extends Scheduler { + private val timer = + new java.util.Timer(true) // the associated thread runs as a daemon + + def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ??? + + def scheduleOnce(delay: Duration, task: Runnable): Cancellable = { + val timerTask = new java.util.TimerTask { + def run(): Unit = + task.run() + } + timer.schedule(timerTask, delay.toMillis) + new Cancellable { + def cancel(): Unit = + timerTask.cancel() + } + } + + def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = { + val timerTask = new java.util.TimerTask { + def run(): Unit = + task + } + timer.schedule(timerTask, delay.toMillis) + new Cancellable { + def cancel(): Unit = + timerTask.cancel() + } + } + +} diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala deleted file mode 100644 index 94e54cb372..0000000000 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ /dev/null @@ -1,313 +0,0 @@ -package scala.concurrent -package default - - - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater -import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } -import scala.util.Try -import scala.util -import scala.util.Duration -import scala.annotation.tailrec - - - -private[concurrent] trait Completable[T] { -self: Future[T] => - - val executor: ExecutionContextImpl - - def newPromise[S]: Promise[S] = executor promise - - type Callback = Try[T] => Any - - def getState: State[T] - - def casState(oldv: State[T], newv: State[T]): Boolean - - protected def dispatch[U](r: Runnable) = executionContext execute r - - protected def processCallbacks(cbs: List[Callback], r: Try[T]) = - for (cb <- cbs) dispatch(new Runnable { - override def run() = cb(r) - }) - - def future: Future[T] = self - - def onComplete[U](callback: Try[T] => U): this.type = { - @tailrec def tryAddCallback(): Try[T] = { - getState match { - case p @ Pending(lst) => - val pt = p.asInstanceOf[Pending[T]] - if (casState(pt, Pending(callback :: pt.callbacks))) null - else tryAddCallback() - case Success(res) => util.Success(res) - case Failure(t) => util.Failure(t) - } - } - - val res = tryAddCallback() - if (res != null) dispatch(new Runnable { - override def run() = - try callback(res) - catch handledFutureException andThen { - t => Console.err.println(t) - } - }) - - this - } - - def isTimedout: Boolean = getState match { - case Failure(ft: FutureTimeoutException) => true - case _ => false - } - -} - -private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) -extends Promise[T] with Future[T] with Completable[T] { - - val executor: scala.concurrent.default.ExecutionContextImpl = context - - @volatile private var state: State[T] = _ - - val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state") - - updater.set(this, Pending(List())) - - def casState(oldv: State[T], newv: State[T]): Boolean = { - updater.compareAndSet(this, oldv, newv) - } - - def getState: State[T] = { - updater.get(this) - } - - @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { - case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs - case _ => null - } - - def tryComplete(r: Try[T]) = r match { - case util.Failure(t) => tryFailure(t) - case util.Success(v) => trySuccess(v) - } - - override def trySuccess(value: T): Boolean = { - val cbs = tryCompleteState(Success(value)) - if (cbs == null) - false - else { - processCallbacks(cbs, util.Success(value)) - this.synchronized { - this.notifyAll() - } - true - } - } - - override def tryFailure(t: Throwable): Boolean = { - val wrapped = wrap(t) - val cbs = tryCompleteState(Failure(wrapped)) - if (cbs == null) - false - else { - processCallbacks(cbs, util.Failure(wrapped)) - this.synchronized { - this.notifyAll() - } - true - } - } - - def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { - case Success(res) => res - case Failure(t) => throw t - case _ => - this.synchronized { - while (true) - getState match { - case Pending(_) => this.wait() - case Success(res) => return res - case Failure(t) => throw t - } - } - sys.error("unreachable") - } - -} - -private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) -extends RecursiveAction with Task[T] with Future[T] with Completable[T] { - - val executor: ExecutionContextImpl = context - - @volatile private var state: State[T] = _ - - val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state") - - updater.set(this, Pending(List())) - - def casState(oldv: State[T], newv: State[T]): Boolean = { - updater.compareAndSet(this, oldv, newv) - } - - def getState: State[T] = { - updater.get(this) - } - - @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { - case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs - } - - def compute(): Unit = { - var cbs: List[Callback] = null - try { - val res = body - processCallbacks(tryCompleteState(Success(res)), util.Success(res)) - } catch { - case t if isFutureThrowable(t) => - processCallbacks(tryCompleteState(Failure(t)), util.Failure(t)) - case t => - val ee = new ExecutionException(t) - processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee)) - throw t - } - } - - def start(): Unit = { - Thread.currentThread match { - case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork() - case _ => executor.pool.execute(this) - } - } - - // TODO FIXME: handle timeouts - def await(atMost: Duration): this.type = - await - - def await: this.type = { - this.join() - this - } - - def tryCancel(): Unit = - tryUnfork() - - def await(atMost: Duration)(implicit canawait: CanAwait): T = { - join() // TODO handle timeout also - (updater.get(this): @unchecked) match { - case Success(r) => r - case Failure(t) => throw t - } - } - -} - - -private[concurrent] sealed abstract class State[T] - - -case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T] - - -case class Success[T](result: T) extends State[T] - - -case class Failure[T](throwable: Throwable) extends State[T] - - -private[concurrent] final class ExecutionContextImpl extends ExecutionContext { - import ExecutionContextImpl._ - - val pool = { - val p = new ForkJoinPool - p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { - def uncaughtException(t: Thread, throwable: Throwable) { - Console.err.println(throwable.getMessage) - throwable.printStackTrace(Console.err) - } - }) - p - } - - @inline - private def executeTask(task: RecursiveAction) { - if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) - task.fork() - else - pool execute task - } - - def execute(task: Runnable) { - val action = new RecursiveAction { def compute() { task.run() } } - executeTask(action) - } - - def execute[U](body: () => U) { - val action = new RecursiveAction { def compute() { body() } } - executeTask(action) - } - - def task[T](body: => T): Task[T] = { - new TaskImpl(this, body) - } - - def future[T](body: => T): Future[T] = { - val t = task(body) - t.start() - t.future - } - - def promise[T]: Promise[T] = - new PromiseImpl[T](this) - - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor - case x => x.blocking(awaitable, atMost) - } - } - - private def blockingCall[T](b: Awaitable[T]): T = b match { - case fj: TaskImpl[_] if fj.executor.pool eq pool => - fj.await(Duration.fromNanos(0)) - case _ => - var res: T = null.asInstanceOf[T] - @volatile var blockingDone = false - // TODO add exception handling here! - val mb = new ForkJoinPool.ManagedBlocker { - def block() = { - res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) - blockingDone = true - true - } - def isReleasable = blockingDone - } - ForkJoinPool.managedBlock(mb, true) - res - } - - def reportFailure(t: Throwable): Unit = {} - -} - - -object ExecutionContextImpl { - - private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { - override protected def initialValue = null - } - -} - - - - - - - diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled new file mode 100644 index 0000000000..94e54cb372 --- /dev/null +++ b/src/library/scala/concurrent/default/TaskImpl.scala.disabled @@ -0,0 +1,313 @@ +package scala.concurrent +package default + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } +import scala.util.Try +import scala.util +import scala.util.Duration +import scala.annotation.tailrec + + + +private[concurrent] trait Completable[T] { +self: Future[T] => + + val executor: ExecutionContextImpl + + def newPromise[S]: Promise[S] = executor promise + + type Callback = Try[T] => Any + + def getState: State[T] + + def casState(oldv: State[T], newv: State[T]): Boolean + + protected def dispatch[U](r: Runnable) = executionContext execute r + + protected def processCallbacks(cbs: List[Callback], r: Try[T]) = + for (cb <- cbs) dispatch(new Runnable { + override def run() = cb(r) + }) + + def future: Future[T] = self + + def onComplete[U](callback: Try[T] => U): this.type = { + @tailrec def tryAddCallback(): Try[T] = { + getState match { + case p @ Pending(lst) => + val pt = p.asInstanceOf[Pending[T]] + if (casState(pt, Pending(callback :: pt.callbacks))) null + else tryAddCallback() + case Success(res) => util.Success(res) + case Failure(t) => util.Failure(t) + } + } + + val res = tryAddCallback() + if (res != null) dispatch(new Runnable { + override def run() = + try callback(res) + catch handledFutureException andThen { + t => Console.err.println(t) + } + }) + + this + } + + def isTimedout: Boolean = getState match { + case Failure(ft: FutureTimeoutException) => true + case _ => false + } + +} + +private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) +extends Promise[T] with Future[T] with Completable[T] { + + val executor: scala.concurrent.default.ExecutionContextImpl = context + + @volatile private var state: State[T] = _ + + val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state") + + updater.set(this, Pending(List())) + + def casState(oldv: State[T], newv: State[T]): Boolean = { + updater.compareAndSet(this, oldv, newv) + } + + def getState: State[T] = { + updater.get(this) + } + + @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { + case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs + case _ => null + } + + def tryComplete(r: Try[T]) = r match { + case util.Failure(t) => tryFailure(t) + case util.Success(v) => trySuccess(v) + } + + override def trySuccess(value: T): Boolean = { + val cbs = tryCompleteState(Success(value)) + if (cbs == null) + false + else { + processCallbacks(cbs, util.Success(value)) + this.synchronized { + this.notifyAll() + } + true + } + } + + override def tryFailure(t: Throwable): Boolean = { + val wrapped = wrap(t) + val cbs = tryCompleteState(Failure(wrapped)) + if (cbs == null) + false + else { + processCallbacks(cbs, util.Failure(wrapped)) + this.synchronized { + this.notifyAll() + } + true + } + } + + def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { + case Success(res) => res + case Failure(t) => throw t + case _ => + this.synchronized { + while (true) + getState match { + case Pending(_) => this.wait() + case Success(res) => return res + case Failure(t) => throw t + } + } + sys.error("unreachable") + } + +} + +private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) +extends RecursiveAction with Task[T] with Future[T] with Completable[T] { + + val executor: ExecutionContextImpl = context + + @volatile private var state: State[T] = _ + + val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state") + + updater.set(this, Pending(List())) + + def casState(oldv: State[T], newv: State[T]): Boolean = { + updater.compareAndSet(this, oldv, newv) + } + + def getState: State[T] = { + updater.get(this) + } + + @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { + case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs + } + + def compute(): Unit = { + var cbs: List[Callback] = null + try { + val res = body + processCallbacks(tryCompleteState(Success(res)), util.Success(res)) + } catch { + case t if isFutureThrowable(t) => + processCallbacks(tryCompleteState(Failure(t)), util.Failure(t)) + case t => + val ee = new ExecutionException(t) + processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee)) + throw t + } + } + + def start(): Unit = { + Thread.currentThread match { + case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork() + case _ => executor.pool.execute(this) + } + } + + // TODO FIXME: handle timeouts + def await(atMost: Duration): this.type = + await + + def await: this.type = { + this.join() + this + } + + def tryCancel(): Unit = + tryUnfork() + + def await(atMost: Duration)(implicit canawait: CanAwait): T = { + join() // TODO handle timeout also + (updater.get(this): @unchecked) match { + case Success(r) => r + case Failure(t) => throw t + } + } + +} + + +private[concurrent] sealed abstract class State[T] + + +case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T] + + +case class Success[T](result: T) extends State[T] + + +case class Failure[T](throwable: Throwable) extends State[T] + + +private[concurrent] final class ExecutionContextImpl extends ExecutionContext { + import ExecutionContextImpl._ + + val pool = { + val p = new ForkJoinPool + p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { + def uncaughtException(t: Thread, throwable: Throwable) { + Console.err.println(throwable.getMessage) + throwable.printStackTrace(Console.err) + } + }) + p + } + + @inline + private def executeTask(task: RecursiveAction) { + if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) + task.fork() + else + pool execute task + } + + def execute(task: Runnable) { + val action = new RecursiveAction { def compute() { task.run() } } + executeTask(action) + } + + def execute[U](body: () => U) { + val action = new RecursiveAction { def compute() { body() } } + executeTask(action) + } + + def task[T](body: => T): Task[T] = { + new TaskImpl(this, body) + } + + def future[T](body: => T): Future[T] = { + val t = task(body) + t.start() + t.future + } + + def promise[T]: Promise[T] = + new PromiseImpl[T](this) + + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor + case x => x.blocking(awaitable, atMost) + } + } + + private def blockingCall[T](b: Awaitable[T]): T = b match { + case fj: TaskImpl[_] if fj.executor.pool eq pool => + fj.await(Duration.fromNanos(0)) + case _ => + var res: T = null.asInstanceOf[T] + @volatile var blockingDone = false + // TODO add exception handling here! + val mb = new ForkJoinPool.ManagedBlocker { + def block() = { + res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) + blockingDone = true + true + } + def isReleasable = blockingDone + } + ForkJoinPool.managedBlock(mb, true) + res + } + + def reportFailure(t: Throwable): Unit = {} + +} + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { + override protected def initialValue = null + } + +} + + + + + + + diff --git a/src/library/scala/concurrent/impl/AbstractPromise.java b/src/library/scala/concurrent/impl/AbstractPromise.java new file mode 100644 index 0000000000..5280d67854 --- /dev/null +++ b/src/library/scala/concurrent/impl/AbstractPromise.java @@ -0,0 +1,21 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl; + + + +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; + + + +abstract class AbstractPromise { + private volatile Object _ref = null; + protected final static AtomicReferenceFieldUpdater updater = + AtomicReferenceFieldUpdater.newUpdater(AbstractPromise.class, Object.class, "_ref"); +} diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala new file mode 100644 index 0000000000..af0eb66292 --- /dev/null +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -0,0 +1,134 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import java.util.concurrent.{Callable, ExecutorService} +import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable} +import scala.util.{ Duration, Try, Success, Failure } +import scala.collection.mutable.Stack + + + +class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionContext { + import ExecutionContextImpl._ + + def execute(runnable: Runnable): Unit = executorService match { + // case fj: ForkJoinPool => + // TODO fork if more applicable + // executorService execute runnable + case _ => + executorService execute runnable + } + + def execute[U](body: () => U): Unit = execute(new Runnable { + def run() = body() + }) + + def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this) + + def future[T](body: =>T): Future[T] = { + val p = promise[T] + + dispatchFuture { + () => + p complete { + try { + Success(body) + } catch { + case e => resolver(e) + } + } + } + + p.future + } + + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(awaitable) // inside an execution context thread + } + } + + def reportFailure(t: Throwable) = t match { + case e: Error => throw e // rethrow serious errors + case t => t.printStackTrace() + } + + /** Only callable from the tasks running on the same execution context. */ + private def blockingCall[T](body: Awaitable[T]): T = { + releaseStack() + + // TODO see what to do with timeout + body.await(Duration.fromNanos(0))(CanAwaitEvidence) + } + + // an optimization for batching futures + // TODO we should replace this with a public queue, + // so that it can be stolen from + // OR: a push to the local task queue should be so cheap that this is + // not even needed, but stealing is still possible + private val _taskStack = new ThreadLocal[Stack[() => Unit]]() + + private def releaseStack(): Unit = + _taskStack.get match { + case stack if (stack ne null) && stack.nonEmpty => + val tasks = stack.elems + stack.clear() + _taskStack.remove() + dispatchFuture(() => _taskStack.get.elems = tasks, true) + case null => + // do nothing - there is no local batching stack anymore + case _ => + _taskStack.remove() + } + + private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit = + _taskStack.get match { + case stack if (stack ne null) && !force => stack push task + case _ => this.execute( + new Runnable { + def run() { + try { + val taskStack = Stack[() => Unit](task) + _taskStack set taskStack + while (taskStack.nonEmpty) { + val next = taskStack.pop() + try { + next.apply() + } catch { + case e => + // TODO catching all and continue isn't good for OOME + reportFailure(e) + } + } + } finally { + _taskStack.remove() + } + } + } + ) + } + +} + + +object ExecutionContextImpl { + + private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContextImpl] = new ThreadLocal[ExecutionContextImpl] { + override protected def initialValue = null + } + +} + + diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala new file mode 100644 index 0000000000..3664241ec0 --- /dev/null +++ b/src/library/scala/concurrent/impl/Future.scala @@ -0,0 +1,77 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import scala.concurrent.{Awaitable, ExecutionContext} +import scala.util.{ Try, Success, Failure } +//import scala.util.continuations._ + + + +trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { + + implicit def executor: ExecutionContextImpl + + /** For use only within a Future.flow block or another compatible Delimited Continuations reset block. + * + * Returns the result of this Future without blocking, by suspending execution and storing it as a + * continuation until the result is available. + */ + //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) + + /** Tests whether this Future has been completed. + */ + final def isCompleted: Boolean = value.isDefined + + /** The contained value of this Future. Before this Future is completed + * the value will be None. After completion the value will be Some(Right(t)) + * if it contains a valid result, or Some(Left(error)) if it contains + * an exception. + */ + def value: Option[Try[T]] + + def onComplete[U](func: Try[T] => U): this.type + + /** Creates a new Future[A] which is completed with this Future's result if + * that conforms to A's erased type or a ClassCastException otherwise. + */ + final def mapTo[T](implicit m: Manifest[T]) = { + val p = executor.promise[T] + + onComplete { + case f @ Failure(t) => p complete f.asInstanceOf[Try[T]] + case Success(v) => + p complete (try { + Success(boxedType(m.erasure).cast(v).asInstanceOf[T]) + } catch { + case e: ClassCastException ⇒ Failure(e) + }) + } + + p.future + } + + /** Used by for-comprehensions. + */ + final def withFilter(p: T => Boolean) = new FutureWithFilter[T](this, p) + + final class FutureWithFilter[+A](self: Future[A], p: A => Boolean) { + def foreach(f: A => Unit): Unit = self filter p foreach f + def map[B](f: A => B) = self filter p map f + def flatMap[B](f: A => Future[B]) = self filter p flatMap f + def withFilter(q: A => Boolean): FutureWithFilter[A] = new FutureWithFilter[A](self, x ⇒ p(x) && q(x)) + } + +} + + + + diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala new file mode 100644 index 0000000000..3f9970b178 --- /dev/null +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -0,0 +1,251 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent.impl + + + +import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } +import java.util.concurrent.atomic.AtomicReferenceFieldUpdater +import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} +//import scala.util.continuations._ +import scala.util.Duration +import scala.util.Try +import scala.util +import scala.annotation.tailrec +//import scala.concurrent.NonDeterministic + + + +trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { + + def future = this + + def newPromise[S]: Promise[S] = executor promise + + // TODO refine answer and return types here from Any to type parameters + // then move this up in the hierarchy + /* + final def <<(value: T): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + cont(complete(Right(value))) + } + + final def <<(other: Future[T]): Future[T] @cps[Future[Any]] = shift { + cont: (Future[T] => Future[Any]) => + val p = executor.promise[Any] + val thisPromise = this + + thisPromise completeWith other + thisPromise onComplete { v => + try { + p completeWith cont(thisPromise) + } catch { + case e => p complete resolver(e) + } + } + + p.future + } + */ + // TODO finish this once we introduce something like dataflow streams + + /* + final def <<(stream: PromiseStreamOut[T]): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => + val fr = executor.promise[Any] + val f = stream.dequeue(this) + f.onComplete { _ => + try { + fr completeWith cont(f) + } catch { + case e => + fr failure e + } + } + fr + } + */ + +} + + +object Promise { + + def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]] + + /** Represents the internal state. + */ + sealed trait FState[+T] { def value: Option[Try[T]] } + + case class Pending[T](listeners: List[Try[T] => Any] = Nil) extends FState[T] { + def value: Option[Try[T]] = None + } + + case class Success[T](value: Option[util.Success[T]] = None) extends FState[T] { + def result: T = value.get.get + } + + case class Failure[T](value: Option[util.Failure[T]] = None) extends FState[T] { + def exception: Throwable = value.get.exception + } + + private val emptyPendingValue = Pending[Nothing](Nil) + + /** Default promise implementation. + */ + class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] { + self => + + updater.set(this, Promise.EmptyPending()) + + protected final def tryAwait(atMost: Duration): Boolean = { + @tailrec + def awaitUnsafe(waitTimeNanos: Long): Boolean = { + if (value.isEmpty && waitTimeNanos > 0) { + val ms = NANOSECONDS.toMillis(waitTimeNanos) + val ns = (waitTimeNanos % 1000000l).toInt // as per object.wait spec + val start = System.nanoTime() + try { + synchronized { + while (value.isEmpty) wait(ms, ns) + } + } catch { + case e: InterruptedException => + } + + awaitUnsafe(waitTimeNanos - (System.nanoTime() - start)) + } else + value.isDefined + } + + executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0)) + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = + if (value.isDefined || tryAwait(atMost)) this + else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds") + + def await(atMost: Duration)(implicit permit: CanAwait): T = + ready(atMost).value.get match { + case util.Failure(e) => throw e + case util.Success(r) => r + } + + def value: Option[Try[T]] = getState.value + + @inline + private[this] final def updater = AbstractPromise.updater.asInstanceOf[AtomicReferenceFieldUpdater[AbstractPromise, FState[T]]] + + @inline + protected final def updateState(oldState: FState[T], newState: FState[T]): Boolean = updater.compareAndSet(this, oldState, newState) + + @inline + protected final def getState: FState[T] = updater.get(this) + + def tryComplete(value: Try[T]): Boolean = { + val callbacks: List[Try[T] => Any] = { + try { + @tailrec + def tryComplete(v: Try[T]): List[Try[T] => Any] = { + getState match { + case cur @ Pending(listeners) => + if (updateState(cur, if (v.isFailure) Failure(Some(v.asInstanceOf[util.Failure[T]])) else Success(Some(v.asInstanceOf[util.Success[T]])))) listeners + else tryComplete(v) + case _ => null + } + } + tryComplete(resolve(value)) + } finally { + synchronized { notifyAll() } // notify any blockers from `tryAwait` + } + } + + callbacks match { + case null => false + case cs if cs.isEmpty => true + case cs => + executor dispatchFuture { + () => cs.foreach(f => notifyCompleted(f, value)) + } + true + } + } + + def onComplete[U](func: Try[T] => U): this.type = { + @tailrec // Returns whether the future has already been completed or not + def tryAddCallback(): Boolean = { + val cur = getState + cur match { + case _: Success[_] | _: Failure[_] => true + case p: Pending[_] => + val pt = p.asInstanceOf[Pending[T]] + if (updateState(pt, pt.copy(listeners = func :: pt.listeners))) false else tryAddCallback() + } + } + + if (tryAddCallback()) { + val result = value.get + executor dispatchFuture { + () => notifyCompleted(func, result) + } + } + + this + } + + private final def notifyCompleted(func: Try[T] => Any, result: Try[T]) { + try { + func(result) + } catch { + case e => executor.reportFailure(e) + } + } + } + + /** An already completed Future is given its result at creation. + * + * Useful in Future-composition when a value to contribute is already available. + */ + final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] { + val value = Some(resolve(suppliedValue)) + + def tryComplete(value: Try[T]): Boolean = false + + def onComplete[U](func: Try[T] => U): this.type = { + val completedAs = value.get + executor dispatchFuture { + () => func(completedAs) + } + this + } + + private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this + + def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match { + case util.Failure(e) => throw e + case util.Success(r) => r + } + } + +} + + + + + + + + + + + + + + + + diff --git a/src/library/scala/concurrent/impl/package.scala b/src/library/scala/concurrent/impl/package.scala new file mode 100644 index 0000000000..72add73167 --- /dev/null +++ b/src/library/scala/concurrent/impl/package.scala @@ -0,0 +1,39 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +package scala.concurrent + + + +import java.{lang => jl} +import scala.util.Duration + + + +package object impl { + + private val toBoxed = Map[Class[_], Class[_]]( + classOf[Boolean] -> classOf[jl.Boolean], + classOf[Byte] -> classOf[jl.Byte], + classOf[Char] -> classOf[jl.Character], + classOf[Short] -> classOf[jl.Short], + classOf[Int] -> classOf[jl.Integer], + classOf[Long] -> classOf[jl.Long], + classOf[Float] -> classOf[jl.Float], + classOf[Double] -> classOf[jl.Double], + classOf[Unit] -> classOf[scala.runtime.BoxedUnit]) + + def boxedType(c: Class[_]): Class[_] = { + if (c.isPrimitive) toBoxed(c) else c + } + + def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue + +} + + diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 35b8cf6664..0725332c5e 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -25,12 +25,12 @@ package object concurrent { /** A global execution environment for executing lightweight tasks. */ lazy val executionContext = - new akka.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) + new impl.ExecutionContextImpl(java.util.concurrent.Executors.newCachedThreadPool()) /** A global service for scheduling tasks for execution. */ - lazy val scheduler = - new default.SchedulerImpl + // lazy val scheduler = + // new default.SchedulerImpl val handledFutureException: PartialFunction[Throwable, Throwable] = { case t: Throwable if isFutureThrowable(t) => t diff --git a/test/disabled/jvm/scala-concurrent-tck-akka.scala b/test/disabled/jvm/scala-concurrent-tck-akka.scala new file mode 100644 index 0000000000..dfd906e59e --- /dev/null +++ b/test/disabled/jvm/scala-concurrent-tck-akka.scala @@ -0,0 +1,391 @@ + + +import akka.dispatch.{ + Future => future, + Promise => promise +} +import akka.dispatch.Await.{result => await} + +// Duration required for await +import akka.util.Duration +import java.util.concurrent.TimeUnit +import TimeUnit._ + +import scala.concurrent.{ + TimeoutException, + SyncVar, + ExecutionException +} +//import scala.concurrent.future +//import scala.concurrent.promise +//import scala.concurrent.await + + + +trait TestBase { + + implicit val disp = akka.actor.ActorSystem().dispatcher + + def once(body: (() => Unit) => Unit) { + val sv = new SyncVar[Boolean] + body(() => sv put true) + sv.take() + } + +} + + +trait FutureCallbacks extends TestBase { + + def testOnSuccess(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { case any => + done() + assert(x == 1) + } + } + + def testOnSuccessWhenCompleted(): Unit = once { + done => + var x = 0 + val f = future { + x = 1 + } + f onSuccess { case any => + assert(x == 1) + x = 2 + f onSuccess { case any => + assert(x == 2) + done() + } + } + } + + def testOnSuccessWhenFailed(): Unit = once { + done => + val f = future[Unit] { + done() + throw new Exception + } + f onSuccess { case any => + assert(false) + } + } + + def testOnFailure(): Unit = once { + done => + var x = 0 + val f = future[Unit] { + x = 1 + throw new Exception + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case _ => + done() + assert(x == 1) + } + } + + def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { + done => + val f = future[Unit] { + throw cause + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case e: ExecutionException if (e.getCause == cause) => + done() + case _ => + done() + assert(false) + } + } + + def testOnFailureWhenTimeoutException(): Unit = once { + done => + val f = future[Unit] { + throw new TimeoutException() + } + f onSuccess { case any => + done() + assert(false) + } + f onFailure { + case e: TimeoutException => + done() + case other => + done() + assert(false) + } + } + + testOnSuccess() + testOnSuccessWhenCompleted() + testOnSuccessWhenFailed() + testOnFailure() +// testOnFailureWhenSpecialThrowable(5, new Error) +// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) +// testOnFailureWhenSpecialThrowable(7, new InterruptedException) +// testOnFailureWhenTimeoutException() + +} + + +trait FutureCombinators extends TestBase { + + // map: stub + def testMapSuccess(): Unit = once { + done => + done() + } + + def testMapFailure(): Unit = once { + done => + done() + } + + // flatMap: stub + def testFlatMapSuccess(): Unit = once { + done => + done() + } + + def testFlatMapFailure(): Unit = once { + done => + done() + } + + // filter: stub + def testFilterSuccess(): Unit = once { + done => + done() + } + + def testFilterFailure(): Unit = once { + done => + done() + } + + // foreach: stub + def testForeachSuccess(): Unit = once { + done => + done() + } + + def testForeachFailure(): Unit = once { + done => + done() + } + + def testRecoverSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case re: RuntimeException => + "recovered" + } onSuccess { case x => + done() + assert(x == "recovered") + } onFailure { case any => + done() + assert(false) + } + } + + def testRecoverFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } recover { + case te: TimeoutException => "timeout" + } onSuccess { case x => + done() + assert(false) + } onFailure { case any => + done() + assert(any == cause) + } + } + + testMapSuccess() + testMapFailure() + testFlatMapSuccess() + testFlatMapFailure() + testFilterSuccess() + testFilterFailure() + testForeachSuccess() + testForeachFailure() + testRecoverSuccess() + testRecoverFailure() + +} + +/* +trait FutureProjections extends TestBase { + + def testFailedFailureOnComplete(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onComplete { + case Right(t) => + assert(t == cause) + done() + case Left(t) => + assert(false) + } + } + + def testFailedFailureOnSuccess(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + f.failed onSuccess { + t => + assert(t == cause) + done() + } + } + + def testFailedSuccessOnComplete(): Unit = once { + done => + val f = future { 0 } + f.failed onComplete { + case Right(t) => + assert(false) + case Left(t) => + assert(t.isInstanceOf[NoSuchElementException]) + done() + } + } + + def testFailedSuccessOnFailure(): Unit = once { + done => + val f = future { 0 } + f.failed onFailure { + case nsee: NoSuchElementException => + done() + } + } + + def testFailedFailureAwait(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + assert(await(0, f.failed) == cause) + done() + } + + def testFailedSuccessAwait(): Unit = once { + done => + val f = future { 0 } + try { + println(await(0, f.failed)) + assert(false) + } catch { + case nsee: NoSuchElementException => done() + } + } + + testFailedFailureOnComplete() + testFailedFailureOnSuccess() + testFailedSuccessOnComplete() + testFailedSuccessOnFailure() + testFailedFailureAwait() + //testFailedSuccessAwait() + +} +*/ + +trait Blocking extends TestBase { + + def testAwaitSuccess(): Unit = once { + done => + val f = future { 0 } + await(f, Duration(500, "ms")) + done() + } + + def testAwaitFailure(): Unit = once { + done => + val cause = new RuntimeException + val f = future { + throw cause + } + try { + await(f, Duration(500, "ms")) + assert(false) + } catch { + case t => + assert(t == cause) + done() + } + } + + testAwaitSuccess() + testAwaitFailure() + +} + +/* +trait Promises extends TestBase { + + def testSuccess(): Unit = once { + done => + val p = promise[Int]() + val f = p.future + + f.onSuccess { x => + done() + assert(x == 5) + } onFailure { case any => + done() + assert(false) + } + + p.success(5) + } + + testSuccess() + +} +*/ + +trait Exceptions extends TestBase { + +} + + +object Test +extends App +with FutureCallbacks +with FutureCombinators +/*with FutureProjections*/ +/*with Promises*/ +with Blocking +with Exceptions +{ + System.exit(0) +} + + diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala index eb3bbad591..b44d054219 100644 --- a/test/files/jvm/concurrent-future.scala +++ b/test/files/jvm/concurrent-future.scala @@ -22,7 +22,7 @@ object Test extends App { val f = future { output(1, "hai world") } - f onSuccess { _ => + f onSuccess { case _ => output(1, "kthxbye") done() } @@ -33,9 +33,9 @@ object Test extends App { val f = future { output(2, "hai world") } - f onSuccess { _ => + f onSuccess { case _ => output(2, "awsum thx") - f onSuccess { _ => + f onSuccess { case _ => output(2, "kthxbye") done() } @@ -49,7 +49,7 @@ object Test extends App { done() throw new Exception } - f onSuccess { _ => + f onSuccess { case _ => output(3, "onoes") } } @@ -60,7 +60,7 @@ object Test extends App { output(4, "hai world") throw new Exception } - f onSuccess { _ => + f onSuccess { case _ => output(4, "onoes") done() } @@ -76,7 +76,7 @@ object Test extends App { output(num, "hai world") throw cause } - f onSuccess { _ => + f onSuccess { case _ => output(num, "onoes") done() } @@ -96,7 +96,7 @@ object Test extends App { output(8, "hai world") throw new FutureTimeoutException(null) } - f onSuccess { _ => + f onSuccess { case _ => output(8, "onoes") done() } diff --git a/test/files/jvm/scala-concurrent-tck-akka.scala b/test/files/jvm/scala-concurrent-tck-akka.scala deleted file mode 100644 index dfd906e59e..0000000000 --- a/test/files/jvm/scala-concurrent-tck-akka.scala +++ /dev/null @@ -1,391 +0,0 @@ - - -import akka.dispatch.{ - Future => future, - Promise => promise -} -import akka.dispatch.Await.{result => await} - -// Duration required for await -import akka.util.Duration -import java.util.concurrent.TimeUnit -import TimeUnit._ - -import scala.concurrent.{ - TimeoutException, - SyncVar, - ExecutionException -} -//import scala.concurrent.future -//import scala.concurrent.promise -//import scala.concurrent.await - - - -trait TestBase { - - implicit val disp = akka.actor.ActorSystem().dispatcher - - def once(body: (() => Unit) => Unit) { - val sv = new SyncVar[Boolean] - body(() => sv put true) - sv.take() - } - -} - - -trait FutureCallbacks extends TestBase { - - def testOnSuccess(): Unit = once { - done => - var x = 0 - val f = future { - x = 1 - } - f onSuccess { case any => - done() - assert(x == 1) - } - } - - def testOnSuccessWhenCompleted(): Unit = once { - done => - var x = 0 - val f = future { - x = 1 - } - f onSuccess { case any => - assert(x == 1) - x = 2 - f onSuccess { case any => - assert(x == 2) - done() - } - } - } - - def testOnSuccessWhenFailed(): Unit = once { - done => - val f = future[Unit] { - done() - throw new Exception - } - f onSuccess { case any => - assert(false) - } - } - - def testOnFailure(): Unit = once { - done => - var x = 0 - val f = future[Unit] { - x = 1 - throw new Exception - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case _ => - done() - assert(x == 1) - } - } - - def testOnFailureWhenSpecialThrowable(num: Int, cause: Throwable): Unit = once { - done => - val f = future[Unit] { - throw cause - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case e: ExecutionException if (e.getCause == cause) => - done() - case _ => - done() - assert(false) - } - } - - def testOnFailureWhenTimeoutException(): Unit = once { - done => - val f = future[Unit] { - throw new TimeoutException() - } - f onSuccess { case any => - done() - assert(false) - } - f onFailure { - case e: TimeoutException => - done() - case other => - done() - assert(false) - } - } - - testOnSuccess() - testOnSuccessWhenCompleted() - testOnSuccessWhenFailed() - testOnFailure() -// testOnFailureWhenSpecialThrowable(5, new Error) -// testOnFailureWhenSpecialThrowable(6, new scala.util.control.ControlThrowable { }) -// testOnFailureWhenSpecialThrowable(7, new InterruptedException) -// testOnFailureWhenTimeoutException() - -} - - -trait FutureCombinators extends TestBase { - - // map: stub - def testMapSuccess(): Unit = once { - done => - done() - } - - def testMapFailure(): Unit = once { - done => - done() - } - - // flatMap: stub - def testFlatMapSuccess(): Unit = once { - done => - done() - } - - def testFlatMapFailure(): Unit = once { - done => - done() - } - - // filter: stub - def testFilterSuccess(): Unit = once { - done => - done() - } - - def testFilterFailure(): Unit = once { - done => - done() - } - - // foreach: stub - def testForeachSuccess(): Unit = once { - done => - done() - } - - def testForeachFailure(): Unit = once { - done => - done() - } - - def testRecoverSuccess(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } recover { - case re: RuntimeException => - "recovered" - } onSuccess { case x => - done() - assert(x == "recovered") - } onFailure { case any => - done() - assert(false) - } - } - - def testRecoverFailure(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } recover { - case te: TimeoutException => "timeout" - } onSuccess { case x => - done() - assert(false) - } onFailure { case any => - done() - assert(any == cause) - } - } - - testMapSuccess() - testMapFailure() - testFlatMapSuccess() - testFlatMapFailure() - testFilterSuccess() - testFilterFailure() - testForeachSuccess() - testForeachFailure() - testRecoverSuccess() - testRecoverFailure() - -} - -/* -trait FutureProjections extends TestBase { - - def testFailedFailureOnComplete(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - f.failed onComplete { - case Right(t) => - assert(t == cause) - done() - case Left(t) => - assert(false) - } - } - - def testFailedFailureOnSuccess(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - f.failed onSuccess { - t => - assert(t == cause) - done() - } - } - - def testFailedSuccessOnComplete(): Unit = once { - done => - val f = future { 0 } - f.failed onComplete { - case Right(t) => - assert(false) - case Left(t) => - assert(t.isInstanceOf[NoSuchElementException]) - done() - } - } - - def testFailedSuccessOnFailure(): Unit = once { - done => - val f = future { 0 } - f.failed onFailure { - case nsee: NoSuchElementException => - done() - } - } - - def testFailedFailureAwait(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - assert(await(0, f.failed) == cause) - done() - } - - def testFailedSuccessAwait(): Unit = once { - done => - val f = future { 0 } - try { - println(await(0, f.failed)) - assert(false) - } catch { - case nsee: NoSuchElementException => done() - } - } - - testFailedFailureOnComplete() - testFailedFailureOnSuccess() - testFailedSuccessOnComplete() - testFailedSuccessOnFailure() - testFailedFailureAwait() - //testFailedSuccessAwait() - -} -*/ - -trait Blocking extends TestBase { - - def testAwaitSuccess(): Unit = once { - done => - val f = future { 0 } - await(f, Duration(500, "ms")) - done() - } - - def testAwaitFailure(): Unit = once { - done => - val cause = new RuntimeException - val f = future { - throw cause - } - try { - await(f, Duration(500, "ms")) - assert(false) - } catch { - case t => - assert(t == cause) - done() - } - } - - testAwaitSuccess() - testAwaitFailure() - -} - -/* -trait Promises extends TestBase { - - def testSuccess(): Unit = once { - done => - val p = promise[Int]() - val f = p.future - - f.onSuccess { x => - done() - assert(x == 5) - } onFailure { case any => - done() - assert(false) - } - - p.success(5) - } - - testSuccess() - -} -*/ - -trait Exceptions extends TestBase { - -} - - -object Test -extends App -with FutureCallbacks -with FutureCombinators -/*with FutureProjections*/ -/*with Promises*/ -with Blocking -with Exceptions -{ - System.exit(0) -} - - diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index a951c09da2..244ff02da7 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -11,6 +11,7 @@ import scala.concurrent.{ import scala.concurrent.future import scala.concurrent.promise import scala.concurrent.blocking +import scala.util.{ Try, Success, Failure } import scala.util.Duration @@ -23,13 +24,13 @@ trait TestBase { sv.take() } - def assert(cond: => Boolean) { - try { - Predef.assert(cond) - } catch { - case e => e.printStackTrace() - } - } + // def assert(cond: => Boolean) { + // try { + // Predef.assert(cond) + // } catch { + // case e => e.printStackTrace() + // } + // } } @@ -264,10 +265,10 @@ trait FutureProjections extends TestBase { throw cause } f.failed onComplete { - case Right(t) => + case Success(t) => assert(t == cause) done() - case Left(t) => + case Failure(t) => assert(false) } } @@ -289,9 +290,9 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } f.failed onComplete { - case Right(t) => + case Success(t) => assert(false) - case Left(t) => + case Failure(t) => assert(t.isInstanceOf[NoSuchElementException]) done() } diff --git a/test/files/pos/Transactions.scala b/test/files/pos/Transactions.scala index 9b4388300b..525eff7514 100644 --- a/test/files/pos/Transactions.scala +++ b/test/files/pos/Transactions.scala @@ -1,4 +1,4 @@ -package scala.concurrent +package scala.concurrent1 class AbortException extends RuntimeException -- cgit v1.2.3 From 91dcdbd377b02f36b26420e2862eb597f5698c6d Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Sun, 19 Feb 2012 14:54:09 +0100 Subject: Fix race condition in scala-concurrent-tck test --- test/files/jvm/scala-concurrent-tck.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'test/files/jvm') diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 244ff02da7..ba7dffbcb0 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -321,7 +321,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - blocking(f.failed, Duration(0, "ms")) + blocking(f.failed, Duration(500, "ms")) assert(false) } catch { case nsee: NoSuchElementException => done() -- cgit v1.2.3 From 29bcadefb451bbf546e6f763027dac16f5b6f51b Mon Sep 17 00:00:00 2001 From: Adriaan Moors Date: Tue, 6 Mar 2012 15:58:58 +0100 Subject: SI-5189 fixed: safe type infer for constr pattern several fixes to the standard library due to - the safer type checker this fix gives us (thus, some casts had to be inserted) - SI-5548 - type inference gets a bit more complicated, it needs help (chainl1 in combinator.Parsers) To deal with the type slack between actual (run-time) types and statically known types, for each abstract type T, reflect its variance as a skolem that is upper-bounded by T (covariant position), or lower-bounded by T (contravariant). Consider the following example: class AbsWrapperCov[+A] case class Wrapper[B](x: Wrapped[B]) extends AbsWrapperCov[B] def unwrap[T](x: AbsWrapperCov[T]): Wrapped[T] = x match { case Wrapper(wrapped) => // Wrapper's type parameter must not be assumed to be equal to T, // it's *upper-bounded* by it wrapped // : Wrapped[_ <: T] } this method should type check if and only if Wrapped is covariant in its type parameter before inferring Wrapper's type parameter B from x's type AbsWrapperCov[T], we must take into account that x's actual type is: AbsWrapperCov[Tactual] forSome {type Tactual <: T} since AbsWrapperCov is covariant in A -- in other words, we must not assume we know T exactly, all we know is its upper bound since method application is the only way to generate this slack between run-time and compile-time types (TODO: right!?), we can simply replace skolems that represent method type parameters as seen from the method's body by other skolems that are (upper/lower)-bounded by that type-parameter skolem (depending on the variance position of the skolem in the statically assumed type of the scrutinee, pt) this type slack is introduced by adaptConstrPattern: before it calls inferConstructorInstance, it creates a new context that holds the new existential skolems the context created by adaptConstrPattern must not be a CaseDef, since that confuses instantiateTypeVar and the whole pushTypeBounds/restoreTypeBounds dance (CaseDef contexts remember the bounds of the type params that we clobbered during GADT typing) typedCase deskolemizes the existential skolems back to the method skolems, since they don't serve any further purpose (except confusing the old pattern matcher) typedCase is now better at finding that context (using nextEnclosing) --- src/compiler/scala/reflect/internal/Symbols.scala | 6 +- src/compiler/scala/tools/nsc/io/Pickler.scala | 2 +- .../scala/tools/nsc/typechecker/Infer.scala | 10 ++- .../scala/tools/nsc/typechecker/Typers.scala | 86 ++++++++++++++++++++-- src/library/scala/collection/JavaConversions.scala | 12 +-- .../scala/collection/immutable/IntMap.scala | 8 +- .../scala/collection/immutable/LongMap.scala | 8 +- .../scala/util/parsing/combinator/Parsers.scala | 5 +- .../scala/tools/scalap/scalax/rules/Rules.scala | 2 +- test/files/jvm/typerep.scala | 2 +- test/files/neg/t5189b.check | 8 ++ test/files/neg/t5189b.scala | 62 ++++++++++++++++ 12 files changed, 180 insertions(+), 31 deletions(-) create mode 100644 test/files/neg/t5189b.check create mode 100644 test/files/neg/t5189b.scala (limited to 'test/files/jvm') diff --git a/src/compiler/scala/reflect/internal/Symbols.scala b/src/compiler/scala/reflect/internal/Symbols.scala index 62b0206c28..446dbad03f 100644 --- a/src/compiler/scala/reflect/internal/Symbols.scala +++ b/src/compiler/scala/reflect/internal/Symbols.scala @@ -269,9 +269,9 @@ trait Symbols extends api.Symbols { self: SymbolTable => /** Create a new existential type skolem with this symbol its owner, * based on the given symbol and origin. */ - def newExistentialSkolem(basis: Symbol, origin: AnyRef): TypeSkolem = { - val skolem = newTypeSkolemSymbol(basis.name.toTypeName, origin, basis.pos, (basis.flags | EXISTENTIAL) & ~PARAM) - skolem setInfo (basis.info cloneInfo skolem) + def newExistentialSkolem(basis: Symbol, origin: AnyRef, name: TypeName = null, info: Type = null): TypeSkolem = { + val skolem = newTypeSkolemSymbol(if (name eq null) basis.name.toTypeName else name, origin, basis.pos, (basis.flags | EXISTENTIAL) & ~PARAM) + skolem setInfo (if (info eq null) basis.info cloneInfo skolem else info) } final def newExistential(name: TypeName, pos: Position = NoPosition, newFlags: Long = 0L): Symbol = diff --git a/src/compiler/scala/tools/nsc/io/Pickler.scala b/src/compiler/scala/tools/nsc/io/Pickler.scala index 5bb8bdda35..80b6e086da 100644 --- a/src/compiler/scala/tools/nsc/io/Pickler.scala +++ b/src/compiler/scala/tools/nsc/io/Pickler.scala @@ -165,7 +165,7 @@ object Pickler { def pkl[T: Pickler] = implicitly[Pickler[T]] /** A class represenenting `~`-pairs */ - case class ~[S, T](fst: S, snd: T) + case class ~[+S, +T](fst: S, snd: T) /** A wrapper class to be able to use `~` s an infix method */ class TildeDecorator[S](x: S) { diff --git a/src/compiler/scala/tools/nsc/typechecker/Infer.scala b/src/compiler/scala/tools/nsc/typechecker/Infer.scala index 7b72169ba9..50c45a2dc0 100644 --- a/src/compiler/scala/tools/nsc/typechecker/Infer.scala +++ b/src/compiler/scala/tools/nsc/typechecker/Infer.scala @@ -1090,7 +1090,15 @@ trait Infer { inferFor(pt.instantiateTypeParams(ptparams, ptparams map (x => WildcardType))) flatMap { targs => val ctorTpInst = tree.tpe.instantiateTypeParams(undetparams, targs) val resTpInst = skipImplicit(ctorTpInst.finalResultType) - val ptvars = ptparams map freshVar + val ptvars = + ptparams map { + // since instantiateTypeVar wants to modify the skolem that corresponds to the method's type parameter, + // and it uses the TypeVar's origin to locate it, deskolemize the existential skolem to the method tparam skolem + // (the existential skolem was created by adaptConstrPattern to introduce the type slack necessary to soundly deal with variant type parameters) + case skolem if skolem.isExistentialSkolem => freshVar(skolem.deSkolemize.asInstanceOf[TypeSymbol]) + case p => freshVar(p) + } + val ptV = pt.instantiateTypeParams(ptparams, ptvars) if (isPopulated(resTpInst, ptV)) { diff --git a/src/compiler/scala/tools/nsc/typechecker/Typers.scala b/src/compiler/scala/tools/nsc/typechecker/Typers.scala index 556c680cda..e300fdacf9 100644 --- a/src/compiler/scala/tools/nsc/typechecker/Typers.scala +++ b/src/compiler/scala/tools/nsc/typechecker/Typers.scala @@ -852,6 +852,33 @@ trait Typers extends Modes with Adaptations with PatMatVirtualiser { } } + /** + * To deal with the type slack between actual (run-time) types and statically known types, for each abstract type T, + * reflect its variance as a skolem that is upper-bounded by T (covariant position), or lower-bounded by T (contravariant). + * + * Consider the following example: + * + * class AbsWrapperCov[+A] + * case class Wrapper[B](x: Wrapped[B]) extends AbsWrapperCov[B] + * + * def unwrap[T](x: AbsWrapperCov[T]): Wrapped[T] = x match { + * case Wrapper(wrapped) => // Wrapper's type parameter must not be assumed to be equal to T, it's *upper-bounded* by it + * wrapped // : Wrapped[_ <: T] + * } + * + * this method should type check if and only if Wrapped is covariant in its type parameter + * + * when inferring Wrapper's type parameter B from x's type AbsWrapperCov[T], + * we must take into account that x's actual type is AbsWrapperCov[Tactual] forSome {type Tactual <: T} + * as AbsWrapperCov is covariant in A -- in other words, we must not assume we know T exactly, all we know is its upper bound + * + * since method application is the only way to generate this slack between run-time and compile-time types (TODO: right!?), + * we can simply replace skolems that represent method type parameters as seen from the method's body + * by other skolems that are (upper/lower)-bounded by that type-parameter skolem + * (depending on the variance position of the skolem in the statically assumed type of the scrutinee, pt) + * + * see test/files/../t5189*.scala + */ def adaptConstrPattern(): Tree = { // (5) val extractor = tree.symbol.filter(sym => reallyExists(unapplyMember(sym.tpe))) if (extractor != NoSymbol) { @@ -865,7 +892,32 @@ trait Typers extends Modes with Adaptations with PatMatVirtualiser { val tree1 = TypeTree(clazz.primaryConstructor.tpe.asSeenFrom(prefix, clazz.owner)) .setOriginal(tree) - inferConstructorInstance(tree1, clazz.typeParams, pt) + val skolems = new mutable.ListBuffer[TypeSymbol] + object variantToSkolem extends VariantTypeMap { + def apply(tp: Type) = mapOver(tp) match { + case TypeRef(NoPrefix, tpSym, Nil) if variance != 0 && tpSym.isTypeParameterOrSkolem && tpSym.owner.isTerm => + val bounds = if (variance == 1) TypeBounds.upper(tpSym.tpe) else TypeBounds.lower(tpSym.tpe) + val skolem = context.owner.newExistentialSkolem(tpSym, tpSym, unit.freshTypeName("?"+tpSym.name), bounds) + // println("mapping "+ tpSym +" to "+ skolem + " : "+ bounds +" -- pt= "+ pt) + skolems += skolem + skolem.tpe + case tp1 => tp1 + } + } + + // have to open up the existential and put the skolems in scope + // can't simply package up pt in an ExistentialType, because that takes us back to square one (List[_ <: T] == List[T] due to covariance) + val ptSafe = variantToSkolem(pt) // TODO: pt.skolemizeExistential(context.owner, tree) ? + val freeVars = skolems.toList + + // use "tree" for the context, not context.tree: don't make another CaseDef context, + // as instantiateTypeVar's bounds would end up there + val ctorContext = context.makeNewScope(tree, context.owner) + freeVars foreach ctorContext.scope.enter + newTyper(ctorContext).infer.inferConstructorInstance(tree1, clazz.typeParams, ptSafe) + + // tree1's type-slack skolems will be deskolemized (to the method type parameter skolems) + // once the containing CaseDef has been type checked (see typedCase) tree1 } else { tree @@ -1986,15 +2038,35 @@ trait Typers extends Modes with Adaptations with PatMatVirtualiser { val guard1: Tree = if (cdef.guard == EmptyTree) EmptyTree else typed(cdef.guard, BooleanClass.tpe) var body1: Tree = typed(cdef.body, pt) - if (!context.savedTypeBounds.isEmpty) { - body1.tpe = context.restoreTypeBounds(body1.tpe) - if (isFullyDefined(pt) && !(body1.tpe <:< pt)) { - // @M no need for pt.normalize here, is done in erasure + + val contextWithTypeBounds = context.nextEnclosing(_.tree.isInstanceOf[CaseDef]) + if (contextWithTypeBounds.savedTypeBounds nonEmpty) { + body1.tpe = contextWithTypeBounds restoreTypeBounds body1.tpe + + // insert a cast if something typechecked under the GADT constraints, + // but not in real life (i.e., now that's we've reset the method's type skolems' + // infos back to their pre-GADT-constraint state) + if (isFullyDefined(pt) && !(body1.tpe <:< pt)) body1 = typedPos(body1.pos)(gen.mkCast(body1, pt)) - } + } + // body1 = checkNoEscaping.locals(context.scope, pt, body1) - treeCopy.CaseDef(cdef, pat1, guard1, body1) setType body1.tpe + val treeWithSkolems = treeCopy.CaseDef(cdef, pat1, guard1, body1) setType body1.tpe + + // undo adaptConstrPattern's evil deeds, as they confuse the old pattern matcher + // TODO: Paul, can we do the deskolemization lazily in the old pattern matcher + object deskolemizeOnce extends TypeMap { + def apply(tp: Type): Type = mapOver(tp) match { + case TypeRef(pre, sym, args) if sym.isExistentialSkolem && sym.deSkolemize.isSkolem && sym.deSkolemize.owner.isTerm => + typeRef(NoPrefix, sym.deSkolemize, args) + case tp1 => tp1 + } + } + + new TypeMapTreeSubstituter(deskolemizeOnce).traverse(treeWithSkolems) + + treeWithSkolems // now without skolems, actually } def typedCases(cases: List[CaseDef], pattp: Type, pt: Type): List[CaseDef] = diff --git a/src/library/scala/collection/JavaConversions.scala b/src/library/scala/collection/JavaConversions.scala index d5011fc6aa..50919e506a 100644 --- a/src/library/scala/collection/JavaConversions.scala +++ b/src/library/scala/collection/JavaConversions.scala @@ -69,7 +69,7 @@ object JavaConversions { * @return A Java Iterator view of the argument. */ implicit def asJavaIterator[A](it: Iterator[A]): ju.Iterator[A] = it match { - case JIteratorWrapper(wrapped) => wrapped + case JIteratorWrapper(wrapped) => wrapped.asInstanceOf[ju.Iterator[A]] case _ => IteratorWrapper(it) } @@ -87,7 +87,7 @@ object JavaConversions { * @return A Java Enumeration view of the argument. */ implicit def asJavaEnumeration[A](it: Iterator[A]): ju.Enumeration[A] = it match { - case JEnumerationWrapper(wrapped) => wrapped + case JEnumerationWrapper(wrapped) => wrapped.asInstanceOf[ju.Enumeration[A]] case _ => IteratorWrapper(it) } @@ -105,7 +105,7 @@ object JavaConversions { * @return A Java Iterable view of the argument. */ implicit def asJavaIterable[A](i: Iterable[A]): jl.Iterable[A] = i match { - case JIterableWrapper(wrapped) => wrapped + case JIterableWrapper(wrapped) => wrapped.asInstanceOf[jl.Iterable[A]] case _ => IterableWrapper(i) } @@ -121,7 +121,7 @@ object JavaConversions { * @return A Java Collection view of the argument. */ implicit def asJavaCollection[A](it: Iterable[A]): ju.Collection[A] = it match { - case JCollectionWrapper(wrapped) => wrapped + case JCollectionWrapper(wrapped) => wrapped.asInstanceOf[ju.Collection[A]] case _ => new IterableWrapper(it) } @@ -179,7 +179,7 @@ object JavaConversions { * @return A Java List view of the argument. */ implicit def seqAsJavaList[A](seq: Seq[A]): ju.List[A] = seq match { - case JListWrapper(wrapped) => wrapped + case JListWrapper(wrapped) => wrapped.asInstanceOf[ju.List[A]] case _ => new SeqWrapper(seq) } @@ -286,7 +286,7 @@ object JavaConversions { */ implicit def mapAsJavaMap[A, B](m: Map[A, B]): ju.Map[A, B] = m match { //case JConcurrentMapWrapper(wrapped) => wrapped - case JMapWrapper(wrapped) => wrapped + case JMapWrapper(wrapped) => wrapped.asInstanceOf[ju.Map[A, B]] case _ => new MapWrapper(m) } diff --git a/src/library/scala/collection/immutable/IntMap.scala b/src/library/scala/collection/immutable/IntMap.scala index dd6b066878..3c9c0c2f24 100644 --- a/src/library/scala/collection/immutable/IntMap.scala +++ b/src/library/scala/collection/immutable/IntMap.scala @@ -353,19 +353,19 @@ extends AbstractMap[Int, T] def unionWith[S >: T](that : IntMap[S], f : (Int, S, S) => S) : IntMap[S] = (this, that) match{ case (IntMap.Bin(p1, m1, l1, r1), that@(IntMap.Bin(p2, m2, l2, r2))) => if (shorter(m1, m2)) { - if (!hasMatch(p2, p1, m1)) join(p1, this, p2, that); + if (!hasMatch(p2, p1, m1)) join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed else if (zero(p2, m1)) IntMap.Bin(p1, m1, l1.unionWith(that, f), r1); else IntMap.Bin(p1, m1, l1, r1.unionWith(that, f)); } else if (shorter(m2, m1)){ - if (!hasMatch(p1, p2, m2)) join(p1, this, p2, that); + if (!hasMatch(p1, p2, m2)) join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed else if (zero(p1, m2)) IntMap.Bin(p2, m2, this.unionWith(l2, f), r2); else IntMap.Bin(p2, m2, l2, this.unionWith(r2, f)); } else { if (p1 == p2) IntMap.Bin(p1, m1, l1.unionWith(l2,f), r1.unionWith(r2, f)); - else join(p1, this, p2, that); + else join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed } - case (IntMap.Tip(key, value), x) => x.updateWith(key, value, (x, y) => f(key, y, x)); + case (IntMap.Tip(key, value), x) => x.updateWith[S](key, value, (x, y) => f(key, y, x)); case (x, IntMap.Tip(key, value)) => x.updateWith[S](key, value, (x, y) => f(key, x, y)); case (IntMap.Nil, x) => x; case (x, IntMap.Nil) => x; diff --git a/src/library/scala/collection/immutable/LongMap.scala b/src/library/scala/collection/immutable/LongMap.scala index 963ddac762..11b5d1e311 100644 --- a/src/library/scala/collection/immutable/LongMap.scala +++ b/src/library/scala/collection/immutable/LongMap.scala @@ -349,19 +349,19 @@ extends AbstractMap[Long, T] def unionWith[S >: T](that : LongMap[S], f : (Long, S, S) => S) : LongMap[S] = (this, that) match{ case (LongMap.Bin(p1, m1, l1, r1), that@(LongMap.Bin(p2, m2, l2, r2))) => if (shorter(m1, m2)) { - if (!hasMatch(p2, p1, m1)) join(p1, this, p2, that); + if (!hasMatch(p2, p1, m1)) join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed else if (zero(p2, m1)) LongMap.Bin(p1, m1, l1.unionWith(that, f), r1); else LongMap.Bin(p1, m1, l1, r1.unionWith(that, f)); } else if (shorter(m2, m1)){ - if (!hasMatch(p1, p2, m2)) join(p1, this, p2, that); + if (!hasMatch(p1, p2, m2)) join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed else if (zero(p1, m2)) LongMap.Bin(p2, m2, this.unionWith(l2, f), r2); else LongMap.Bin(p2, m2, l2, this.unionWith(r2, f)); } else { if (p1 == p2) LongMap.Bin(p1, m1, l1.unionWith(l2,f), r1.unionWith(r2, f)); - else join(p1, this, p2, that); + else join[S](p1, this, p2, that); // TODO: remove [S] when SI-5548 is fixed } - case (LongMap.Tip(key, value), x) => x.updateWith(key, value, (x, y) => f(key, y, x)); + case (LongMap.Tip(key, value), x) => x.updateWith[S](key, value, (x, y) => f(key, y, x)); // TODO: remove [S] when SI-5548 is fixed case (x, LongMap.Tip(key, value)) => x.updateWith[S](key, value, (x, y) => f(key, x, y)); case (LongMap.Nil, x) => x; case (x, LongMap.Nil) => x; diff --git a/src/library/scala/util/parsing/combinator/Parsers.scala b/src/library/scala/util/parsing/combinator/Parsers.scala index 27e9112fce..9aaf0aeb54 100644 --- a/src/library/scala/util/parsing/combinator/Parsers.scala +++ b/src/library/scala/util/parsing/combinator/Parsers.scala @@ -794,7 +794,7 @@ trait Parsers { */ def chainl1[T, U](first: => Parser[T], p: => Parser[U], q: => Parser[(T, U) => T]): Parser[T] = first ~ rep(q ~ p) ^^ { - case x ~ xs => xs.foldLeft(x){(_, _) match {case (a, f ~ b) => f(a, b)}} + case x ~ xs => xs.foldLeft(x: T){case (a, f ~ b) => f(a, b)} // x's type annotation is needed to deal with changed type inference due to SI-5189 } /** A parser generator that generalises the `rep1sep` generator so that `q`, @@ -812,8 +812,7 @@ trait Parsers { */ def chainr1[T, U](p: => Parser[T], q: => Parser[(T, U) => U], combine: (T, U) => U, first: U): Parser[U] = p ~ rep(q ~ p) ^^ { - case x ~ xs => (new ~(combine, x) :: xs). - foldRight(first){(_, _) match {case (f ~ a, b) => f(a, b)}} + case x ~ xs => (new ~(combine, x) :: xs).foldRight(first){case (f ~ a, b) => f(a, b)} } /** A parser generator for optional sub-phrases. diff --git a/src/scalap/scala/tools/scalap/scalax/rules/Rules.scala b/src/scalap/scala/tools/scalap/scalax/rules/Rules.scala index 43f9c20b1d..70926208b3 100644 --- a/src/scalap/scala/tools/scalap/scalax/rules/Rules.scala +++ b/src/scalap/scala/tools/scalap/scalax/rules/Rules.scala @@ -130,7 +130,7 @@ trait StateRules { def rep(in : S, t : T) : Result[S, T, X] = { if (finished(t)) Success(in, t) else rule(in) match { - case Success(out, f) => rep(out, f(t)) + case Success(out, f) => rep(out, f(t)) // SI-5189 f.asInstanceOf[T => T] case Failure => Failure case Error(x) => Error(x) } diff --git a/test/files/jvm/typerep.scala b/test/files/jvm/typerep.scala index 49a216c05c..3befc7ff3f 100644 --- a/test/files/jvm/typerep.scala +++ b/test/files/jvm/typerep.scala @@ -161,7 +161,7 @@ object TypeRep { }).asInstanceOf[TypeRep[Option[A]]] def getType[A](x: List[A])(implicit rep: TypeRep[A]): TypeRep[List[A]] = (x match { - case h :: t => ListRep(getType(h)) + case h :: t => ListRep(rep) case Nil => NilRep }).asInstanceOf[TypeRep[List[A]]] diff --git a/test/files/neg/t5189b.check b/test/files/neg/t5189b.check new file mode 100644 index 0000000000..7f78cbb438 --- /dev/null +++ b/test/files/neg/t5189b.check @@ -0,0 +1,8 @@ +t5189b.scala:25: error: type mismatch; + found : TestNeg.Wrapped[?T2] where type ?T2 <: T + required: TestNeg.Wrapped[T] +Note: ?T2 <: T, but class Wrapped is invariant in type W. +You may wish to define W as +W instead. (SLS 4.5) + case Wrapper/*[_ <: T ]*/(wrapped) => wrapped // : Wrapped[_ <: T], which is a subtype of Wrapped[T] if and only if Wrapped is covariant in its type parameter + ^ +one error found diff --git a/test/files/neg/t5189b.scala b/test/files/neg/t5189b.scala new file mode 100644 index 0000000000..1750f14084 --- /dev/null +++ b/test/files/neg/t5189b.scala @@ -0,0 +1,62 @@ +class TestPos { + class AbsWrapperCov[+A] + case class Wrapper[B](x: B) extends AbsWrapperCov[B] + + def unwrap[T](x: AbsWrapperCov[T]): T = x match { + case Wrapper/*[_ <: T ]*/(x) => x // _ <: T, which is a subtype of T + } +} + +object TestNeg extends App { + class AbsWrapperCov[+A] + case class Wrapper[B](x: Wrapped[B]) extends AbsWrapperCov[B] + + /* + when inferring Wrapper's type parameter B from x's type AbsWrapperCov[T], + we must take into account that x's actual type is AbsWrapperCov[Tactual] forSome {type Tactual <: T} + as AbsWrapperCov is covariant in A -- in other words, we must not assume we know T exactly, all we know is its upper bound + + since method application is the only way to generate this slack between run-time and compile-time types, + we'll simply replace the skolems that represent method type parameters as seen from the method's body by + other skolems that are (upper/lower)-bounded by the type-parameter skolems + (depending on whether the skolem appears in a covariant/contravariant position) + */ + def unwrap[T](x: AbsWrapperCov[T]): Wrapped[T] = x match { + case Wrapper/*[_ <: T ]*/(wrapped) => wrapped // : Wrapped[_ <: T], which is a subtype of Wrapped[T] if and only if Wrapped is covariant in its type parameter + } + + class Wrapped[W](var cell: W) // must be invariant (to trigger the bug) + + // class A { def imNotAB = println("notB")} + // class B + // + // val w = new Wrapped(new A) + // unwrap[Any](Wrapper(w)).cell = new B + // w.cell.imNotAB +} + +// class TestPos1 { +// class Base[T] +// case class C[T](x: T) extends Base[T] +// def foo[T](b: Base[T]): T = b match { case C(x) => x } +// +// case class Span[K <: Ordered[K]](low: Option[K], high: Option[K]) extends Function1[K, Boolean] { +// override def equals(x$1: Any): Boolean = x$1 match { +// case Span((low$0 @ _), (high$0 @ _)) if low$0.equals(low).$amp$amp(high$0.equals(high)) => true +// case _ => false +// } +// def apply(k: K): Boolean = this match { +// case Span(Some(low), Some(high)) => (k >= low && k <= high) +// case Span(Some(low), None) => (k >= low) +// case Span(None, Some(high)) => (k <= high) +// case _ => false +// } +// } +// } +// +// class TestNeg1 { +// case class Foo[T, U](f: T => U) +// def f(x: Any): Any => Any = x match { case Foo(bar) => bar } +// // uh-oh, Any => Any should be Nothing => Any. +// } + -- cgit v1.2.3