summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-07 14:55:49 +0100
committerAleksandar Prokopec <aleksandar.prokopec@gmail.com>2011-12-07 14:55:49 +0100
commit83d0ece42e4f0c5e41e3acae61e8e41db95defc5 (patch)
tree658d2f76227d479f44039c9da4443a26eef13af2
parent014a13d2be634bb3ab3468d0c071a9a870a9a9b0 (diff)
downloadscala-83d0ece42e4f0c5e41e3acae61e8e41db95defc5.tar.gz
scala-83d0ece42e4f0c5e41e3acae61e8e41db95defc5.tar.bz2
scala-83d0ece42e4f0c5e41e3acae61e8e41db95defc5.zip
Got the futures and execution contexts related code to compile.
-rw-r--r--src/library/scala/concurrent/DelayedLazyVal.scala3
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala10
-rw-r--r--src/library/scala/concurrent/ForkJoinTaskImpl.scala103
-rw-r--r--src/library/scala/concurrent/Future.scala57
-rw-r--r--src/library/scala/concurrent/Promise.scala4
-rw-r--r--src/library/scala/concurrent/package.scala10
6 files changed, 132 insertions, 55 deletions
diff --git a/src/library/scala/concurrent/DelayedLazyVal.scala b/src/library/scala/concurrent/DelayedLazyVal.scala
index e308c3b5a6..391ba7e314 100644
--- a/src/library/scala/concurrent/DelayedLazyVal.scala
+++ b/src/library/scala/concurrent/DelayedLazyVal.scala
@@ -8,7 +8,6 @@
package scala.concurrent
-import ops.future
/** A `DelayedLazyVal` is a wrapper for lengthy computations which have a
* valid partially computed result.
@@ -40,7 +39,7 @@ class DelayedLazyVal[T](f: () => T, body: => Unit) {
*/
def apply(): T = if (isDone) complete else f()
- future {
+ ops.future {
body
_isDone = true
}
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 34c14147f5..d54b6c370e 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -12,10 +12,11 @@ trait ExecutionContext {
def execute(task: Runnable): Unit
- def task[T](task: () => T)(implicit timeout: Timeout): Task[T]
+ def task[T](task: () => T): Task[T]
- def promise[T](implicit timeout: Timeout): Promise[T]
+ def promise[T]: Promise[T]
+ /** Only callable from the tasks running on the same execution context. */
def blockingCall[T](body: Blockable[T]): T
}
@@ -28,3 +29,8 @@ object ExecutionContext {
//lazy val forBlocking = new BlockingExecutionContext
}
+
+
+sealed trait CanBlock
+
+
diff --git a/src/library/scala/concurrent/ForkJoinTaskImpl.scala b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
index 6a33ca162a..bc09644aca 100644
--- a/src/library/scala/concurrent/ForkJoinTaskImpl.scala
+++ b/src/library/scala/concurrent/ForkJoinTaskImpl.scala
@@ -2,8 +2,9 @@ package scala.concurrent
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveTask => FJTask, RecursiveAction, ForkJoinWorkerThread }
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
+import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
+import scala.util.{ Timeout, Duration }
import scala.annotation.tailrec
@@ -13,40 +14,53 @@ import scala.annotation.tailrec
* into a trait, i.e., removing its constructor parameters.
*/
private[concurrent] class ForkJoinTaskImpl[T](val executionContext: ForkJoinExecutionContext, val body: () => T, val timeout: Timeout)
-extends FJTask[T] with Task[T] with Future[T] {
+extends RecursiveAction with Task[T] with Future[T] {
- private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[FJState[T]], "state")
+ private val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[ForkJoinTaskImpl[T]], classOf[State[T]], "state")
@volatile private var state: State[T] = _
+ type Callback = Either[Throwable, T] => Any
+
updater.set(this, Pending(List()))
private def casState(oldv: State[T], newv: State[T]) = {
updater.compareAndSet(this, oldv, newv)
}
- @tailrec private def trySucceedState(res: T): Unit = updater.get(this) match {
- case p @ Pending(cbs) => if (!casState(p, Success(res))) trySucceedState(res)
- case _ => // return
+ @tailrec private def trySucceedState(res: T): List[Callback] = (updater.get(this): @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, Success(res))) trySucceedState(res) else cbs
}
- @tailrec private def tryFailState(t: Throwable): Unit = updater.get(this) match {
- case p @ Pending(cbs) => if (!casState(p, Failure(t))) tryFailState(t)
- case _ => // return
+ @tailrec private def tryFailState(t: Throwable): List[Callback] = (updater.get(this): @unchecked) match {
+ case p @ Pending(cbs) => if (!casState(p, Failure(t))) tryFailState(t) else cbs
}
- // body of RecursiveTask
- def compute(): T = {
+ private def dispatch[U](r: Runnable) = executionContext execute r
+
+ private def processCallbacks(cbs: List[Callback], r: Either[Throwable, T]) =
+ for (cb <- cbs) dispatch(new Runnable {
+ override def run() = cb(r)
+ })
+
+ def compute(): Unit = {
+ var cbs: List[Callback] = null
+
try {
val res = body()
- trySucceedState(res)
- } catch handledFutureException andThen {
- t => tryFailState(t)
- } finally tryFailState(new ExecutionException)
+ processCallbacks(trySucceedState(res), Right(res))
+ } catch {
+ case t if isFutureThrowable(t) =>
+ processCallbacks(tryFailState(t), Left(t))
+ case t =>
+ val ee = new ExecutionException(t)
+ processCallbacks(tryFailState(ee), Left(ee))
+ throw t
+ }
}
def start(): Unit = {
Thread.currentThread match {
- case fj: ForkJoinWorkerThread if fj.pool eq executionContext.pool => fork()
+ case fj: ForkJoinWorkerThread if fj.getPool eq executionContext.pool => fork()
case _ => executionContext.pool.execute(this)
}
}
@@ -58,7 +72,7 @@ extends FJTask[T] with Task[T] with Future[T] {
updater.get(this) match {
case p @ Pending(lst) =>
val pt = p.asInstanceOf[Pending[T]]
- if (casState(pt, Pending(callback :: pt.lst))) null
+ if (casState(pt, Pending(callback :: pt.callbacks))) null
else tryAddCallback()
case Success(res) => Right(res)
case Failure(t) => Left(t)
@@ -66,16 +80,19 @@ extends FJTask[T] with Task[T] with Future[T] {
}
val res = tryAddCallback()
- if (res != null) dispatchTask new Runnable {
+ if (res != null) dispatch(new Runnable {
override def run() =
try callback(res)
catch handledFutureException
- }
+ })
+
+ this
}
- private def dispatchTask[U](r: Runnable) = executionContext execute r
-
- def isTimedout: Boolean = false // TODO
+ def isTimedout: Boolean = updater.get() match {
+ case Failure(ft: FutureTimeoutException) => true
+ case _ => false
+ }
// TODO FIXME: handle timeouts
def await(atMost: Duration): this.type =
@@ -89,19 +106,27 @@ extends FJTask[T] with Task[T] with Future[T] {
def tryCancel(): Unit =
tryUnfork()
+ def block()(implicit canblock: CanBlock): T = {
+ join()
+ (updater.get(this): @unchecked) match {
+ case Success(r) => r
+ case Failure(t) => throw t
+ }
+ }
+
}
-private[concurrent] sealed abstract class FJState[T]
+private[concurrent] sealed abstract class State[T]
-case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends FJState[T]
+case class Pending[T](callbacks: List[Either[Throwable, T] => Any]) extends State[T]
-case class Success[T](result: T) extends FJState[T]
+case class Success[T](result: T) extends State[T]
-case class Failure[T](throwable: Throwable) extends FJState[T]
+case class Failure[T](throwable: Throwable) extends State[T]
private[concurrent] final class ForkJoinExecutionContext extends ExecutionContext {
@@ -120,14 +145,30 @@ private[concurrent] final class ForkJoinExecutionContext extends ExecutionContex
executeForkJoinTask(action)
}
- def makeTask[T](body: () => T)(implicit timeout: Timeout): Task[T] = {
- new ForkJoinTaskImpl(this, body, timeout)
+ def task[T](body: () => T): Task[T] = {
+ new ForkJoinTaskImpl(this, body, Timeout.never)
}
- def makePromise[T](timeout: Timeout): Promise[T] =
+ def promise[T]: Promise[T] =
null
- def blockingCall[T](body: Blockable[T]): T =
- body.block()(CanBlockEvidence)
+ def blockingCall[T](b: Blockable[T]): T = b match {
+ case fj: ForkJoinTaskImpl[_] if fj.executionContext.pool eq pool =>
+ fj.block()
+ case _ =>
+ var res: T = null.asInstanceOf[T]
+ @volatile var blockingDone = false
+ // TODO add exception handling here!
+ val mb = new ForkJoinPool.ManagedBlocker {
+ def block() = {
+ res = b.block()(CanBlockEvidence)
+ blockingDone = true
+ true
+ }
+ def isReleasable = blockingDone
+ }
+ ForkJoinPool.managedBlock(mb, true)
+ res
+ }
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index b65d777d67..0680e87736 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -157,23 +157,51 @@ self =>
/* Projections */
def failed: Future[Throwable] = new Future[Throwable] {
- def newPromise[S] = self.newPromise[S]
- def onComplete[U](func: Either[Throwable, Throwable] => U) = self.onComplete {
- case Left(t) => func(Right(t))
- case Right(v) => // do nothing
+ def executionContext = self.executionContext
+ def onComplete[U](func: Either[Throwable, Throwable] => U) = {
+ self.onComplete {
+ case Left(t) => func(Right(t))
+ case Right(v) => func(Left(noSuchElem(v))) // do nothing
+ }
+ this
}
def isTimedout = self.isTimedout
def timeout = self.timeout
+ def block()(implicit canblock: CanBlock) = try {
+ val res = self.block()
+ throw noSuchElem(res)
+ } catch {
+ case t: Throwable => t
+ }
+ private def noSuchElem(v: T) =
+ new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
}
def timedout: Future[FutureTimeoutException] = new Future[FutureTimeoutException] {
- def newPromise[S] = self.newPromise[S]
- def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = self.onComplete {
- case Left(te: FutureTimeoutException) => func(Right(te))
- case _ => // do nothing
+ def executionContext = self.executionContext
+ def onComplete[U](func: Either[Throwable, FutureTimeoutException] => U) = {
+ self.onComplete {
+ case Left(te: FutureTimeoutException) => func(Right(te))
+ case Left(t) => func(Left(noSuchElemThrowable(t)))
+ case Right(v) => func(Left(noSuchElemValue(v)))
+ }
+ this
}
def isTimedout = self.isTimedout
def timeout = self.timeout
+ def block()(implicit canblock: CanBlock) = try {
+ val res = self.block() // TODO fix
+ throw noSuchElemValue(res)
+ } catch {
+ case ft: FutureTimeoutException =>
+ ft
+ case t: Throwable =>
+ throw noSuchElemThrowable(t)
+ }
+ private def noSuchElemValue(v: T) =
+ new NoSuchElementException("Future.timedout didn't time out. Instead completed with: " + v)
+ private def noSuchElemThrowable(v: Throwable) =
+ new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
}
@@ -206,9 +234,9 @@ self =>
*
* Will not be called if the future times out or fails.
*
- * This method typically registers an `onResult` callback.
+ * This method typically registers an `onSuccess` callback.
*/
- def foreach[U](f: T => U): Unit = onResult f
+ def foreach[U](f: T => U): Unit = onSuccess(f)
/** Creates a new future by applying a function to the successful result of
* this future. If this future is completed with an exception then the new
@@ -239,8 +267,9 @@ self =>
onComplete {
case Left(t) => p fail t
- case Right(f) => f onComplete {
- p fulfill _
+ case Right(v) => f(v) onComplete {
+ case Left(t) => p fail t
+ case Right(v) => p fulfill v
}
}
@@ -263,12 +292,12 @@ self =>
* block on h // throw a NoSuchElementException
* }}}
*/
- def filter(p: T => Boolean): Future[T] = {
+ def filter(pred: T => Boolean): Future[T] = {
val p = newPromise[T]
onComplete {
case Left(t) => p fail t
- case Right(v) => if (p(v)) p fulfill v else p fail new NoSuchElementException
+ case Right(v) => if (pred(v)) p fulfill v else p fail new NoSuchElementException("Future.filter predicate is not satisfied by: " + v)
}
p.future
diff --git a/src/library/scala/concurrent/Promise.scala b/src/library/scala/concurrent/Promise.scala
index c5336ab00f..898344cb66 100644
--- a/src/library/scala/concurrent/Promise.scala
+++ b/src/library/scala/concurrent/Promise.scala
@@ -10,6 +10,10 @@ package scala.concurrent
+import scala.util.Timeout
+
+
+
/** Promise is an object which can be completed with a value or failed
* with an exception.
*
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 51bb1ac3e0..b9e39a21a1 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -62,14 +62,14 @@ package object concurrent {
def future[T](body: =>T): Future[T] = null // TODO
val handledFutureException: PartialFunction[Throwable, Throwable] = {
- case t: Throwable if isFutureThrowable => t
+ case t: Throwable if isFutureThrowable(t) => t
}
// TODO rename appropriately and make public
private[concurrent] def isFutureThrowable(t: Throwable) = t match {
case e: Error => false
case t: scala.util.control.ControlThrowable => false
- case i: InterruptException => false
+ case i: InterruptedException => false
case _ => true
}
@@ -78,16 +78,14 @@ package object concurrent {
package concurrent {
- private[concurrent] trait CanBlock
-
/** A timeout exception.
*
* Futures are failed with a timeout exception when their timeout expires.
*
* Each timeout exception contains an origin future which originally timed out.
*/
- class FutureTimeoutException(origin: Future[T], message: String) extends TimeoutException(message) {
- def this(origin: Future[T]) = this(origin, "Future timed out.")
+ class FutureTimeoutException(origin: Future[_], message: String) extends TimeoutException(message) {
+ def this(origin: Future[_]) = this(origin, "Future timed out.")
}
}