From faf0f3de05e79af3fd7b5cf3bc3f97331e25042e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 19 Jul 2012 15:27:28 +0200 Subject: Critical bugfixes/leak fixes/API corrections + ScalaDoc for SIP-14 --- src/library/scala/concurrent/BlockContext.scala | 35 ++- src/library/scala/concurrent/Future.scala | 6 +- .../default/SchedulerImpl.scala.disabled | 44 --- .../concurrent/default/TaskImpl.scala.disabled | 313 --------------------- .../concurrent/impl/ExecutionContextImpl.scala | 10 +- src/library/scala/concurrent/impl/Future.scala | 9 - src/library/scala/concurrent/package.scala | 76 ++--- test/files/jvm/future-spec/FutureTests.scala | 6 + test/files/jvm/future-spec/PromiseTests.scala | 9 +- test/files/jvm/scala-concurrent-tck.scala | 20 +- 10 files changed, 85 insertions(+), 443 deletions(-) delete mode 100644 src/library/scala/concurrent/default/SchedulerImpl.scala.disabled delete mode 100644 src/library/scala/concurrent/default/TaskImpl.scala.disabled diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala index a5b878c546..640560a174 100644 --- a/src/library/scala/concurrent/BlockContext.scala +++ b/src/library/scala/concurrent/BlockContext.scala @@ -12,9 +12,10 @@ import java.lang.Thread import scala.concurrent.util.Duration /** - * A context to be notified by `scala.concurrent.blocking()` when + * A context to be notified by `scala.concurrent.blocking` when * a thread is about to block. In effect this trait provides - * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()` + * the implementation for `scala.concurrent.Await`. + * `scala.concurrent.Await.result()` and `scala.concurrent.Await.ready()` * locates an instance of `BlockContext` by first looking for one * provided through `BlockContext.withBlockContext()` and failing that, * checking whether `Thread.currentThread` is an instance of `BlockContext`. @@ -27,11 +28,11 @@ import scala.concurrent.util.Duration * {{{ * val oldContext = BlockContext.current * val myContext = new BlockContext { - * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + * override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { * // you'd have code here doing whatever you need to do * // when the thread is about to block. * // Then you'd chain to the previous context: - * oldContext.internalBlockingCall(awaitable, atMost) + * oldContext.blockOn(thunk) * } * } * BlockContext.withBlockContext(myContext) { @@ -42,35 +43,33 @@ import scala.concurrent.util.Duration */ trait BlockContext { - /** Used internally by the framework; blocks execution for at most - * `atMost` time while waiting for an `awaitable` object to become ready. + /** Used internally by the framework; + * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`. * - * Clients should use `scala.concurrent.blocking` instead; this is - * the implementation of `scala.concurrent.blocking`, generally - * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`. + * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead. */ - def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T + def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T } object BlockContext { private object DefaultBlockContext extends BlockContext { - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - awaitable.result(atMost)(Await.canAwaitEvidence) + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk } - private val contextLocal = new ThreadLocal[BlockContext]() { - override def initialValue = Thread.currentThread match { + private val contextLocal = new ThreadLocal[BlockContext]() + + /** Obtain the current thread's current `BlockContext`. */ + def current: BlockContext = contextLocal.get match { + case null => Thread.currentThread match { case ctx: BlockContext => ctx case _ => DefaultBlockContext } + case some => some } - /** Obtain the current thread's current `BlockContext`. */ - def current: BlockContext = contextLocal.get - /** Pushes a current `BlockContext` while executing `body`. */ def withBlockContext[T](blockContext: BlockContext)(body: => T): T = { - val old = contextLocal.get + val old = contextLocal.get // can be null try { contextLocal.set(blockContext) body diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala index f82b79cb18..d24fdbf005 100644 --- a/src/library/scala/concurrent/Future.scala +++ b/src/library/scala/concurrent/Future.scala @@ -707,11 +707,9 @@ object Future { // doesn't need to create defaultExecutionContext as // a side effect. private[concurrent] object InternalCallbackExecutor extends ExecutionContext { - def execute(runnable: Runnable): Unit = + override def execute(runnable: Runnable): Unit = runnable.run() - def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback") - def reportFailure(t: Throwable): Unit = + override def reportFailure(t: Throwable): Unit = throw new IllegalStateException("problem in scala.concurrent internal callback", t) } } diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled deleted file mode 100644 index 241efa8857..0000000000 --- a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled +++ /dev/null @@ -1,44 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.concurrent -package default - -import scala.concurrent.util.Duration - -private[concurrent] final class SchedulerImpl extends Scheduler { - private val timer = - new java.util.Timer(true) // the associated thread runs as a daemon - - def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ??? - - def scheduleOnce(delay: Duration, task: Runnable): Cancellable = { - val timerTask = new java.util.TimerTask { - def run(): Unit = - task.run() - } - timer.schedule(timerTask, delay.toMillis) - new Cancellable { - def cancel(): Unit = - timerTask.cancel() - } - } - - def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = { - val timerTask = new java.util.TimerTask { - def run(): Unit = - task - } - timer.schedule(timerTask, delay.toMillis) - new Cancellable { - def cancel(): Unit = - timerTask.cancel() - } - } - -} diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled deleted file mode 100644 index 8b4eb12d4f..0000000000 --- a/src/library/scala/concurrent/default/TaskImpl.scala.disabled +++ /dev/null @@ -1,313 +0,0 @@ -package scala.concurrent -package default - - - -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater -import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread } -import scala.util.Try -import scala.util -import scala.concurrent.util.Duration -import scala.annotation.tailrec -import scala.util.control.NonFatal - - -private[concurrent] trait Completable[T] { -self: Future[T] => - - val executor: ExecutionContextImpl - - def newPromise[S]: Promise[S] = executor promise - - type Callback = Try[T] => Any - - def getState: State[T] - - def casState(oldv: State[T], newv: State[T]): Boolean - - protected def dispatch[U](r: Runnable) = executionContext execute r - - protected def processCallbacks(cbs: List[Callback], r: Try[T]) = - for (cb <- cbs) dispatch(new Runnable { - override def run() = cb(r) - }) - - def future: Future[T] = self - - def onComplete[U](callback: Try[T] => U): this.type = { - @tailrec def tryAddCallback(): Try[T] = { - getState match { - case p @ Pending(lst) => - val pt = p.asInstanceOf[Pending[T]] - if (casState(pt, Pending(callback :: pt.callbacks))) null - else tryAddCallback() - case Success(res) => util.Success(res) - case Failure(t) => util.Failure(t) - } - } - - val res = tryAddCallback() - if (res != null) dispatch(new Runnable { - override def run() = - try callback(res) - catch handledFutureException andThen { - t => Console.err.println(t) - } - }) - - this - } - - def isTimedout: Boolean = getState match { - case Failure(ft: FutureTimeoutException) => true - case _ => false - } - -} - -private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl) -extends Promise[T] with Future[T] with Completable[T] { - - val executor: scala.concurrent.default.ExecutionContextImpl = context - - @volatile private var state: State[T] = _ - - val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state") - - updater.set(this, Pending(List())) - - def casState(oldv: State[T], newv: State[T]): Boolean = { - updater.compareAndSet(this, oldv, newv) - } - - def getState: State[T] = { - updater.get(this) - } - - @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { - case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs - case _ => null - } - - def tryComplete(r: Try[T]) = r match { - case util.Failure(t) => tryFailure(t) - case util.Success(v) => trySuccess(v) - } - - override def trySuccess(value: T): Boolean = { - val cbs = tryCompleteState(Success(value)) - if (cbs == null) - false - else { - processCallbacks(cbs, util.Success(value)) - this.synchronized { - this.notifyAll() - } - true - } - } - - override def tryFailure(t: Throwable): Boolean = { - val wrapped = wrap(t) - val cbs = tryCompleteState(Failure(wrapped)) - if (cbs == null) - false - else { - processCallbacks(cbs, util.Failure(wrapped)) - this.synchronized { - this.notifyAll() - } - true - } - } - - def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match { - case Success(res) => res - case Failure(t) => throw t - case _ => - this.synchronized { - while (true) - getState match { - case Pending(_) => this.wait() - case Success(res) => return res - case Failure(t) => throw t - } - } - sys.error("unreachable") - } - -} - -private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T) -extends RecursiveAction with Task[T] with Future[T] with Completable[T] { - - val executor: ExecutionContextImpl = context - - @volatile private var state: State[T] = _ - - val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state") - - updater.set(this, Pending(List())) - - def casState(oldv: State[T], newv: State[T]): Boolean = { - updater.compareAndSet(this, oldv, newv) - } - - def getState: State[T] = { - updater.get(this) - } - - @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match { - case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs - } - - def compute(): Unit = { - var cbs: List[Callback] = null - try { - val res = body - processCallbacks(tryCompleteState(Success(res)), util.Success(res)) - } catch { - case t if NonFatal(t) => - processCallbacks(tryCompleteState(Failure(t)), util.Failure(t)) - case t => - val ee = new ExecutionException(t) - processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee)) - throw t - } - } - - def start(): Unit = { - Thread.currentThread match { - case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork() - case _ => executor.pool.execute(this) - } - } - - // TODO FIXME: handle timeouts - def await(atMost: Duration): this.type = - await - - def await: this.type = { - this.join() - this - } - - def tryCancel(): Unit = - tryUnfork() - - def await(atMost: Duration)(implicit canawait: CanAwait): T = { - join() // TODO handle timeout also - (updater.get(this): @unchecked) match { - case Success(r) => r - case Failure(t) => throw t - } - } - -} - - -private[concurrent] sealed abstract class State[T] - - -case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T] - - -case class Success[T](result: T) extends State[T] - - -case class Failure[T](throwable: Throwable) extends State[T] - - -private[concurrent] final class ExecutionContextImpl extends ExecutionContext { - import ExecutionContextImpl._ - - val pool = { - val p = new ForkJoinPool - p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler { - def uncaughtException(t: Thread, throwable: Throwable) { - Console.err.println(throwable.getMessage) - throwable.printStackTrace(Console.err) - } - }) - p - } - - @inline - private def executeTask(task: RecursiveAction) { - if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) - task.fork() - else - pool execute task - } - - def execute(task: Runnable) { - val action = new RecursiveAction { def compute() { task.run() } } - executeTask(action) - } - - def execute[U](body: () => U) { - val action = new RecursiveAction { def compute() { body() } } - executeTask(action) - } - - def task[T](body: => T): Task[T] = { - new TaskImpl(this, body) - } - - def future[T](body: => T): Future[T] = { - val t = task(body) - t.start() - t.future - } - - def promise[T]: Promise[T] = - new PromiseImpl[T](this) - - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) - - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { - currentExecutionContext.get match { - case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case - case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor - case x => x.blocking(awaitable, atMost) - } - } - - private def blockingCall[T](b: Awaitable[T]): T = b match { - case fj: TaskImpl[_] if fj.executor.pool eq pool => - fj.await(Duration.fromNanos(0)) - case _ => - var res: T = null.asInstanceOf[T] - @volatile var blockingDone = false - // TODO add exception handling here! - val mb = new ForkJoinPool.ManagedBlocker { - def block() = { - res = b.await(Duration.fromNanos(0))(CanAwaitEvidence) - blockingDone = true - true - } - def isReleasable = blockingDone - } - ForkJoinPool.managedBlock(mb, true) - res - } - - def reportFailure(t: Throwable): Unit = {} - -} - - -object ExecutionContextImpl { - - private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] { - override protected def initialValue = null - } - -} - - - - - - - diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala index 98f821652f..875a558887 100644 --- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala @@ -13,7 +13,7 @@ package scala.concurrent.impl import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor } import java.util.Collection import scala.concurrent.forkjoin._ -import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService } +import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService } import scala.concurrent.util.Duration import scala.util.control.NonFatal @@ -37,15 +37,15 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter: def newThread(runnable: Runnable): Thread = wire(new Thread(runnable)) def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext { - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = { + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = { var result: T = null.asInstanceOf[T] ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @volatile var isdone = false - def block(): Boolean = { - result = try awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) finally { isdone = true } + override def block(): Boolean = { + result = try thunk finally { isdone = true } true } - def isReleasable = isdone + override def isReleasable = isdone }) result } diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index 132e1d79e7..b32824c0c9 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -22,15 +22,6 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa private[concurrent] object Future { - /** Wraps a block of code into an awaitable object. */ - private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] { - def ready(atMost: Duration)(implicit permit: CanAwait) = { - body - this - } - def result(atMost: Duration)(implicit permit: CanAwait) = body - } - def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) scala.concurrent.Future.toBoxed(c) else c private[impl] class PromiseCompletingRunnable[T](body: => T) diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 76703bf081..a6488b602f 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -9,6 +9,7 @@ package scala import scala.concurrent.util.Duration +import scala.annotation.implicitNotFound /** This package object contains primitives for concurrent and parallel programming. */ @@ -17,6 +18,41 @@ package object concurrent { type CancellationException = java.util.concurrent.CancellationException type TimeoutException = java.util.concurrent.TimeoutException + @implicitNotFound("Don't call `Awaitable` methods directly, use the `Await` object.") + sealed trait CanAwait + private implicit object AwaitPermission extends CanAwait + + /** + * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances. + */ + object Await { + /** + * Invokes ready() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`. + * ready() blocks until the awaitable has completed or the timeout expires. + * + * Throws a TimeoutException if the timeout expires, as that is in the contract of `Awaitable.ready`. + * @param awaitable the `Awaitable` on which `ready` is to be called + * @param atMost the maximum timeout for which to wait + * @return the result of `awaitable.ready` which is defined to be the awaitable itself. + */ + @throws(classOf[TimeoutException]) + def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = + blocking(awaitable.ready(atMost)) + + /** + * Invokes result() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`. + * result() blocks until the awaitable has completed or the timeout expires. + * + * Throws a TimeoutException if the timeout expires, or any exception thrown by `Awaitable.result`. + * @param awaitable the `Awaitable` on which `result` is to be called + * @param atMost the maximum timeout for which to wait + * @return the result of `awaitable.result` + */ + @throws(classOf[Exception]) + def result[T](awaitable: Awaitable[T], atMost: Duration): T = + blocking(awaitable.result(atMost)) + } + /** Starts an asynchronous computation and returns a `Future` object with the result of that computation. * * The result becomes available once the asynchronous computation is completed. @@ -36,46 +72,18 @@ package object concurrent { */ def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]() - /** Used to block on a piece of code which potentially blocks. + /** Used to designate a piece of code which potentially blocks, allowing the BlockContext to adjust the runtime's behavior. + * Properly marking blocking code may improve performance or avoid deadlocks. * - * @param body A piece of code which contains potentially blocking or long running calls. - * - * Calling this method may throw the following exceptions: - * - CancellationException - if the computation was cancelled - * - InterruptedException - in the case that a wait within the blockable object was interrupted - * - TimeoutException - in the case that the blockable object timed out - */ - def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf) - - /** Blocks on an awaitable object. + * If you have an `Awaitable` then you should use Await.result instead of `blocking`. * - * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. + * @param body A piece of code which contains potentially blocking or long running calls. * * Calling this method may throw the following exceptions: * - CancellationException - if the computation was cancelled * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = - BlockContext.current.internalBlockingCall(awaitable, atMost) -} - -/* concurrency constructs */ -package concurrent { - - sealed trait CanAwait - - object Await { - private[concurrent] implicit val canAwaitEvidence = new CanAwait {} - - def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = { - blocking(awaitable, atMost) - awaitable - } - - def result[T](awaitable: Awaitable[T], atMost: Duration): T = { - blocking(awaitable, atMost) - } - - } + @throws(classOf[Exception]) + def blocking[T](body: =>T): T = BlockContext.current.blockOn(body) } diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala index ca9ff5090f..30e1a722bf 100644 --- a/test/files/jvm/future-spec/FutureTests.scala +++ b/test/files/jvm/future-spec/FutureTests.scala @@ -507,6 +507,12 @@ object FutureTests extends MinimalScalaTest { } Await.ready(complex, defaultTimeout).isCompleted mustBe (true) } + + "should not throw when Await.ready" in { + val expected = try Right(5 / 0) catch { case a: ArithmeticException => Left(a) } + val f = future(5).map(_ / 0) + Await.ready(f, defaultTimeout).value.get.toString mustBe expected.toString + } } diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala index 49bc642b57..d15bb31f36 100644 --- a/test/files/jvm/future-spec/PromiseTests.scala +++ b/test/files/jvm/future-spec/PromiseTests.scala @@ -78,7 +78,7 @@ object PromiseTests extends MinimalScalaTest { "contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) } - "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) } + "return when ready with 'Await.ready'" in { f((future, result) => Await.ready(future, defaultTimeout).isCompleted mustBe (true)) } "return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) } @@ -163,12 +163,9 @@ object PromiseTests extends MinimalScalaTest { }) } - "throw exception with 'blocking'" in { + "throw not throw exception with 'Await.ready'" in { f { - (future, message) => - intercept[E] { - blocking(future, defaultTimeout) - }.getMessage mustBe (message) + (future, message) => Await.ready(future, defaultTimeout).isCompleted mustBe (true) } } diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala index 5c9c71f3f8..1209b710b0 100644 --- a/test/files/jvm/scala-concurrent-tck.scala +++ b/test/files/jvm/scala-concurrent-tck.scala @@ -4,7 +4,9 @@ import scala.concurrent.{ TimeoutException, SyncVar, ExecutionException, - ExecutionContext + ExecutionContext, + CanAwait, + Await } import scala.concurrent.{ future, promise, blocking } import scala.util.{ Try, Success, Failure } @@ -647,7 +649,7 @@ trait FutureProjections extends TestBase { val f = future { throw cause } - assert(blocking(f.failed, Duration(500, "ms")) == cause) + assert(Await.result(f.failed, Duration(500, "ms")) == cause) done() } @@ -655,7 +657,7 @@ trait FutureProjections extends TestBase { done => val f = future { 0 } try { - blocking(f.failed, Duration(500, "ms")) + Await.result(f.failed, Duration(500, "ms")) assert(false) } catch { case nsee: NoSuchElementException => done() @@ -678,7 +680,7 @@ trait Blocking extends TestBase { def testAwaitSuccess(): Unit = once { done => val f = future { 0 } - blocking(f, Duration(500, "ms")) + Await.result(f, Duration(500, "ms")) done() } @@ -689,7 +691,7 @@ trait Blocking extends TestBase { throw cause } try { - blocking(f, Duration(500, "ms")) + Await.result(f, Duration(500, "ms")) assert(false) } catch { case t => @@ -708,7 +710,7 @@ trait BlockContexts extends TestBase { import scala.concurrent.{ Await, Awaitable, BlockContext } private def getBlockContext(body: => BlockContext): BlockContext = { - blocking(Future { body }, Duration(500, "ms")) + Await.result(Future { body }, Duration(500, "ms")) } // test outside of an ExecutionContext @@ -727,8 +729,7 @@ trait BlockContexts extends TestBase { def testPushCustom(): Unit = { val orig = BlockContext.current val customBC = new BlockContext() { - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - orig.internalBlockingCall(awaitable, atMost) + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk) } val bc = getBlockContext({ @@ -744,8 +745,7 @@ trait BlockContexts extends TestBase { def testPopCustom(): Unit = { val orig = BlockContext.current val customBC = new BlockContext() { - override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = - orig.internalBlockingCall(awaitable, atMost) + override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk) } val bc = getBlockContext({ -- cgit v1.2.3 From a6aaee845174c3102d4602d16dab9fd673bf4f77 Mon Sep 17 00:00:00 2001 From: phaller Date: Thu, 19 Jul 2012 16:53:21 +0200 Subject: Clean ups in impl.Future --- src/library/scala/concurrent/impl/Future.scala | 15 +++------------ src/library/scala/concurrent/impl/Promise.scala | 2 +- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala index b32824c0c9..098008e958 100644 --- a/src/library/scala/concurrent/impl/Future.scala +++ b/src/library/scala/concurrent/impl/Future.scala @@ -10,22 +10,13 @@ package scala.concurrent.impl -import scala.concurrent.util.Duration -import scala.concurrent.{Awaitable, ExecutionContext, CanAwait} -import scala.collection.mutable.Stack +import scala.concurrent.ExecutionContext import scala.util.control.NonFatal -private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { - -} private[concurrent] object Future { - - def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) scala.concurrent.Future.toBoxed(c) else c - - private[impl] class PromiseCompletingRunnable[T](body: => T) - extends Runnable { + class PromiseCompletingRunnable[T](body: => T) extends Runnable { val promise = new Promise.DefaultPromise[T]() override def run() = { @@ -35,7 +26,7 @@ private[concurrent] object Future { } } - def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = { + def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = { val runnable = new PromiseCompletingRunnable(body) executor.execute(runnable) runnable.promise.future diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala index 84638586cf..c2df9ac296 100644 --- a/src/library/scala/concurrent/impl/Promise.scala +++ b/src/library/scala/concurrent/impl/Promise.scala @@ -18,7 +18,7 @@ import scala.util.control.NonFatal -private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { +private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] { def future: this.type = this } -- cgit v1.2.3