summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/library/scala/concurrent/ForkJoinTaskImpl.scala185
-rw-r--r--src/library/scala/concurrent/Future.scala14
-rw-r--r--src/library/scala/concurrent/Promise.scala14
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala273
-rw-r--r--src/library/scala/concurrent/package.scala2
5 files changed, 286 insertions, 202 deletions
diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
deleted file mode 100644
index 9df4768ebb..0000000000
--- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala
+++ /dev/null
@@ -1,185 +0,0 @@
-package scala.concurrent
-
-
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
-import scala.util.{ Timeout, Duration }
-import scala.annotation.tailrec
-
-
-
-/* DONE: The challenge is to make ForkJoinPromise inherit from RecursiveAction
- * to avoid an object allocation per promise. This requires turning DefaultPromise
- * into a trait, i.e., removing its constructor parameters.
- */
-private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, body: => T, val timeout: Timeout)
-extends RecursiveAction with Task[T] with Future[T] {
-
- private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state")
- @volatile private var state: State[T] = _
-
- type Callback = Either[Throwable, T] => Any
-
- updater.set(this, Pending(List()))
-
- private def casState(oldv: State[T], newv: State[T]) = {
- updater.compareAndSet(this, oldv, newv)
- }
-
- @tailrec private def trySucceedState(res: T): List[Callback] = (updater.get(this): @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, Success(res))) trySucceedState(res) else cbs
- }
-
- @tailrec private def tryFailState(t: Throwable): List[Callback] = (updater.get(this): @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, Failure(t))) tryFailState(t) else cbs
- }
-
- private def dispatch[U](r: Runnable) = executionContext execute r
-
- private def processCallbacks(cbs: List[Callback], r: Either[Throwable, T]) =
- for (cb <- cbs) dispatch(new Runnable {
- override def run() = cb(r)
- })
-
- def compute(): Unit = {
- var cbs: List[Callback] = null
-
- try {
- val res = body
- processCallbacks(trySucceedState(res), Right(res))
- } catch {
- case t if isFutureThrowable(t) =>
- processCallbacks(tryFailState(t), Left(t))
- case t =>
- val ee = new ExecutionException(t)
- processCallbacks(tryFailState(ee), Left(ee))
- throw t
- }
- }
-
- def start(): Unit = {
- Thread.currentThread match {
- case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork()
- case _ => executionContext.pool.execute(this)
- }
- }
-
- def future: Future[T] = this
-
- def onComplete[U](callback: Either[Throwable, T] => U): this.type = {
- @tailrec def tryAddCallback(): Either[Throwable, T] = {
- updater.get(this) match {
- case p @ Pending(lst) =>
- val pt = p.asInstanceOf[Pending[T]]
- if (casState(pt, Pending(callback :: pt.callbacks))) null
- else tryAddCallback()
- case Success(res) => Right(res)
- case Failure(t) => Left(t)
- }
- }
-
- val res = tryAddCallback()
- if (res != null) dispatch(new Runnable {
- override def run() =
- try callback(res)
- catch handledFutureException andThen {
- t => Console.err.println(t)
- }
- })
-
- this
- }
-
- def isTimedout: Boolean = updater.get(this) match {
- case Failure(ft: FutureTimeoutException) => true
- case _ => false
- }
-
- // TODO FIXME: handle timeouts
- def await(atMost: Duration): this.type =
- await
-
- def await: this.type = {
- this.join()
- this
- }
-
- def tryCancel(): Unit =
- tryUnfork()
-
- def block()(implicit canblock: CanBlock): T = {
- join()
- (updater.get(this): @unchecked) match {
- case Success(r) => r
- case Failure(t) => throw t
- }
- }
-
-}
-
-
-private[concurrent] sealed abstract class State[T]
-
-
-case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends State[T]
-
-
-case class Success[T](result: T) extends State[T]
-
-
-case class Failure[T](throwable: Throwable) extends State[T]
-
-
-private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
- val pool = {
- val p = new ForkJoinPool
- p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
- def uncaughtException(t: Thread, throwable: Throwable) {
- Console.err.println(throwable.getMessage)
- throwable.printStackTrace(Console.err)
- }
- })
- p
- }
-
- @inline
- private def executeForkJoinTask(task: RecursiveAction) {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
- task.fork()
- else
- pool execute task
- }
-
- def execute(task: Runnable) {
- val action = new RecursiveAction { def compute() { task.run() } }
- executeForkJoinTask(action)
- }
-
- def task[T](body: => T): Task[T] = {
- new ForkJoinTaskImpl(this, body, Timeout.never)
- }
-
- def promise[T]: Promise[T] =
- null
-
- def blockingCall[T](b: Blockable[T]): T = b match {
- case fj: ForkJoinTaskImpl[_] if fj.executionContext.pool eq pool =>
- fj.block()
- case _ =>
- var res: T = null.asInstanceOf[T]
- @volatile var blockingDone = false
- // TODO add exception handling here!
- val mb = new ForkJoinPool.ManagedBlocker {
- def block() = {
- res = b.block()(CanBlockEvidence)
- blockingDone = true
- true
- }
- def isReleasable = blockingDone
- }
- ForkJoinPool.managedBlock(mb, true)
- res
- }
-
-}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index 19b29182bf..325350bddf 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -145,18 +145,6 @@ self =>
*/
def isTimedout: Boolean
- /** This `Future`'s timeout.
- *
- * $futureTimeout
- */
- def timeout: Timeout
-
- /** This `Future`'s timeout in nanoseconds.
- *
- * $futureTimeout
- */
- def timeoutInNanos = if (timeout.duration.isFinite) timeout.duration.toNanos else Long.MaxValue
-
/* Projections */
@@ -182,7 +170,6 @@ self =>
this
}
def isTimedout = self.isTimedout
- def timeout = self.timeout
def block()(implicit canblock: CanBlock) = try {
val res = self.block()
throw noSuchElem(res)
@@ -217,7 +204,6 @@ self =>
this
}
def isTimedout = self.isTimedout
- def timeout = self.timeout
def block()(implicit canblock: CanBlock) = try {
val res = self.block()
throw noSuchElemValue(res)
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index b244d8c5e0..fb80eb8f31 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -48,11 +48,21 @@ trait Promise[T] {
*
* $promiseCompletion
*/
- def fail(t: Throwable): Unit
-
+ def break(t: Throwable): Unit
+
+ /** Wraps a `Throwable` in an `ExecutionException` if necessary.
+ *
+ * $allowedThrowables
+ */
+ protected def wrap(t: Throwable): Throwable = t match {
+ case t: Throwable if isFutureThrowable(t) => t
+ case _ => new ExecutionException(t)
+ }
+
}
+
object Promise {
/*
/**
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
new file mode 100644
index 0000000000..ebb8f65ab6
--- /dev/null
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -0,0 +1,273 @@
+package scala.concurrent
+package default
+
+
+
+import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
+import scala.util.{ Timeout, Duration }
+import scala.annotation.tailrec
+
+
+
+private[concurrent] trait Completable[T] {
+ self: Future[T] =>
+
+ val executionContext: ExecutionContextImpl
+
+ type Callback = Either[Throwable, T] => Any
+
+ def getState: State[T]
+
+ def casState(oldv: State[T], newv: State[T]): Boolean
+
+ protected def dispatch[U](r: Runnable) = executionContext execute r
+
+ protected def processCallbacks(cbs: List[Callback], r: Either[Throwable, T]) =
+ for (cb <- cbs) dispatch(new Runnable {
+ override def run() = cb(r)
+ })
+
+ def future: Future[T] = self
+
+ def onComplete[U](callback: Either[Throwable, T] => U): this.type = {
+ @tailrec def tryAddCallback(): Either[Throwable, T] = {
+ getState match {
+ case p @ Pending(lst) =>
+ val pt = p.asInstanceOf[Pending[T]]
+ if (casState(pt, Pending(callback :: pt.callbacks))) null
+ else tryAddCallback()
+ case Success(res) => Right(res)
+ case Failure(t) => Left(t)
+ }
+ }
+
+ val res = tryAddCallback()
+ if (res != null) dispatch(new Runnable {
+ override def run() =
+ try callback(res)
+ catch handledFutureException andThen {
+ t => Console.err.println(t)
+ }
+ })
+
+ this
+ }
+
+ def isTimedout: Boolean = getState match {
+ case Failure(ft: FutureTimeoutException) => true
+ case _ => false
+ }
+
+}
+
+private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
+ extends Promise[T] with Future[T] with Completable[T] {
+
+ val executionContext: scala.concurrent.default.ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ case _ => null
+ }
+
+ /** Completes the promise with a value.
+ *
+ * @param value The value to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def fulfill(value: T): Unit = {
+ val cbs = tryCompleteState(Success(value))
+ if (cbs == null)
+ throw new IllegalStateException
+ else {
+ processCallbacks(cbs, Right(value))
+ this.synchronized {
+ this.notifyAll()
+ }
+ }
+ }
+
+ /** Completes the promise with an exception.
+ *
+ * @param t The throwable to complete the promise with.
+ *
+ * $promiseCompletion
+ */
+ def break(t: Throwable): Unit = {
+ val wrapped = wrap(t)
+ val cbs = tryCompleteState(Failure(wrapped))
+ if (cbs == null)
+ throw new IllegalStateException
+ else {
+ processCallbacks(cbs, Left(wrapped))
+ this.synchronized {
+ this.notifyAll()
+ }
+ }
+ }
+
+ def block()(implicit canblock: scala.concurrent.CanBlock): T = getState match {
+ case Success(res) => res
+ case Failure(t) => throw t
+ case _ =>
+ this.synchronized {
+ while (true)
+ getState match {
+ case Pending(_) => this.wait()
+ case Success(res) => return res
+ case Failure(t) => throw t
+ }
+ }
+ sys.error("unreachable")
+ }
+
+}
+
+private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
+ extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
+
+ val executionContext: ExecutionContextImpl = context
+
+ @volatile private var state: State[T] = _
+
+ val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state")
+
+ updater.set(this, Pending(List()))
+
+ def casState(oldv: State[T], newv: State[T]): Boolean = {
+ updater.compareAndSet(this, oldv, newv)
+ }
+
+ def getState: State[T] = {
+ updater.get(this)
+ }
+
+ @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
+ }
+
+ def compute(): Unit = {
+ var cbs: List[Callback] = null
+ try {
+ val res = body
+ processCallbacks(tryCompleteState(Success(res)), Right(res))
+ } catch {
+ case t if isFutureThrowable(t) =>
+ processCallbacks(tryCompleteState(Failure(t)), Left(t))
+ case t =>
+ val ee = new ExecutionException(t)
+ processCallbacks(tryCompleteState(Failure(ee)), Left(ee))
+ throw t
+ }
+ }
+
+ def start(): Unit = {
+ Thread.currentThread match {
+ case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork()
+ case _ => executionContext.pool.execute(this)
+ }
+ }
+
+ // TODO FIXME: handle timeouts
+ def await(atMost: Duration): this.type =
+ await
+
+ def await: this.type = {
+ this.join()
+ this
+ }
+
+ def tryCancel(): Unit =
+ tryUnfork()
+
+ def block()(implicit canblock: CanBlock): T = {
+ join()
+ (updater.get(this): @unchecked) match {
+ case Success(r) => r
+ case Failure(t) => throw t
+ }
+ }
+
+}
+
+
+private[concurrent] sealed abstract class State[T]
+
+
+case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends State[T]
+
+
+case class Success[T](result: T) extends State[T]
+
+
+case class Failure[T](throwable: Throwable) extends State[T]
+
+
+private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
+ val pool = {
+ val p = new ForkJoinPool
+ p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
+ def uncaughtException(t: Thread, throwable: Throwable) {
+ Console.err.println(throwable.getMessage)
+ throwable.printStackTrace(Console.err)
+ }
+ })
+ p
+ }
+
+ @inline
+ private def executeTask(task: RecursiveAction) {
+ if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
+ task.fork()
+ else
+ pool execute task
+ }
+
+ def execute(task: Runnable) {
+ val action = new RecursiveAction { def compute() { task.run() } }
+ executeTask(action)
+ }
+
+ def task[T](body: => T): Task[T] = {
+ new TaskImpl(this, body)
+ }
+
+ def promise[T]: Promise[T] =
+ null
+
+ def blockingCall[T](b: Blockable[T]): T = b match {
+ case fj: TaskImpl[_] if fj.executionContext.pool eq pool =>
+ fj.block()
+ case _ =>
+ var res: T = null.asInstanceOf[T]
+ @volatile var blockingDone = false
+ // TODO add exception handling here!
+ val mb = new ForkJoinPool.ManagedBlocker {
+ def block() = {
+ res = b.block()(CanBlockEvidence)
+ blockingDone = true
+ true
+ }
+ def isReleasable = blockingDone
+ }
+ ForkJoinPool.managedBlock(mb, true)
+ res
+ }
+
+}
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 63faeef502..900e872b51 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -22,7 +22,7 @@ package object concurrent {
type TimeoutException = java.util.concurrent.TimeoutException
lazy val executionContext =
- new ForkJoinExecutionContext
+ new default.ExecutionContextImpl
private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
override protected def initialValue = null