summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorViktor Klang <viktor.klang@gmail.com>2012-07-19 15:27:28 +0200
committerphaller <hallerp@gmail.com>2012-07-19 16:56:39 +0200
commitfaf0f3de05e79af3fd7b5cf3bc3f97331e25042e (patch)
tree6b380cb7721af12c8290d56e248752649e766b7b /src
parente75e862006b78d1a159c4ffb6194340492bd2de5 (diff)
downloadscala-faf0f3de05e79af3fd7b5cf3bc3f97331e25042e.tar.gz
scala-faf0f3de05e79af3fd7b5cf3bc3f97331e25042e.tar.bz2
scala-faf0f3de05e79af3fd7b5cf3bc3f97331e25042e.zip
Critical bugfixes/leak fixes/API corrections + ScalaDoc for SIP-14
Diffstat (limited to 'src')
-rw-r--r--src/library/scala/concurrent/BlockContext.scala35
-rw-r--r--src/library/scala/concurrent/Future.scala6
-rw-r--r--src/library/scala/concurrent/default/SchedulerImpl.scala.disabled44
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala.disabled313
-rw-r--r--src/library/scala/concurrent/impl/ExecutionContextImpl.scala10
-rw-r--r--src/library/scala/concurrent/impl/Future.scala9
-rw-r--r--src/library/scala/concurrent/package.scala76
7 files changed, 66 insertions, 427 deletions
diff --git a/src/library/scala/concurrent/BlockContext.scala b/src/library/scala/concurrent/BlockContext.scala
index a5b878c546..640560a174 100644
--- a/src/library/scala/concurrent/BlockContext.scala
+++ b/src/library/scala/concurrent/BlockContext.scala
@@ -12,9 +12,10 @@ import java.lang.Thread
import scala.concurrent.util.Duration
/**
- * A context to be notified by `scala.concurrent.blocking()` when
+ * A context to be notified by `scala.concurrent.blocking` when
* a thread is about to block. In effect this trait provides
- * the implementation for `scala.concurrent.blocking()`. `scala.concurrent.blocking()`
+ * the implementation for `scala.concurrent.Await`.
+ * `scala.concurrent.Await.result()` and `scala.concurrent.Await.ready()`
* locates an instance of `BlockContext` by first looking for one
* provided through `BlockContext.withBlockContext()` and failing that,
* checking whether `Thread.currentThread` is an instance of `BlockContext`.
@@ -27,11 +28,11 @@ import scala.concurrent.util.Duration
* {{{
* val oldContext = BlockContext.current
* val myContext = new BlockContext {
- * override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ * override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
* // you'd have code here doing whatever you need to do
* // when the thread is about to block.
* // Then you'd chain to the previous context:
- * oldContext.internalBlockingCall(awaitable, atMost)
+ * oldContext.blockOn(thunk)
* }
* }
* BlockContext.withBlockContext(myContext) {
@@ -42,35 +43,33 @@ import scala.concurrent.util.Duration
*/
trait BlockContext {
- /** Used internally by the framework; blocks execution for at most
- * `atMost` time while waiting for an `awaitable` object to become ready.
+ /** Used internally by the framework;
+ * Designates (and eventually executes) a thunk which potentially blocks the calling `Thread`.
*
- * Clients should use `scala.concurrent.blocking` instead; this is
- * the implementation of `scala.concurrent.blocking`, generally
- * provided by a `scala.concurrent.ExecutionContext` or `java.util.concurrent.Executor`.
+ * Clients must use `scala.concurrent.blocking` or `scala.concurrent.Await` instead.
*/
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T
+ def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T
}
object BlockContext {
private object DefaultBlockContext extends BlockContext {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- awaitable.result(atMost)(Await.canAwaitEvidence)
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = thunk
}
- private val contextLocal = new ThreadLocal[BlockContext]() {
- override def initialValue = Thread.currentThread match {
+ private val contextLocal = new ThreadLocal[BlockContext]()
+
+ /** Obtain the current thread's current `BlockContext`. */
+ def current: BlockContext = contextLocal.get match {
+ case null => Thread.currentThread match {
case ctx: BlockContext => ctx
case _ => DefaultBlockContext
}
+ case some => some
}
- /** Obtain the current thread's current `BlockContext`. */
- def current: BlockContext = contextLocal.get
-
/** Pushes a current `BlockContext` while executing `body`. */
def withBlockContext[T](blockContext: BlockContext)(body: => T): T = {
- val old = contextLocal.get
+ val old = contextLocal.get // can be null
try {
contextLocal.set(blockContext)
body
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index f82b79cb18..d24fdbf005 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -707,11 +707,9 @@ object Future {
// doesn't need to create defaultExecutionContext as
// a side effect.
private[concurrent] object InternalCallbackExecutor extends ExecutionContext {
- def execute(runnable: Runnable): Unit =
+ override def execute(runnable: Runnable): Unit =
runnable.run()
- def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T =
- throw new IllegalStateException("bug in scala.concurrent, called blocking() from internal callback")
- def reportFailure(t: Throwable): Unit =
+ override def reportFailure(t: Throwable): Unit =
throw new IllegalStateException("problem in scala.concurrent internal callback", t)
}
}
diff --git a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled b/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
deleted file mode 100644
index 241efa8857..0000000000
--- a/src/library/scala/concurrent/default/SchedulerImpl.scala.disabled
+++ /dev/null
@@ -1,44 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2003-2011, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-package scala.concurrent
-package default
-
-import scala.concurrent.util.Duration
-
-private[concurrent] final class SchedulerImpl extends Scheduler {
- private val timer =
- new java.util.Timer(true) // the associated thread runs as a daemon
-
- def schedule(delay: Duration, frequency: Duration)(thunk: => Unit): Cancellable = ???
-
- def scheduleOnce(delay: Duration, task: Runnable): Cancellable = {
- val timerTask = new java.util.TimerTask {
- def run(): Unit =
- task.run()
- }
- timer.schedule(timerTask, delay.toMillis)
- new Cancellable {
- def cancel(): Unit =
- timerTask.cancel()
- }
- }
-
- def scheduleOnce(delay: Duration)(task: => Unit): Cancellable = {
- val timerTask = new java.util.TimerTask {
- def run(): Unit =
- task
- }
- timer.schedule(timerTask, delay.toMillis)
- new Cancellable {
- def cancel(): Unit =
- timerTask.cancel()
- }
- }
-
-}
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala.disabled b/src/library/scala/concurrent/default/TaskImpl.scala.disabled
deleted file mode 100644
index 8b4eb12d4f..0000000000
--- a/src/library/scala/concurrent/default/TaskImpl.scala.disabled
+++ /dev/null
@@ -1,313 +0,0 @@
-package scala.concurrent
-package default
-
-
-
-import java.util.concurrent.atomic.AtomicReferenceFieldUpdater
-import scala.concurrent.forkjoin.{ ForkJoinPool, RecursiveAction, ForkJoinWorkerThread }
-import scala.util.Try
-import scala.util
-import scala.concurrent.util.Duration
-import scala.annotation.tailrec
-import scala.util.control.NonFatal
-
-
-private[concurrent] trait Completable[T] {
-self: Future[T] =>
-
- val executor: ExecutionContextImpl
-
- def newPromise[S]: Promise[S] = executor promise
-
- type Callback = Try[T] => Any
-
- def getState: State[T]
-
- def casState(oldv: State[T], newv: State[T]): Boolean
-
- protected def dispatch[U](r: Runnable) = executionContext execute r
-
- protected def processCallbacks(cbs: List[Callback], r: Try[T]) =
- for (cb <- cbs) dispatch(new Runnable {
- override def run() = cb(r)
- })
-
- def future: Future[T] = self
-
- def onComplete[U](callback: Try[T] => U): this.type = {
- @tailrec def tryAddCallback(): Try[T] = {
- getState match {
- case p @ Pending(lst) =>
- val pt = p.asInstanceOf[Pending[T]]
- if (casState(pt, Pending(callback :: pt.callbacks))) null
- else tryAddCallback()
- case Success(res) => util.Success(res)
- case Failure(t) => util.Failure(t)
- }
- }
-
- val res = tryAddCallback()
- if (res != null) dispatch(new Runnable {
- override def run() =
- try callback(res)
- catch handledFutureException andThen {
- t => Console.err.println(t)
- }
- })
-
- this
- }
-
- def isTimedout: Boolean = getState match {
- case Failure(ft: FutureTimeoutException) => true
- case _ => false
- }
-
-}
-
-private[concurrent] class PromiseImpl[T](context: ExecutionContextImpl)
-extends Promise[T] with Future[T] with Completable[T] {
-
- val executor: scala.concurrent.default.ExecutionContextImpl = context
-
- @volatile private var state: State[T] = _
-
- val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[PromiseImpl[T]], classOf[State[T]], "state")
-
- updater.set(this, Pending(List()))
-
- def casState(oldv: State[T], newv: State[T]): Boolean = {
- updater.compareAndSet(this, oldv, newv)
- }
-
- def getState: State[T] = {
- updater.get(this)
- }
-
- @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
- case _ => null
- }
-
- def tryComplete(r: Try[T]) = r match {
- case util.Failure(t) => tryFailure(t)
- case util.Success(v) => trySuccess(v)
- }
-
- override def trySuccess(value: T): Boolean = {
- val cbs = tryCompleteState(Success(value))
- if (cbs == null)
- false
- else {
- processCallbacks(cbs, util.Success(value))
- this.synchronized {
- this.notifyAll()
- }
- true
- }
- }
-
- override def tryFailure(t: Throwable): Boolean = {
- val wrapped = wrap(t)
- val cbs = tryCompleteState(Failure(wrapped))
- if (cbs == null)
- false
- else {
- processCallbacks(cbs, util.Failure(wrapped))
- this.synchronized {
- this.notifyAll()
- }
- true
- }
- }
-
- def await(atMost: Duration)(implicit canawait: scala.concurrent.CanAwait): T = getState match {
- case Success(res) => res
- case Failure(t) => throw t
- case _ =>
- this.synchronized {
- while (true)
- getState match {
- case Pending(_) => this.wait()
- case Success(res) => return res
- case Failure(t) => throw t
- }
- }
- sys.error("unreachable")
- }
-
-}
-
-private[concurrent] class TaskImpl[T](context: ExecutionContextImpl, body: => T)
-extends RecursiveAction with Task[T] with Future[T] with Completable[T] {
-
- val executor: ExecutionContextImpl = context
-
- @volatile private var state: State[T] = _
-
- val updater = AtomicReferenceFieldUpdater.newUpdater(classOf[TaskImpl[T]], classOf[State[T]], "state")
-
- updater.set(this, Pending(List()))
-
- def casState(oldv: State[T], newv: State[T]): Boolean = {
- updater.compareAndSet(this, oldv, newv)
- }
-
- def getState: State[T] = {
- updater.get(this)
- }
-
- @tailrec private def tryCompleteState(completed: State[T]): List[Callback] = (getState: @unchecked) match {
- case p @ Pending(cbs) => if (!casState(p, completed)) tryCompleteState(completed) else cbs
- }
-
- def compute(): Unit = {
- var cbs: List[Callback] = null
- try {
- val res = body
- processCallbacks(tryCompleteState(Success(res)), util.Success(res))
- } catch {
- case t if NonFatal(t) =>
- processCallbacks(tryCompleteState(Failure(t)), util.Failure(t))
- case t =>
- val ee = new ExecutionException(t)
- processCallbacks(tryCompleteState(Failure(ee)), util.Failure(ee))
- throw t
- }
- }
-
- def start(): Unit = {
- Thread.currentThread match {
- case fj: ForkJoinWorkerThread if fj.getPool eq executor.pool => fork()
- case _ => executor.pool.execute(this)
- }
- }
-
- // TODO FIXME: handle timeouts
- def await(atMost: Duration): this.type =
- await
-
- def await: this.type = {
- this.join()
- this
- }
-
- def tryCancel(): Unit =
- tryUnfork()
-
- def await(atMost: Duration)(implicit canawait: CanAwait): T = {
- join() // TODO handle timeout also
- (updater.get(this): @unchecked) match {
- case Success(r) => r
- case Failure(t) => throw t
- }
- }
-
-}
-
-
-private[concurrent] sealed abstract class State[T]
-
-
-case class Pending[T](callbacks: List[Try[T] => Any]) extends State[T]
-
-
-case class Success[T](result: T) extends State[T]
-
-
-case class Failure[T](throwable: Throwable) extends State[T]
-
-
-private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
- import ExecutionContextImpl._
-
- val pool = {
- val p = new ForkJoinPool
- p.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
- def uncaughtException(t: Thread, throwable: Throwable) {
- Console.err.println(throwable.getMessage)
- throwable.printStackTrace(Console.err)
- }
- })
- p
- }
-
- @inline
- private def executeTask(task: RecursiveAction) {
- if (Thread.currentThread.isInstanceOf[ForkJoinWorkerThread])
- task.fork()
- else
- pool execute task
- }
-
- def execute(task: Runnable) {
- val action = new RecursiveAction { def compute() { task.run() } }
- executeTask(action)
- }
-
- def execute[U](body: () => U) {
- val action = new RecursiveAction { def compute() { body() } }
- executeTask(action)
- }
-
- def task[T](body: => T): Task[T] = {
- new TaskImpl(this, body)
- }
-
- def future[T](body: => T): Future[T] = {
- val t = task(body)
- t.start()
- t.future
- }
-
- def promise[T]: Promise[T] =
- new PromiseImpl[T](this)
-
- def blocking[T](atMost: Duration)(body: =>T): T = blocking(body2awaitable(body), atMost)
-
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T = {
- currentExecutionContext.get match {
- case null => awaitable.await(atMost)(null) // outside - TODO - fix timeout case
- case x if x eq this => this.blockingCall(awaitable) // inside an execution context thread on this executor
- case x => x.blocking(awaitable, atMost)
- }
- }
-
- private def blockingCall[T](b: Awaitable[T]): T = b match {
- case fj: TaskImpl[_] if fj.executor.pool eq pool =>
- fj.await(Duration.fromNanos(0))
- case _ =>
- var res: T = null.asInstanceOf[T]
- @volatile var blockingDone = false
- // TODO add exception handling here!
- val mb = new ForkJoinPool.ManagedBlocker {
- def block() = {
- res = b.await(Duration.fromNanos(0))(CanAwaitEvidence)
- blockingDone = true
- true
- }
- def isReleasable = blockingDone
- }
- ForkJoinPool.managedBlock(mb, true)
- res
- }
-
- def reportFailure(t: Throwable): Unit = {}
-
-}
-
-
-object ExecutionContextImpl {
-
- private[concurrent] def currentExecutionContext: ThreadLocal[ExecutionContext] = new ThreadLocal[ExecutionContext] {
- override protected def initialValue = null
- }
-
-}
-
-
-
-
-
-
-
diff --git a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
index 98f821652f..875a558887 100644
--- a/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
+++ b/src/library/scala/concurrent/impl/ExecutionContextImpl.scala
@@ -13,7 +13,7 @@ package scala.concurrent.impl
import java.util.concurrent.{ LinkedBlockingQueue, Callable, Executor, ExecutorService, Executors, ThreadFactory, TimeUnit, ThreadPoolExecutor }
import java.util.Collection
import scala.concurrent.forkjoin._
-import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, ExecutionContextExecutor, ExecutionContextExecutorService }
+import scala.concurrent.{ BlockContext, ExecutionContext, Awaitable, CanAwait, ExecutionContextExecutor, ExecutionContextExecutorService }
import scala.concurrent.util.Duration
import scala.util.control.NonFatal
@@ -37,15 +37,15 @@ private[scala] class ExecutionContextImpl private[impl] (es: Executor, reporter:
def newThread(runnable: Runnable): Thread = wire(new Thread(runnable))
def newThread(fjp: ForkJoinPool): ForkJoinWorkerThread = wire(new ForkJoinWorkerThread(fjp) with BlockContext {
- override def internalBlockingCall[T](awaitable: Awaitable[T], atMost: Duration): T = {
+ override def blockOn[T](thunk: =>T)(implicit permission: CanAwait): T = {
var result: T = null.asInstanceOf[T]
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@volatile var isdone = false
- def block(): Boolean = {
- result = try awaitable.result(atMost)(scala.concurrent.Await.canAwaitEvidence) finally { isdone = true }
+ override def block(): Boolean = {
+ result = try thunk finally { isdone = true }
true
}
- def isReleasable = isdone
+ override def isReleasable = isdone
})
result
}
diff --git a/src/library/scala/concurrent/impl/Future.scala b/src/library/scala/concurrent/impl/Future.scala
index 132e1d79e7..b32824c0c9 100644
--- a/src/library/scala/concurrent/impl/Future.scala
+++ b/src/library/scala/concurrent/impl/Future.scala
@@ -22,15 +22,6 @@ private[concurrent] trait Future[+T] extends scala.concurrent.Future[T] with Awa
private[concurrent] object Future {
- /** Wraps a block of code into an awaitable object. */
- private[concurrent] def body2awaitable[T](body: =>T) = new Awaitable[T] {
- def ready(atMost: Duration)(implicit permit: CanAwait) = {
- body
- this
- }
- def result(atMost: Duration)(implicit permit: CanAwait) = body
- }
-
def boxedType(c: Class[_]): Class[_] = if (c.isPrimitive) scala.concurrent.Future.toBoxed(c) else c
private[impl] class PromiseCompletingRunnable[T](body: => T)
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index 76703bf081..a6488b602f 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -9,6 +9,7 @@
package scala
import scala.concurrent.util.Duration
+import scala.annotation.implicitNotFound
/** This package object contains primitives for concurrent and parallel programming.
*/
@@ -17,6 +18,41 @@ package object concurrent {
type CancellationException = java.util.concurrent.CancellationException
type TimeoutException = java.util.concurrent.TimeoutException
+ @implicitNotFound("Don't call `Awaitable` methods directly, use the `Await` object.")
+ sealed trait CanAwait
+ private implicit object AwaitPermission extends CanAwait
+
+ /**
+ * `Await` is what is used to ensure proper handling of blocking for `Awaitable` instances.
+ */
+ object Await {
+ /**
+ * Invokes ready() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
+ * ready() blocks until the awaitable has completed or the timeout expires.
+ *
+ * Throws a TimeoutException if the timeout expires, as that is in the contract of `Awaitable.ready`.
+ * @param awaitable the `Awaitable` on which `ready` is to be called
+ * @param atMost the maximum timeout for which to wait
+ * @return the result of `awaitable.ready` which is defined to be the awaitable itself.
+ */
+ @throws(classOf[TimeoutException])
+ def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type =
+ blocking(awaitable.ready(atMost))
+
+ /**
+ * Invokes result() on the awaitable, properly wrapped by a call to `scala.concurrent.blocking`.
+ * result() blocks until the awaitable has completed or the timeout expires.
+ *
+ * Throws a TimeoutException if the timeout expires, or any exception thrown by `Awaitable.result`.
+ * @param awaitable the `Awaitable` on which `result` is to be called
+ * @param atMost the maximum timeout for which to wait
+ * @return the result of `awaitable.result`
+ */
+ @throws(classOf[Exception])
+ def result[T](awaitable: Awaitable[T], atMost: Duration): T =
+ blocking(awaitable.result(atMost))
+ }
+
/** Starts an asynchronous computation and returns a `Future` object with the result of that computation.
*
* The result becomes available once the asynchronous computation is completed.
@@ -36,46 +72,18 @@ package object concurrent {
*/
def promise[T]()(implicit execctx: ExecutionContext): Promise[T] = Promise[T]()
- /** Used to block on a piece of code which potentially blocks.
+ /** Used to designate a piece of code which potentially blocks, allowing the BlockContext to adjust the runtime's behavior.
+ * Properly marking blocking code may improve performance or avoid deadlocks.
*
- * @param body A piece of code which contains potentially blocking or long running calls.
- *
- * Calling this method may throw the following exceptions:
- * - CancellationException - if the computation was cancelled
- * - InterruptedException - in the case that a wait within the blockable object was interrupted
- * - TimeoutException - in the case that the blockable object timed out
- */
- def blocking[T](body: =>T): T = blocking(impl.Future.body2awaitable(body), Duration.Inf)
-
- /** Blocks on an awaitable object.
+ * If you have an `Awaitable` then you should use Await.result instead of `blocking`.
*
- * @param awaitable An object with a `block` method which runs potentially blocking or long running calls.
+ * @param body A piece of code which contains potentially blocking or long running calls.
*
* Calling this method may throw the following exceptions:
* - CancellationException - if the computation was cancelled
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- def blocking[T](awaitable: Awaitable[T], atMost: Duration): T =
- BlockContext.current.internalBlockingCall(awaitable, atMost)
-}
-
-/* concurrency constructs */
-package concurrent {
-
- sealed trait CanAwait
-
- object Await {
- private[concurrent] implicit val canAwaitEvidence = new CanAwait {}
-
- def ready[T](awaitable: Awaitable[T], atMost: Duration): awaitable.type = {
- blocking(awaitable, atMost)
- awaitable
- }
-
- def result[T](awaitable: Awaitable[T], atMost: Duration): T = {
- blocking(awaitable, atMost)
- }
-
- }
+ @throws(classOf[Exception])
+ def blocking[T](body: =>T): T = BlockContext.current.blockOn(body)
}