summaryrefslogtreecommitdiff
path: root/src/library
diff options
context:
space:
mode:
authoraleksandar <aleksandar@aleksandar-Latitude-E6500.(none)>2012-04-12 20:04:57 +0200
committeraleksandar <aleksandar@aleksandar-Latitude-E6500.(none)>2012-04-12 20:04:57 +0200
commitc7a71c2d5c5afbb3dc047bca20c4b8c72e5c94c9 (patch)
treee3ba24a89c59b4178073c0b4c482b9af84307e13 /src/library
parentaf71b10083ed8f91b9735e363651a64149a5ca89 (diff)
downloadscala-c7a71c2d5c5afbb3dc047bca20c4b8c72e5c94c9.tar.gz
scala-c7a71c2d5c5afbb3dc047bca20c4b8c72e5c94c9.tar.bz2
scala-c7a71c2d5c5afbb3dc047bca20c4b8c72e5c94c9.zip
Making changes in the scala.concurrent package.
Diffstat (limited to 'src/library')
-rw-r--r--src/library/scala/concurrent/ConcurrentPackageObject.scala60
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala21
-rw-r--r--src/library/scala/concurrent/Future.scala9
-rw-r--r--src/library/scala/concurrent/Promise.scala18
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala20
-rw-r--r--src/library/scala/concurrent/impl/Future.scala13
-rw-r--r--src/library/scala/concurrent/impl/Promise.scala8
7 files changed, 96 insertions, 53 deletions
diff --git a/src/library/scala/concurrent/ConcurrentPackageObject.scala b/src/library/scala/concurrent/ConcurrentPackageObject.scala
index d185ade8a4..789738e6ec 100644
--- a/src/library/scala/concurrent/ConcurrentPackageObject.scala
+++ b/src/library/scala/concurrent/ConcurrentPackageObject.scala
@@ -36,34 +36,42 @@ abstract class ConcurrentPackageObject {
case _ => true
}
- private[concurrent] def resolve[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
- case Left(t: scala.runtime.NonLocalReturnControl[_]) => Right(t.value.asInstanceOf[T])
- case Left(t: scala.util.control.ControlThrowable) => Left(new ExecutionException("Boxed ControlThrowable", t))
- case Left(t: InterruptedException) => Left(new ExecutionException("Boxed InterruptedException", t))
- case Left(e: Error) => Left(new ExecutionException("Boxed Error", e))
- case _ => source
+ private[concurrent] def resolveEither[T](source: Either[Throwable, T]): Either[Throwable, T] = source match {
+ case Left(t) => resolver(t)
+ case _ => source
}
- private[concurrent] def resolver[T] =
- resolverFunction.asInstanceOf[PartialFunction[Throwable, Either[Throwable, T]]]
-
+ private[concurrent] def resolver[T](throwable: Throwable): Either[Throwable, T] = throwable match {
+ case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value.asInstanceOf[T])
+ case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
+ case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
+ case e: Error => Left(new ExecutionException("Boxed Error", e))
+ case t => Left(t)
+ }
+
/* concurrency constructs */
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
def future[T](body: =>T)(implicit execctx: ExecutionContext = defaultExecutionContext): Future[T] =
Future[T](body)
+ /** Creates a promise object which can be completed with a value.
+ *
+ * @tparam T the type of the value in the promise
+ * @param execctx the execution context on which the promise is created on
+ * @return the newly created `Promise` object
+ */
def promise[T]()(implicit execctx: ExecutionContext = defaultExecutionContext): Promise[T] =
Promise[T]()
- /** Wraps a block of code into an awaitable object. */
- def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def ready(atMost: Duration)(implicit permit: CanAwait) = {
- body
- this
- }
- def result(atMost: Duration)(implicit permit: CanAwait) = body
- }
-
/** Used to block on a piece of code which potentially blocks.
*
* @param body A piece of code which contains potentially blocking or long running calls.
@@ -74,7 +82,7 @@ abstract class ConcurrentPackageObject {
* - TimeoutException - in the case that the blockable object timed out
*/
def blocking[T](body: =>T): T =
- blocking(body2awaitable(body), Duration.fromNanos(0))
+ blocking(impl.Future.body2awaitable(body), Duration.fromNanos(0))
/** Blocks on an awaitable object.
*
@@ -100,11 +108,11 @@ private[concurrent] object ConcurrentPackageObject {
// compiling a subset of sources; it seems that the wildcard is not
// properly handled, and you get messages like "type _$1 defined twice".
// This is consistent with other package object breakdowns.
- private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = {
- case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value)
- case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
- case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
- case e: Error => Left(new ExecutionException("Boxed Error", e))
- case t => Left(t)
- }
+ // private val resolverFunction: PartialFunction[Throwable, Either[Throwable, _]] = {
+ // case t: scala.runtime.NonLocalReturnControl[_] => Right(t.value)
+ // case t: scala.util.control.ControlThrowable => Left(new ExecutionException("Boxed ControlThrowable", t))
+ // case t: InterruptedException => Left(new ExecutionException("Boxed InterruptedException", t))
+ // case e: Error => Left(new ExecutionException("Boxed Error", e))
+ // case t => Left(t)
+ // }
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index e1d4276396..3f62f58bf8 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -20,19 +20,22 @@ import collection._
trait ExecutionContext {
-
+
+ /** Runs a block of code on this execution context.
+ */
def execute(runnable: Runnable): Unit
-
- def execute[U](body: () => U): Unit
-
+
+ /** Used internally by the framework - blocks execution for at most `atMost` time while waiting
+ * for an `awaitable` object to become ready.
+ *
+ * Clients should use `scala.concurrent.blocking` instead.
+ */
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+ /** Reports that an asynchronous computation failed.
+ */
def reportFailure(t: Throwable): Unit
-
- /* implementations follow */
-
- private implicit val executionContext = this
-
+
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 5bc9ad783f..1463dbcebf 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -512,6 +512,15 @@ trait Future[+T] extends Awaitable[T] {
*/
object Future {
+ /** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
+ *
+ * The result becomes available once the asynchronous computation is completed.
+ *
+ * @tparam T the type of the result
+ * @param body the asychronous computation
+ * @param execctx the execution context on which the future is run
+ * @return the `Future` holding the result of the computation
+ */
def apply[T](body: =>T)(implicit executor: ExecutionContext): Future[T] = impl.Future(body)
import scala.collection.mutable.Builder
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index 8f2bce5d1a..cd22a55ce7 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -107,15 +107,27 @@ trait Promise[T] {
object Promise {
- /** Creates a new promise.
+ /** Creates a promise object which can be completed with a value.
+ *
+ * @tparam T the type of the value in the promise
+ * @param execctx the execution context on which the promise is created on
+ * @return the newly created `Promise` object
*/
def apply[T]()(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.DefaultPromise[T]()
- /** Creates an already completed Promise with the specified exception
+ /** Creates an already completed Promise with the specified exception.
+ *
+ * @tparam T the type of the value in the promise
+ * @param execctx the execution context on which the promise is created on
+ * @return the newly created `Promise` object
*/
def failed[T](exception: Throwable)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Left(exception))
- /** Creates an already completed Promise with the specified result
+ /** Creates an already completed Promise with the specified result.
+ *
+ * @tparam T the type of the value in the promise
+ * @param execctx the execution context on which the promise is created on
+ * @return the newly created `Promise` object
*/
def successful[T](result: T)(implicit executor: ExecutionContext): Promise[T] = new impl.Promise.KeptPromise[T](Right(result))
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index c5062267dc..d15a9b828b 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -12,7 +12,7 @@ package scala.concurrent.impl
import java.util.concurrent.{Callable, Executor, ExecutorService, Executors, ThreadFactory}
import scala.concurrent.forkjoin._
-import scala.concurrent.{ExecutionContext, resolver, Awaitable, body2awaitable}
+import scala.concurrent.{ExecutionContext, resolver, Awaitable}
import scala.concurrent.util.{ Duration }
@@ -56,20 +56,20 @@ private[scala] class ExecutionContextImpl(es: AnyRef) extends ExecutionContext w
def execute(runnable: Runnable): Unit = executorService match {
case fj: ForkJoinPool =>
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread]) {
- val fjtask = ForkJoinTask.adapt(runnable)
- fjtask.fork
- } else {
- fj.execute(runnable)
+ Thread.currentThread match {
+ case fjw: ForkJoinWorkerThread if fjw.getPool eq fj =>
+ val fjtask = runnable match {
+ case fjt: ForkJoinTask[_] => fjt
+ case _ => ForkJoinTask.adapt(runnable)
+ }
+ fjtask.fork
+ case _ =>
+ fj.execute(runnable)
}
case executor: Executor =>
executor execute runnable
}
- def execute[U](body: () => U): Unit = execute(new Runnable {
- def run() = body()
- })
-
def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
Future.releaseStack(this)
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 615ab061a5..a3c8ed3095 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -10,9 +10,11 @@ package scala.concurrent.impl
-import scala.concurrent.{Awaitable, ExecutionContext}
+import scala.concurrent.util.Duration
+import scala.concurrent.{Awaitable, ExecutionContext, CanAwait}
import scala.collection.mutable.Stack
+
private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awaitable[T] {
implicit def executor: ExecutionContext
@@ -54,6 +56,15 @@ object Future {
classOf[Unit] -> classOf[scala.runtime.BoxedUnit]
)
+ /** Wraps a block of code into an awaitable object. */
+ private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] {
+ def ready(atMost: Duration)(implicit permit: CanAwait) = {
+ body
+ this
+ }
+ def result(atMost: Duration)(implicit permit: CanAwait) = body
+ }
+
def boxedType(c: Class[_]): Class[_] = {
if (c.isPrimitive) toBoxed(c) else c
}
diff --git a/src/library/scala/concurrent/impl/Promise.scala b/src/library/scala/concurrent/impl/Promise.scala
index f7e073cb78..07b6d1f278 100644
--- a/src/library/scala/concurrent/impl/Promise.scala
+++ b/src/library/scala/concurrent/impl/Promise.scala
@@ -12,7 +12,7 @@ package scala.concurrent.impl
import java.util.concurrent.TimeUnit.{ NANOSECONDS, MILLISECONDS }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
-import scala.concurrent.{Awaitable, ExecutionContext, resolve, resolver, blocking, CanAwait, TimeoutException}
+import scala.concurrent.{Awaitable, ExecutionContext, resolveEither, resolver, blocking, CanAwait, TimeoutException}
//import scala.util.continuations._
import scala.concurrent.util.Duration
import scala.util
@@ -126,7 +126,7 @@ object Promise {
value.isDefined
}
- blocking(concurrent.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
+ blocking(Future.body2awaitable(awaitUnsafe(dur2long(atMost))), atMost)
}
def ready(atMost: Duration)(implicit permit: CanAwait): this.type =
@@ -166,7 +166,7 @@ object Promise {
case _ => null
}
}
- tryComplete(resolve(value))
+ tryComplete(resolveEither(value))
} finally {
synchronized { notifyAll() } // notify any blockers from `tryAwait`
}
@@ -220,7 +220,7 @@ object Promise {
*/
final class KeptPromise[T](suppliedValue: Either[Throwable, T])(implicit val executor: ExecutionContext) extends Promise[T] {
- val value = Some(resolve(suppliedValue))
+ val value = Some(resolveEither(suppliedValue))
def tryComplete(value: Either[Throwable, T]): Boolean = false