summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authorAleksandar Prokopec <axel22@gmail.com>2012-03-28 19:17:49 +0200
committerAleksandar Prokopec <axel22@gmail.com>2012-03-28 19:17:49 +0200
commit47318105010786bc6eba835c957ce3cd4fe88d70 (patch)
tree2fe3398daaf39ad0c6e292e7a3656d8058ce15e0 /src/library
parent3e40fb5d94a4d185c93861b9caf86ac92a04ef2a (diff)
downloadscala-47318105010786bc6eba835c957ce3cd4fe88d70.tar.gz
scala-47318105010786bc6eba835c957ce3cd4fe88d70.tar.bz2
scala-47318105010786bc6eba835c957ce3cd4fe88d70.zip
Work on source compatibility between akka and scala futures.
Removed some methods from execution contexts. Changed Awaitable interface.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/Awaitable.scala9
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala14
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala91
-rw-r--r--src/library/scala/concurrent/Future.scala31
-rw-r--r--src/library/scala/concurrent/Promise.scala16
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala84
-rw-r--r--src/library/scala/concurrent/impl/Future.scala74
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala29
-rw-r--r--src/library/scala/concurrent/package.scala28
9 files changed, 135 insertions, 241 deletions
diff --git a/src/library/scala/concurrent/Awaitable.scala b/src/library/scala/concurrent/Awaitable.scala
index 6c9995eb05..052e6e2366 100644
--- a/src/library/scala/concurrent/Awaitable.scala
+++ b/src/library/scala/concurrent/Awaitable.scala
@@ -16,8 +16,13 @@ import scala.concurrent.util.Duration
trait Awaitable[+T] {
- @implicitNotFound(msg = "Waiting must be done by calling `blocking(timeout) b`, where `b` is the `Awaitable` object or a potentially blocking piece of code.")
- def await(atMost: Duration)(implicit canawait: CanAwait): T
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type
+
+ /**
+ * Throws exceptions if cannot produce a T within the specified time
+ * This method should not be called directly.
+ */
+ def result(atMost: Duration)(implicit permit: CanAwait): T
}
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index 3471095051..ba98757906 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -59,14 +59,18 @@ abstract class ConcurrentPackageObject {
/* concurrency constructs */
def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
- execCtx future body
+ Future[T](body)
def promise[T]()(implicit execCtx: ExecutionContext = executionContext): Promise[T] =
- execCtx promise
+ Promise[T]()
/** Wraps a block of code into an awaitable object. */
def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def await(atMost: Duration)(implicit cb: CanAwait) = body
+ def ready(atMost: Duration)(implicit permit: CanAwait) = {
+ body
+ this
+ }
+ def result(atMost: Duration)(implicit permit: CanAwait) = body
}
/** Used to block on a piece of code which potentially blocks.
@@ -78,8 +82,8 @@ abstract class ConcurrentPackageObject {
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](atMost: Duration)(body: =>T)(implicit execCtx: ExecutionContext): T =
- executionContext.blocking(atMost)(body)
+ def blocking[T](body: =>T)(implicit execCtx: ExecutionContext): T =
+ executionContext.blocking(body)
/** Blocks on an awaitable object.
*
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index c4a45f9fb5..a206a2d4ea 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -22,19 +22,11 @@ import collection._
trait ExecutionContext {
- protected implicit object CanAwaitEvidence extends CanAwait
-
def execute(runnable: Runnable): Unit
def execute[U](body: () => U): Unit
- def promise[T]: Promise[T]
-
- def future[T](body: Callable[T]): Future[T] = future(body.call())
-
- def future[T](body: => T): Future[T]
-
- def blocking[T](atMost: Duration)(body: =>T): T
+ def blocking[T](body: =>T): T
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T
@@ -44,89 +36,8 @@ trait ExecutionContext {
private implicit val executionContext = this
- def keptPromise[T](result: T): Promise[T] = {
- val p = promise[T]
- p success result
- }
-
- def brokenPromise[T](t: Throwable): Promise[T] = {
- val p = promise[T]
- p failure t
- }
-
- /** TODO some docs
- *
- */
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]]): Future[Coll[T]] = {
- import nondeterministic._
- val buffer = new mutable.ArrayBuffer[T]
- val counter = new AtomicInteger(1) // how else could we do this?
- val p: Promise[Coll[T]] = promise[Coll[T]] // we need an implicit execctx in the signature
- var idx = 0
-
- def tryFinish() = if (counter.decrementAndGet() == 0) {
- val builder = cbf(futures)
- builder ++= buffer
- p success builder.result
- }
-
- for (f <- futures) {
- val currentIndex = idx
- buffer += null.asInstanceOf[T]
- counter.incrementAndGet()
- f onComplete {
- case Failure(t) =>
- p tryFailure t
- case Success(v) =>
- buffer(currentIndex) = v
- tryFinish()
- }
- idx += 1
- }
-
- tryFinish()
-
- p.future
- }
-
- /** TODO some docs
- *
- */
- def any[T](futures: Traversable[Future[T]]): Future[T] = {
- val p = promise[T]
- val completeFirst: Try[T] => Unit = elem => p tryComplete elem
-
- futures foreach (_ onComplete completeFirst)
-
- p.future
- }
-
- /** TODO some docs
- *
- */
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean): Future[Option[T]] = {
- if (futures.isEmpty) Promise.kept[Option[T]](None).future
- else {
- val result = promise[Option[T]]
- val count = new AtomicInteger(futures.size)
- val search: Try[T] => Unit = {
- v => v match {
- case Success(r) => if (predicate(r)) result trySuccess Some(r)
- case _ =>
- }
- if (count.decrementAndGet() == 0) result trySuccess None
- }
-
- futures.foreach(_ onComplete search)
-
- result.future
- }
- }
-
}
-sealed trait CanAwait
-
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 1dc8e38355..fa4c61c227 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -235,8 +235,8 @@ self =>
* val f = future { 5 }
* val g = f filter { _ % 2 == 1 }
* val h = f filter { _ % 2 == 0 }
- * await(0) g // evaluates to 5
- * await(0) h // throw a NoSuchElementException
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
* }}}
*/
def filter(pred: T => Boolean): Future[T] = {
@@ -272,8 +272,8 @@ self =>
* val h = f collect {
* case x if x > 0 => x * 2
* }
- * await(0) g // evaluates to 5
- * await(0) h // throw a NoSuchElementException
+ * await(g, 0) // evaluates to 5
+ * await(h, 0) // throw a NoSuchElementException
* }}}
*/
def collect[S](pf: PartialFunction[T, S]): Future[S] = {
@@ -383,7 +383,7 @@ self =>
* val f = future { sys.error("failed") }
* val g = future { 5 }
* val h = f orElse g
- * await(0) h // evaluates to 5
+ * await(h, 0) // evaluates to 5
* }}}
*/
def fallbackTo[U >: T](that: Future[U]): Future[U] = {
@@ -445,7 +445,7 @@ self =>
* val f = future { sys.error("failed") }
* val g = future { 5 }
* val h = f either g
- * await(0) h // evaluates to either 5 or throws a runtime exception
+ * await(h, 0) // evaluates to either 5 or throws a runtime exception
* }}}
*/
def either[U >: T](that: Future[U]): Future[U] = {
@@ -466,26 +466,15 @@ self =>
-/** TODO some docs
+/** Future companion object.
*
* @define nonDeterministic
* Note: using this method yields nondeterministic dataflow programs.
*/
object Future {
-
- // TODO make more modular by encoding all other helper methods within the execution context
- /** TODO some docs
- */
- def all[T, Coll[X] <: Traversable[X]](futures: Coll[Future[T]])(implicit cbf: CanBuildFrom[Coll[_], T, Coll[T]], ec: ExecutionContext): Future[Coll[T]] =
- ec.all[T, Coll](futures)
-
- // move this to future companion object
- @inline def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = executor.future(body)
-
- def any[T](futures: Traversable[Future[T]])(implicit ec: ExecutionContext): Future[T] = ec.any(futures)
-
- def find[T](futures: Traversable[Future[T]])(predicate: T => Boolean)(implicit ec: ExecutionContext): Future[Option[T]] = ec.find(futures)(predicate)
-
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
+
}
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 4404e90971..61e21606e6 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -30,8 +30,6 @@ import scala.util.{ Try, Success, Failure }
*/
trait Promise[T] {
- import nondeterministic._
-
/** Future containing the value of this promise.
*/
def future: Future[T]
@@ -114,12 +112,18 @@ trait Promise[T] {
object Promise {
- def kept[T](result: T)(implicit execctx: ExecutionContext): Promise[T] =
- execctx keptPromise result
+ /** Creates a new promise.
+ */
+ def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]()
- def broken[T](t: Throwable)(implicit execctx: ExecutionContext): Promise[T] =
- execctx brokenPromise t
+ /** Creates an already completed Promise with the specified exception
+ */
+ def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Failure(exception))
+ /** Creates an already completed Promise with the specified result
+ */
+ def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Success(result))
+
}
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 8ac745fd25..5dc440f42b 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -15,7 +15,6 @@ import scala.concurrent.forkjoin._
import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
import scala.util.{ Try, Success, Failure }
import scala.concurrent.util.{ Duration }
-import scala.collection.mutable.Stack
@@ -38,32 +37,12 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
def run() = body()
})
- def promise[T]: Promise[T] = new Promise.DefaultPromise[T]()(this)
-
- def future[T](body: =>T): Future[T] = {
- val p = promise[T]
-
- dispatchFuture {
- () =>
- p complete {
- try {
- Success(body)
- } catch {
- case e => resolver(e)
- }
- }
- }
-
- p.future
- }
-
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
+ def blocking[T](body: =>T): T = blocking(body2awaitable(body), Duration.fromNanos(0))
def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x => x.blockingCall(awaitable) // inside an execution context thread
- }
+ Future.releaseStack(this)
+
+ awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence)
}
def reportFailure(t: Throwable) = t match {
@@ -71,61 +50,6 @@ private[scala] class ExecutionContextImpl(val executorService: AnyRef) extends E
case t => t.printStackTrace()
}
- /** Only callable from the tasks running on the same execution context. */
- private def blockingCall[T](body: Awaitable[T]): T = {
- releaseStack()
-
- // TODO see what to do with timeout
- body.await(Duration.fromNanos(0))(CanAwaitEvidence)
- }
-
- // an optimization for batching futures
- // TODO we should replace this with a public queue,
- // so that it can be stolen from
- // OR: a push to the local task queue should be so cheap that this is
- // not even needed, but stealing is still possible
- private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
-
- private def releaseStack(): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && stack.nonEmpty =>
- val tasks = stack.elems
- stack.clear()
- _taskStack.remove()
- dispatchFuture(() => _taskStack.get.elems = tasks, true)
- case null =>
- // do nothing - there is no local batching stack anymore
- case _ =>
- _taskStack.remove()
- }
-
- private[impl] def dispatchFuture(task: () => Unit, force: Boolean = false): Unit =
- _taskStack.get match {
- case stack if (stack ne null) && !force => stack push task
- case _ => this.execute(
- new Runnable {
- def run() {
- try {
- val taskStack = Stack[() => Unit](task)
- _taskStack set taskStack
- while (taskStack.nonEmpty) {
- val next = taskStack.pop()
- try {
- next.apply()
- } catch {
- case e =>
- // TODO catching all and continue isn't good for OOME
- reportFailure(e)
- }
- }
- } finally {
- _taskStack.remove()
- }
- }
- }
- )
- }
-
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index b4385ea34a..6833b2467f 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -8,13 +8,17 @@
package scala.concurrent.impl
+
+
import scala.concurrent.{Awaitable, ExecutionContext}
import scala.util.{ Try, Success, Failure }
-//import scala.util.continuations._
+import scala.collection.mutable.Stack
+
+
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
- implicit def executor: ExecutionContextImpl
+ implicit def executor: ExecutionContext
/** For use only within a Future.flow block or another compatible Delimited Continuations reset block.
*
@@ -40,7 +44,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
* that conforms to A's erased type or a ClassCastException otherwise.
*/
final def mapTo[T](implicit m: Manifest[T]) = {
- val p = executor.promise[T]
+ val p = new Promise.DefaultPromise[T]
onComplete {
case f @ Failure(t) => p complete f.asInstanceOf[Try[T]]
@@ -48,7 +52,7 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
p complete (try {
Success(Future.boxedType(m.erasure).cast(v).asInstanceOf[T])
} catch {
- case e: ClassCastException ⇒ Failure(e)
+ case e: ClassCastException => Failure(e)
})
}
@@ -86,4 +90,66 @@ object Future {
def boxedType(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
+
+ def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = {
+ val promise = new Promise.DefaultPromise[T]()
+ executor.execute(new Runnable {
+ def run = {
+ promise complete {
+ try {
+ Success(body)
+ } catch {
+ case e => scala.concurrent.resolver(e)
+ }
+ }
+ }
+ })
+ promise.future
+ }
+
+ // an optimization for batching futures
+ // TODO we should replace this with a public queue,
+ // so that it can be stolen from
+ // OR: a push to the local task queue should be so cheap that this is
+ // not even needed, but stealing is still possible
+ private val _taskStack = new ThreadLocal[Stack[() => Unit]]()
+
+ private[impl] def releaseStack(executor: ExecutionContext): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && stack.nonEmpty =>
+ val tasks = stack.elems
+ stack.clear()
+ _taskStack.remove()
+ dispatchFuture(executor, () => _taskStack.get.elems = tasks, true)
+ case null =>
+ // do nothing - there is no local batching stack anymore
+ case _ =>
+ _taskStack.remove()
+ }
+
+ private[impl] def dispatchFuture(executor: ExecutionContext, task: () => Unit, force: Boolean = false): Unit =
+ _taskStack.get match {
+ case stack if (stack ne null) && !force => stack push task
+ case _ => executor.execute(new Runnable {
+ def run() {
+ try {
+ val taskStack = Stack[() => Unit](task)
+ _taskStack set taskStack
+ while (taskStack.nonEmpty) {
+ val next = taskStack.pop()
+ try {
+ next.apply()
+ } catch {
+ case e =>
+ // TODO catching all and continue isn't good for OOME
+ executor.reportFailure(e)
+ }
+ }
+ } finally {
+ _taskStack.remove()
+ }
+ }
+ })
+ }
+
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index 4a983b5001..c79b0d02cc 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -26,7 +26,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
def future = this
- def newPromise[S]: Promise[S] = executor promise
+ def newPromise[S]: scala.concurrent.Promise[S] = new Promise.DefaultPromise()
// TODO refine answer and return types here from Any to type parameters
// then move this up in the hierarchy
@@ -75,6 +75,7 @@ private[concurrent] trait Promise[T] extends scala.concurrent.Promise[T] with Fu
object Promise {
+
def dur2long(dur: Duration): Long = if (dur.isFinite) dur.toNanos else Long.MaxValue
def EmptyPending[T](): FState[T] = emptyPendingValue.asInstanceOf[FState[T]]
@@ -101,7 +102,7 @@ object Promise {
/** Default promise implementation.
*/
- class DefaultPromise[T](implicit val executor: ExecutionContextImpl) extends AbstractPromise with Promise[T] {
+ class DefaultPromise[T](implicit val executor: ExecutionContext) extends AbstractPromise with Promise[T] {
self =>
updater.set(this, Promise.EmptyPending())
@@ -126,14 +127,14 @@ object Promise {
value.isDefined
}
- executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), Duration.fromNanos(0))
+ executor.blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
}
- private def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
if (value.isDefined || tryAwait(atMost)) this
else throw new TimeoutException("Futures timed out after [" + atMost.toMillis + "] milliseconds")
- def await(atMost: Duration)(implicit permit: CanAwait): T =
+ def result(atMost: Duration)(implicit permit: CanAwait): T =
ready(atMost).value.get match {
case util.Failure(e) => throw e
case util.Success(r) => r
@@ -176,9 +177,9 @@ object Promise {
case null => false
case cs if cs.isEmpty => true
case cs =>
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => cs.foreach(f => notifyCompleted(f, value))
- }
+ })
true
}
}
@@ -197,9 +198,9 @@ object Promise {
if (tryAddCallback()) {
val result = value.get
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => notifyCompleted(func, result)
- }
+ })
}
this
@@ -218,22 +219,22 @@ object Promise {
*
* Useful in Future-composition when a value to contribute is already available.
*/
- final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContextImpl) extends Promise[T] {
+ final class KeptPromise[T](suppliedValue: Try[T])(implicit val executor: ExecutionContext) extends Promise[T] {
val value = Some(resolve(suppliedValue))
def tryComplete(value: Try[T]): Boolean = false
def onComplete[U](func: Try[T] => U): this.type = {
val completedAs = value.get
- executor dispatchFuture {
+ Future.dispatchFuture(executor, {
() => func(completedAs)
- }
+ })
this
}
- private def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
+ def ready(atMost: Duration)(implicit permit: CanAwait): this.type = this
- def await(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
+ def result(atMost: Duration)(implicit permit: CanAwait): T = value.get match {
case util.Failure(e) => throw e
case util.Success(r) => r
}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 204b3f2673..e2ae17498f 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -20,27 +20,17 @@ package object concurrent extends scala.concurrent.ConcurrentPackageObject {
}
package concurrent {
- object await {
- def ready[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): Awaitable[T] = {
- try blocking(awaitable, atMost)
- catch { case _ => }
- awaitable
- }
-
- def result[T](atMost: Duration)(awaitable: Awaitable[T])(implicit execCtx: ExecutionContext = executionContext): T = {
- blocking(awaitable, atMost)
- }
+
+ sealed trait CanAwait
+
+ object Await {
+ private[concurrent] implicit val canAwaitEvidence = new CanAwait {}
+
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): Awaitable[T] = awaitable.ready(atMost)
+
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T = awaitable.result(atMost)
}
- /** Importing this object allows using some concurrency primitives
- * on futures and promises that can yield nondeterministic programs.
- *
- * While program determinism is broken when using these primitives,
- * some programs cannot be written without them (e.g. multiple client threads
- * cannot send requests to a server thread through regular promises and futures).
- */
- object nondeterministic { }
-
/** A timeout exception.
*
* Futures are failed with a timeout exception when their timeout expires.