summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/BlockContext.scala35
-rw-r--r--src/library/scala/concurrent/Future.scala6
-rw-r--r--src/library/scala/concurrent/default/SchedulerImpl.scala.disabled44
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala.disabled313
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala10
-rw-r--r--src/library/scala/concurrent/impl/Future.scala24
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala2
-rw-r--r--src/library/scala/concurrent/package.scala76
-rw-r--r--test/files/jvm/future-spec/FutureTests.scala6
-rw-r--r--test/files/jvm/future-spec/PromiseTests.scala9
-rw-r--r--test/files/jvm/scala-concurrent-tck.scala20
11 files changed, 89 insertions, 456 deletions
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
index a5b878c546..640560a174 100644
--- a/src/library/scala/concurrent/BlockContext.scala
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -12,9 +12,10 @@ import java.lang.Thread
import scala.concurrent.util.Duration
/**
- * A context to be notified by `scala.concurrent.blocking()` when
+ * A context to be notified by `scala.concurrent.blocking` when
* a thread is about to block. In effect this trait provides
- * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * the implementation for `scala.concurrent.Await`.
+ * `scala.concurrent.Await.result()` and `scala.concurrent.Await.ready()`
* locates an instance of `BlockContext` by first looking for one
* provided through `BlockContext.withBlockContext()` and failing that,
* checking whether `Thread.currentThread` is an instance of `BlockContext`.
@@ -27,11 +28,11 @@ import scala.concurrent.util.Duration
* {{{
* val oldContext = BlockContext.current
* val myContext = new BlockContext {
- * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
* // you'd have code here doing whatever you need to do
* // when the thread is about to block.
* // Then you'd chain to the previous context:
- * oldContext.internalBlockingCall(awaitable, atMost)
+ * oldContext.blockOn(thunk)
* }
* }
* BlockContext.withBlockContext(myContext) {
@@ -42,35 +43,33 @@ import scala.concurrent.util.Duration
*/
trait BlockContext {
- /** Used internally by the framework; blocks execution for at most
- * `atMost` time while waiting for an `awaitable` object to become ready.
+ /** Used internally by the framework;
+ * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`.
*
- * Clients should use `scala.concurrent.blocking` instead; this is
- * the implementation of `scala.concurrent.blocking`, generally
- * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead.
*/
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+ def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T
}
object BlockContext {
private object DefaultBlockContext extends BlockContext {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- awaitable.result(atMost)(Await.canAwaitEvidence)
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk
}
- private val contextLocal = new ThreadLocal[BlockContext]() {
- override def initialValue = Thread.currentThread match {
+ private val contextLocal = new ThreadLocal[BlockContext]()
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get match {
+ case null => Thread.currentThread match {
case ctx: BlockContext => ctx
case _ => DefaultBlockContext
}
+ case some => some
}
- /** Obtain the current thread's current `BlockContext`. */
- def current: BlockContext = contextLocal.get
-
/** Pushes a current `BlockContext` while executing `body`. */
def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
- val old = contextLocal.get
+ val old = contextLocal.get // can be null
try {
contextLocal.set(blockContext)
body
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index f82b79cb18..d24fdbf005 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -707,11 +707,9 @@ object Future {
// doesn't need to create defaultExecutionContext as
// a side effect.
private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
- def execute(runnable: Runnable): Unit =
+ override def execute(runnable: Runnable): Unit =
runnable.run()
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback")
- def reportFailure(t: Throwable): Unit =
+ override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
}
}
diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
deleted file mode 100644
index 241efa8857..0000000000
--- a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
+++ /dev/null
@@ -1,44 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-package scala.concurrent
-package default
-
-import scala.concurrent.util.Duration
-
-private[concurrent] final class SchedulerImpl extends Scheduler {
- private val timer =
- new java.util.Timer(true) // the associated thread runs as a daemon
-
- def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ???
-
- def scheduleOnce(delay: Duration, task: Runnable): Cancellable = {
- val timerTask = new java.util.TimerTask {
- def run(): Unit =
- task.run()
- }
- timer.schedule(timerTask, delay.toMillis)
- new Cancellable {
- def cancel(): Unit =
- timerTask.cancel()
- }
- }
-
- def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = {
- val timerTask = new java.util.TimerTask {
- def run(): Unit =
- task
- }
- timer.schedule(timerTask, delay.toMillis)
- new Cancellable {
- def cancel(): Unit =
- timerTask.cancel()
- }
- }
-
-}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
deleted file mode 100644
index 8b4eb12d4f..0000000000
--- a/src/library/scala/concurrent/default/TaskImpl.scala.disabled
+++ /dev/null
@@ -1,313 +0,0 @@
-package scala.concurrent
-package default
-
-
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
-import scala.util.Try
-import scala.util
-import scala.concurrent.util.Duration
-import scala.annotation.tailrec
-import scala.util.control.NonFatal
-
-
-private[concurrent] trait Completable[T] {
-self: Future[T] =>
-
- val executor: ExecutionContextImpl
-
- def newPromise[S]: Promise[S] = executor promise
-
- type Callback = Try[T] => Any
-
- def getState: State[T]
-
- def casState(oldv: State[T], newv: State[T]): Boolean
-
- protected def dispatch[U](r: Runnable) = executionContext execute r
-
- protected def processCallbacks(cbs: List[Callback], r: Try[T]) =
- for (cb <- cbs) dispatch(new Runnable {
- override def run() = cb(r)
- })
-
- def future: Future[T] = self
-
- def onComplete[U](callback: Try[T] => U): this.type = {
- @tailrec def tryAddCallback(): Try[T] = {
- getState match {
- case p @ Pending(lst) =>
- val pt = p.asInstanceOf[Pending[T]]
- if (casState(pt, Pending(callback :: pt.callbacks))) null
- else tryAddCallback()
- case Success(res) => util.Success(res)
- case Failure(t) => util.Failure(t)
- }
- }
-
- val res = tryAddCallback()
- if (res != null) dispatch(new Runnable {
- override def run() =
- try callback(res)
- catch handledFutureException andThen {
- t => Console.err.println(t)
- }
- })
-
- this
- }
-
- def isTimedout: Boolean = getState match {
- case Failure(ft: FutureTimeoutException) => true
- case _ => false
- }
-
-}
-
-private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
-extends Promise[T] with Future[T] with Completable[T] {
-
- val executor: scala.concurrent.default.ExecutionContextImpl = context
-
- @volatile private var state: State[T] = _
-
- val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state")
-
- updater.set(this, Pending(List()))
-
- def casState(oldv: State[T], newv: State[T]): Boolean = {
- updater.compareAndSet(this, oldv, newv)
- }
-
- def getState: State[T] = {
- updater.get(this)
- }
-
- @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
- case _ => null
- }
-
- def tryComplete(r: Try[T]) = r match {
- case util.Failure(t) => tryFailure(t)
- case util.Success(v) => trySuccess(v)
- }
-
- override def trySuccess(value: T): Boolean = {
- val cbs = tryCompleteState(Success(value))
- if (cbs == null)
- false
- else {
- processCallbacks(cbs, util.Success(value))
- this.synchronized {
- this.notifyAll()
- }
- true
- }
- }
-
- override def tryFailure(t: Throwable): Boolean = {
- val wrapped = wrap(t)
- val cbs = tryCompleteState(Failure(wrapped))
- if (cbs == null)
- false
- else {
- processCallbacks(cbs, util.Failure(wrapped))
- this.synchronized {
- this.notifyAll()
- }
- true
- }
- }
-
- def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match {
- case Success(res) => res
- case Failure(t) => throw t
- case _ =>
- this.synchronized {
- while (true)
- getState match {
- case Pending(_) => this.wait()
- case Success(res) => return res
- case Failure(t) => throw t
- }
- }
- sys.error("unreachable")
- }
-
-}
-
-private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
-extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
-
- val executor: ExecutionContextImpl = context
-
- @volatile private var state: State[T] = _
-
- val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state")
-
- updater.set(this, Pending(List()))
-
- def casState(oldv: State[T], newv: State[T]): Boolean = {
- updater.compareAndSet(this, oldv, newv)
- }
-
- def getState: State[T] = {
- updater.get(this)
- }
-
- @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
- }
-
- def compute(): Unit = {
- var cbs: List[Callback] = null
- try {
- val res = body
- processCallbacks(tryCompleteState(Success(res)), util.Success(res))
- } catch {
- case t if NonFatal(t) =>
- processCallbacks(tryCompleteState(Failure(t)), util.Failure(t))
- case t =>
- val ee = new ExecutionException(t)
- processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee))
- throw t
- }
- }
-
- def start(): Unit = {
- Thread.currentThread match {
- case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork()
- case _ => executor.pool.execute(this)
- }
- }
-
- // TODO FIXME: handle timeouts
- def await(atMost: Duration): this.type =
- await
-
- def await: this.type = {
- this.join()
- this
- }
-
- def tryCancel(): Unit =
- tryUnfork()
-
- def await(atMost: Duration)(implicit canawait: CanAwait): T = {
- join() // TODO handle timeout also
- (updater.get(this): @unchecked) match {
- case Success(r) => r
- case Failure(t) => throw t
- }
- }
-
-}
-
-
-private[concurrent] sealed abstract class State[T]
-
-
-case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T]
-
-
-case class Success[T](result: T) extends State[T]
-
-
-case class Failure[T](throwable: Throwable) extends State[T]
-
-
-private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
- import ExecutionContextImpl._
-
- val pool = {
- val p = new ForkJoinPool
- p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
- def uncaughtException(t: Thread, throwable: Throwable) {
- Console.err.println(throwable.getMessage)
- throwable.printStackTrace(Console.err)
- }
- })
- p
- }
-
- @inline
- private def executeTask(task: RecursiveAction) {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
- task.fork()
- else
- pool execute task
- }
-
- def execute(task: Runnable) {
- val action = new RecursiveAction { def compute() { task.run() } }
- executeTask(action)
- }
-
- def execute[U](body: () => U) {
- val action = new RecursiveAction { def compute() { body() } }
- executeTask(action)
- }
-
- def task[T](body: => T): Task[T] = {
- new TaskImpl(this, body)
- }
-
- def future[T](body: => T): Future[T] = {
- val t = task(body)
- t.start()
- t.future
- }
-
- def promise[T]: Promise[T] =
- new PromiseImpl[T](this)
-
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
-
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor
- case x => x.blocking(awaitable, atMost)
- }
- }
-
- private def blockingCall[T](b: Awaitable[T]): T = b match {
- case fj: TaskImpl[_] if fj.executor.pool eq pool =>
- fj.await(Duration.fromNanos(0))
- case _ =>
- var res: T = null.asInstanceOf[T]
- @volatile var blockingDone = false
- // TODO add exception handling here!
- val mb = new ForkJoinPool.ManagedBlocker {
- def block() = {
- res = b.await(Duration.fromNanos(0))(CanAwaitEvidence)
- blockingDone = true
- true
- }
- def isReleasable = blockingDone
- }
- ForkJoinPool.managedBlock(mb, true)
- res
- }
-
- def reportFailure(t: Throwable): Unit = {}
-
-}
-
-
-object ExecutionContextImpl {
-
- private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
- override protected def initialValue = null
- }
-
-}
-
-
-
-
-
-
-
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 98f821652f..875a558887 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,7 +13,7 @@ package scala.concurrent.impl
import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
@@ -37,15 +37,15 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))
def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
var result: T = null.asInstanceOf[T]
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
- def block(): Boolean = {
- result = try awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) finally { isdone = true }
+ override def block(): Boolean = {
+ result = try thunk finally { isdone = true }
true
}
- def isReleasable = isdone
+ override def isReleasable = isdone
})
result
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 132e1d79e7..098008e958 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -10,31 +10,13 @@ package scala.concurrent.impl
-import scala.concurrent.util.Duration
-import scala.concurrent.{Awaitable, ExecutionContext, CanAwait}
-import scala.collection.mutable.Stack
+import scala.concurrent.ExecutionContext
import scala.util.control.NonFatal
-private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
-
-}
private[concurrent] object Future {
-
- /** Wraps a block of code into an awaitable object. */
- private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def ready(atMost: Duration)(implicit permit: CanAwait) = {
- body
- this
- }
- def result(atMost: Duration)(implicit permit: CanAwait) = body
- }
-
- def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) scala.concurrent.Future.toBoxed(c) else c
-
- private[impl] class PromiseCompletingRunnable[T](body: => T)
- extends Runnable {
+ class PromiseCompletingRunnable[T](body: => T) extends Runnable {
val promise = new Promise.DefaultPromise[T]()
override def run() = {
@@ -44,7 +26,7 @@ private[concurrent] object Future {
}
}
- def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): scala.concurrent.Future[T] = {
val runnable = new PromiseCompletingRunnable(body)
executor.execute(runnable)
runnable.promise.future
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 84638586cf..c2df9ac296 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -18,7 +18,7 @@ import scala.util.control.NonFatal
-private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
+private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with scala.concurrent.Future[T] {
def future: this.type = this
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 76703bf081..a6488b602f 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -9,6 +9,7 @@
package scala
import scala.concurrent.util.Duration
+import scala.annotation.implicitNotFound
/** This package object contains primitives for concurrent and parallel programming.
*/
@@ -17,6 +18,41 @@ package object concurrent {
type CancellationException = java.util.concurrent.CancellationException
type TimeoutException = java.util.concurrent.TimeoutException
+ @implicitNotFound("Don't call `Awaitable` methods directly, use the `Await` object.")
+ sealed trait CanAwait
+ private implicit object AwaitPermission extends CanAwait
+
+ /**
+ * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances.
+ */
+ object Await {
+ /**
+ * Invokes ready() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
+ * ready() blocks until the awaitable has completed or the timeout expires.
+ *
+ * Throws a TimeoutException if the timeout expires, as that is in the contract of `Awaitable.ready`.
+ * @param awaitable the `Awaitable` on which `ready` is to be called
+ * @param atMost the maximum timeout for which to wait
+ * @return the result of `awaitable.ready` which is defined to be the awaitable itself.
+ */
+ @throws(classOf[TimeoutException])
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type =
+ blocking(awaitable.ready(atMost))
+
+ /**
+ * Invokes result() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
+ * result() blocks until the awaitable has completed or the timeout expires.
+ *
+ * Throws a TimeoutException if the timeout expires, or any exception thrown by `Awaitable.result`.
+ * @param awaitable the `Awaitable` on which `result` is to be called
+ * @param atMost the maximum timeout for which to wait
+ * @return the result of `awaitable.result`
+ */
+ @throws(classOf[Exception])
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T =
+ blocking(awaitable.result(atMost))
+ }
+
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
* The result becomes available once the asynchronous computation is completed.
@@ -36,46 +72,18 @@ package object concurrent {
*/
def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
- /** Used to block on a piece of code which potentially blocks.
+ /** Used to designate a piece of code which potentially blocks, allowing the BlockContext to adjust the runtime's behavior.
+ * Properly marking blocking code may improve performance or avoid deadlocks.
*
- * @param body A piece of code which contains potentially blocking or long running calls.
- *
- * Calling this method may throw the following exceptions:
- * - CancellationException - if the computation was cancelled
- * - InterruptedException - in the case that a wait within the blockable object was interrupted
- * - TimeoutException - in the case that the blockable object timed out
- */
- def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
-
- /** Blocks on an awaitable object.
+ * If you have an `Awaitable` then you should use Await.result instead of `blocking`.
*
- * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
+ * @param body A piece of code which contains potentially blocking or long running calls.
*
* Calling this method may throw the following exceptions:
* - CancellationException - if the computation was cancelled
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
- BlockContext.current.internalBlockingCall(awaitable, atMost)
-}
-
-/* concurrency constructs */
-package concurrent {
-
- sealed trait CanAwait
-
- object Await {
- private[concurrent] implicit val canAwaitEvidence = new CanAwait {}
-
- def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
- blocking(awaitable, atMost)
- awaitable
- }
-
- def result[T](awaitable: Awaitable[T], atMost: Duration): T = {
- blocking(awaitable, atMost)
- }
-
- }
+ @throws(classOf[Exception])
+ def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)
}
diff --git a/test/files/jvm/future-spec/FutureTests.scala b/test/files/jvm/future-spec/FutureTests.scala
index ca9ff5090f..30e1a722bf 100644
--- a/test/files/jvm/future-spec/FutureTests.scala
+++ b/test/files/jvm/future-spec/FutureTests.scala
@@ -507,6 +507,12 @@ object FutureTests extends MinimalScalaTest {
}
Await.ready(complex, defaultTimeout).isCompleted mustBe (true)
}
+
+ "should not throw when Await.ready" in {
+ val expected = try Right(5 / 0) catch { case a: ArithmeticException => Left(a) }
+ val f = future(5).map(_ / 0)
+ Await.ready(f, defaultTimeout).value.get.toString mustBe expected.toString
+ }
}
diff --git a/test/files/jvm/future-spec/PromiseTests.scala b/test/files/jvm/future-spec/PromiseTests.scala
index 49bc642b57..d15bb31f36 100644
--- a/test/files/jvm/future-spec/PromiseTests.scala
+++ b/test/files/jvm/future-spec/PromiseTests.scala
@@ -78,7 +78,7 @@ object PromiseTests extends MinimalScalaTest {
"contain a value" in { f((future, result) => future.value mustBe (Some(Right(result)))) }
- "return result with 'blocking'" in { f((future, result) => blocking(future, defaultTimeout) mustBe (result)) }
+ "return when ready with 'Await.ready'" in { f((future, result) => Await.ready(future, defaultTimeout).isCompleted mustBe (true)) }
"return result with 'Await.result'" in { f((future, result) => Await.result(future, defaultTimeout) mustBe (result)) }
@@ -163,12 +163,9 @@ object PromiseTests extends MinimalScalaTest {
})
}
- "throw exception with 'blocking'" in {
+ "throw not throw exception with 'Await.ready'" in {
f {
- (future, message) =>
- intercept[E] {
- blocking(future, defaultTimeout)
- }.getMessage mustBe (message)
+ (future, message) => Await.ready(future, defaultTimeout).isCompleted mustBe (true)
}
}
diff --git a/test/files/jvm/scala-concurrent-tck.scala b/test/files/jvm/scala-concurrent-tck.scala
index 5c9c71f3f8..1209b710b0 100644
--- a/test/files/jvm/scala-concurrent-tck.scala
+++ b/test/files/jvm/scala-concurrent-tck.scala
@@ -4,7 +4,9 @@ import scala.concurrent.{
TimeoutException,
SyncVar,
ExecutionException,
- ExecutionContext
+ ExecutionContext,
+ CanAwait,
+ Await
}
import scala.concurrent.{ future, promise, blocking }
import scala.util.{ Try, Success, Failure }
@@ -647,7 +649,7 @@ trait FutureProjections extends TestBase {
val f = future {
throw cause
}
- assert(blocking(f.failed, Duration(500, "ms")) == cause)
+ assert(Await.result(f.failed, Duration(500, "ms")) == cause)
done()
}
@@ -655,7 +657,7 @@ trait FutureProjections extends TestBase {
done =>
val f = future { 0 }
try {
- blocking(f.failed, Duration(500, "ms"))
+ Await.result(f.failed, Duration(500, "ms"))
assert(false)
} catch {
case nsee: NoSuchElementException => done()
@@ -678,7 +680,7 @@ trait Blocking extends TestBase {
def testAwaitSuccess(): Unit = once {
done =>
val f = future { 0 }
- blocking(f, Duration(500, "ms"))
+ Await.result(f, Duration(500, "ms"))
done()
}
@@ -689,7 +691,7 @@ trait Blocking extends TestBase {
throw cause
}
try {
- blocking(f, Duration(500, "ms"))
+ Await.result(f, Duration(500, "ms"))
assert(false)
} catch {
case t =>
@@ -708,7 +710,7 @@ trait BlockContexts extends TestBase {
import scala.concurrent.{ Await, Awaitable, BlockContext }
private def getBlockContext(body: => BlockContext): BlockContext = {
- blocking(Future { body }, Duration(500, "ms"))
+ Await.result(Future { body }, Duration(500, "ms"))
}
// test outside of an ExecutionContext
@@ -727,8 +729,7 @@ trait BlockContexts extends TestBase {
def testPushCustom(): Unit = {
val orig = BlockContext.current
val customBC = new BlockContext() {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- orig.internalBlockingCall(awaitable, atMost)
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk)
}
val bc = getBlockContext({
@@ -744,8 +745,7 @@ trait BlockContexts extends TestBase {
def testPopCustom(): Unit = {
val orig = BlockContext.current
val customBC = new BlockContext() {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- orig.internalBlockingCall(awaitable, atMost)
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = orig.blockOn(thunk)
}
val bc = getBlockContext({