summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorbrijest <brijest@tsf3-028.epfl.ch>2012-01-16 16:16:08 +0100
committerbrijest <brijest@tsf3-028.epfl.ch>2012-01-16 16:16:08 +0100
commit031eea9cb2b7ff00f70f9adb8d8da371bd013bfe (patch)
treea66de2a6d297af652e35126cb322bc5a38e98493
parent8b5f05ac364dd13f6b0443690825adc382ff8fc7 (diff)
downloadscala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.tar.gz
scala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.tar.bz2
scala-031eea9cb2b7ff00f70f9adb8d8da371bd013bfe.zip
Work in progress.
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala5
-rw-r--r--src/library/scala/concurrent/akka/ExecutionContextImpl.scala11
-rw-r--r--src/library/scala/concurrent/akka/Future.scala4
-rw-r--r--src/library/scala/concurrent/akka/Promise.scala6
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala11
-rw-r--r--src/library/scala/concurrent/package.scala30
6 files changed, 48 insertions, 19 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 078b05c517..2650022e1e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -30,8 +30,9 @@ trait ExecutionContext {
def future[T](body: => T): Future[T]
- /** Only callable from the tasks running on the same execution context. */
- def blockingCall[T](body: Awaitable[T]): T
+ def blocking[T](atMost: Duration)(body: =>T): T
+
+ def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T
}
diff --git a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
index 28638d1247..1cff7211f3 100644
--- a/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/akka/ExecutionContextImpl.scala
@@ -44,8 +44,17 @@ class ExecutionContextImpl(executorService: ExecutorService) extends ExecutionCo
p.future
}
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
+ def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = {
+ currentExecutionContext.get match {
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(awaitable) // inside an execution context thread
+ }
+ }
+
/** Only callable from the tasks running on the same execution context. */
- def blockingCall[T](body: Awaitable[T]): T = {
+ private def blockingCall[T](body: Awaitable[T]): T = {
releaseStack()
// TODO see what to do with timeout
diff --git a/src/library/scala/concurrent/akka/Future.scala b/src/library/scala/concurrent/akka/Future.scala
index c48009554c..5036768d36 100644
--- a/src/library/scala/concurrent/akka/Future.scala
+++ b/src/library/scala/concurrent/akka/Future.scala
@@ -11,7 +11,7 @@ package scala.concurrent.akka
import scala.concurrent.{Awaitable, ExecutionContext}
-import scala.util.continuations._
+//import scala.util.continuations._
@@ -24,7 +24,7 @@ trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
* Returns the result of this Future without blocking, by suspending execution and storing it as a
* continuation until the result is available.
*/
- def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
+ //def apply(): T @cps[Future[Any]] = shift(this flatMap (_: T => Future[Any]))
/** Tests whether this Future has been completed.
*/
diff --git a/src/library/scala/concurrent/akka/Promise.scala b/src/library/scala/concurrent/akka/Promise.scala
index 3b77f14f70..e7b6b50aeb 100644
--- a/src/library/scala/concurrent/akka/Promise.scala
+++ b/src/library/scala/concurrent/akka/Promise.scala
@@ -13,7 +13,7 @@ package scala.concurrent.akka
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException}
-import scala.util.continuations._
+//import scala.util.continuations._
import scala.util.Duration
import scala.annotation.tailrec
@@ -25,7 +25,7 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
// TODO refine answer and return types here from Any to type parameters
// then move this up in the hierarchy
-
+ /*
final def <<(value: T): Future[T] @cps[Future[Any]] = shift {
cont: (Future[T] => Future[Any]) =>
cont(complete(Right(value)))
@@ -47,7 +47,7 @@ trait Promise[T] extends scala.concurrent.Promise[T] with Future[T] {
p.future
}
-
+ */
// TODO finish this once we introduce something like dataflow streams
/*
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index a38541df5d..4bddb740ef 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -257,7 +257,16 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
def promise[T]: Promise[T] =
new PromiseImpl[T](this)
- def blockingCall[T](b: Awaitable[T]): T = b match {
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
+ def blocking[T](atMost: Duration)(awaitable: Awaitable[T]): T = {
+ currentExecutionContext.get match {
+ case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(awaitable) // inside an execution context thread
+ }
+ }
+
+ private def blockingCall[T](b: Awaitable[T]): T = b match {
case fj: TaskImpl[_] if fj.executor.pool eq pool =>
fj.await(Duration.fromNanos(0))
case _ =>
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 7552100af2..d40eb4e2a1 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -76,6 +76,11 @@ package object concurrent {
def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
execCtx promise
+ /** Wraps a block of code into an awaitable object. */
+ def body2awaitable[T](body: =>T) = new Awaitable[T] {
+ def await(atMost: Duration)(implicit cb: CanAwait) = body
+ }
+
/** Used to block on a piece of code which potentially blocks.
*
* @param body A piece of code which contains potentially blocking or long running calls.
@@ -85,14 +90,10 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+ def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext = executionContext): T =
+ executionContext.blocking(atMost)(body)
- /** Wraps a block of code into an awaitable object. */
- def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def await(atMost: Duration)(implicit cb: CanAwait) = body
- }
-
- /** Blocks on a blockable object.
+ /** Blocks on an awaitable object.
*
* @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
*
@@ -101,23 +102,32 @@ package object concurrent {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
+ def blocking[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T =
+ executionContext.blocking(atMost)(awaitable)
+
+ /*
+ def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
currentExecutionContext.get match {
case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
case x => x.blockingCall(awaitable) // inside an execution context thread
}
}
+ */
object await {
- def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = {
+ def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = {
try blocking(awaitable, atMost)
catch { case _ => }
awaitable
}
- def result[T](awaitable: Awaitable[T], atMost: Duration): T = blocking(awaitable, atMost)
+ def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = {
+ blocking(awaitable, atMost)
+ }
}
-
+
}