diff options
author | brijest <brijest@tsf3-028.epfl.ch> | 2012-01-16 16:16:08 +0100 |
---|---|---|
committer | brijest <brijest@tsf3-028.epfl.ch> | 2012-01-16 16:16:08 +0100 |
commit | 031eea9cb2b7ff00f70f9adb8d8da371bd013bfe (patch) | |
tree | a66de2a6d297af652e35126cb322bc5a38e98493 /src | |
parent | 8b5f05ac364dd13f6b0443690825adc382ff8fc7 (diff) | |
download | scala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.tar.gz scala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.tar.bz2 scala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.zip |
Work in progress.
Diffstat (limited to 'src')
6 files changed, 48 insertions, 19 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala index 078b05c517..2650022e1e 100644 --- a/src/library/scala/concurrent/ExecutionContext.scala +++ b/src/library/scala/concurrent/ExecutionContext.scala @@ -30,8 +30,9 @@ trait ExecutionContext { def future[T](body: => T): Future[T] - /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](body: Awaitable[T]): T + def blocking[T](atMost: Duration)(body: =>T): T + + def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T } diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala index 28638d1247..1cff7211f3 100644 --- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala +++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala @@ -44,8 +44,17 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo p.future } + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(awaitable) // inside an execution context thread + } + } + /** Only callable from the tasks running on the same execution context. */ - def blockingCall[T](body: Awaitable[T]): T = { + private def blockingCall[T](body: Awaitable[T]): T = { releaseStack() // TODO see what to do with timeout diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala index c48009554c..5036768d36 100644 --- a/src/library/scala/concurrent/akka/Future.scala +++ b/src/library/scala/concurrent/akka/Future.scala @@ -11,7 +11,7 @@ package scala.concurrent.akka import scala.concurrent.{Awaitable, ExecutionContext} -import scala.util.continuations._ +//import scala.util.continuations._ @@ -24,7 +24,7 @@ trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] { * Returns the result of this Future without blocking, by suspending execution and storing it as a * continuation until the result is available. */ - def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) + //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any])) /** Tests whether this Future has been completed. */ diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala index 3b77f14f70..e7b6b50aeb 100644 --- a/src/library/scala/concurrent/akka/Promise.scala +++ b/src/library/scala/concurrent/akka/Promise.scala @@ -13,7 +13,7 @@ package scala.concurrent.akka import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS } import java.util.concurrent.atomic.AtomicReferenceFieldUpdater import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException} -import scala.util.continuations._ +//import scala.util.continuations._ import scala.util.Duration import scala.annotation.tailrec @@ -25,7 +25,7 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { // TODO refine answer and return types here from Any to type parameters // then move this up in the hierarchy - + /* final def <<(value: T): Future[T] @cps[Future[Any]] = shift { cont: (Future[T] => Future[Any]) => cont(complete(Right(value))) @@ -47,7 +47,7 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] { p.future } - + */ // TODO finish this once we introduce something like dataflow streams /* diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala index a38541df5d..4bddb740ef 100644 --- a/src/library/scala/concurrent/default/TaskImpl.scala +++ b/src/library/scala/concurrent/default/TaskImpl.scala @@ -257,7 +257,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext { def promise[T]: Promise[T] = new PromiseImpl[T](this) - def blockingCall[T](b: Awaitable[T]): T = b match { + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + + def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = { + currentExecutionContext.get match { + case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case + case x => x.blockingCall(awaitable) // inside an execution context thread + } + } + + private def blockingCall[T](b: Awaitable[T]): T = b match { case fj: TaskImpl[_] if fj.executor.pool eq pool => fj.await(Duration.fromNanos(0)) case _ => diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala index 7552100af2..d40eb4e2a1 100644 --- a/src/library/scala/concurrent/package.scala +++ b/src/library/scala/concurrent/package.scala @@ -76,6 +76,11 @@ package object concurrent { def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] = execCtx promise + /** Wraps a block of code into an awaitable object. */ + def body2awaitable[T](body: =>T) = new Awaitable[T] { + def await(atMost: Duration)(implicit cb: CanAwait) = body + } + /** Used to block on a piece of code which potentially blocks. * * @param body A piece of code which contains potentially blocking or long running calls. @@ -85,14 +90,10 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ - def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext = executionContext): T = + executionContext.blocking(atMost)(body) - /** Wraps a block of code into an awaitable object. */ - def body2awaitable[T](body: =>T) = new Awaitable[T] { - def await(atMost: Duration)(implicit cb: CanAwait) = body - } - - /** Blocks on a blockable object. + /** Blocks on an awaitable object. * * @param awaitable An object with a `block` method which runs potentially blocking or long running calls. * @@ -101,23 +102,32 @@ package object concurrent { * - InterruptedException - in the case that a wait within the blockable object was interrupted * - TimeoutException - in the case that the blockable object timed out */ + def blocking[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = + executionContext.blocking(atMost)(awaitable) + + /* + def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost) + def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = { currentExecutionContext.get match { case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case case x => x.blockingCall(awaitable) // inside an execution context thread } } + */ object await { - def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = { + def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = { try blocking(awaitable, atMost) catch { case _ => } awaitable } - def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable, atMost) + def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = { + blocking(awaitable, atMost) + } } - + } |