summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoraleksandar <aleksandar@lampmac14.epfl.ch>2011-12-12 18:15:41 +0100
committeraleksandar <aleksandar@lampmac14.epfl.ch>2011-12-12 18:15:41 +0100
commit7021aef3fd8a20c8f730af36f229e7bb2cfe8fb5 (patch)
tree64b868af65bb1b998c63f1265ed12430a4a78da5
parent64d50018b7a17c952c2bffaa38928bee2a9ee36c (diff)
downloadscala-7021aef3fd8a20c8f730af36f229e7bb2cfe8fb5.tar.gz
scala-7021aef3fd8a20c8f730af36f229e7bb2cfe8fb5.tar.bz2
scala-7021aef3fd8a20c8f730af36f229e7bb2cfe8fb5.zip
Syntax changes for the scala.concurrent package and some cleanup.
block on { } is now changed to: block(timeout) { }
-rw-r--r--src/library/scala/concurrent/ExecutionContext.scala4
-rw-r--r--src/library/scala/concurrent/Future.scala19
-rw-r--r--src/library/scala/concurrent/default/TaskImpl.scala10
-rw-r--r--src/library/scala/concurrent/package.scala56
-rw-r--r--test/files/jvm/concurrent-future.scala15
5 files changed, 55 insertions, 49 deletions
diff --git a/src/library/scala/concurrent/ExecutionContext.scala b/src/library/scala/concurrent/ExecutionContext.scala
index 207e190e95..5d802d71b2 100644
--- a/src/library/scala/concurrent/ExecutionContext.scala
+++ b/src/library/scala/concurrent/ExecutionContext.scala
@@ -12,6 +12,8 @@ trait ExecutionContext {
def execute(runnable: Runnable): Unit
+ def execute[U](body: () => U): Unit
+
def promise[T]: Promise[T]
def future[T](body: Callable[T]): Future[T] = future(body.call())
@@ -19,7 +21,7 @@ trait ExecutionContext {
def future[T](body: => T): Future[T]
/** Only callable from the tasks running on the same execution context. */
- def blockingCall[T](body: Blockable[T]): T
+ def blockingCall[T](timeout: Timeout, body: Blockable[T]): T
}
diff --git a/src/library/scala/concurrent/Future.scala b/src/library/scala/concurrent/Future.scala
index dc2a92ebd4..1f44b50018 100644
--- a/src/library/scala/concurrent/Future.scala
+++ b/src/library/scala/concurrent/Future.scala
@@ -26,11 +26,6 @@ import scala.collection.generic.CanBuildFrom
/** The trait that represents futures.
*
- * @define futureTimeout
- * The timeout of the future is:
- * - if this future was obtained from a task (i.e. by calling `task.future`), the timeout associated with that task
- * - if this future was obtained from a promise (i.e. by calling `promise.future`), the timeout associated with that promise
- *
* @define multipleCallbacks
* Multiple callbacks may be registered; there is no guarantee that they will be
* executed in a particular order.
@@ -75,8 +70,6 @@ self =>
* If the future has already been completed with a value,
* this will either be applied immediately or be scheduled asynchronously.
*
- * Will not be called in case of an exception (this includes the FutureTimeoutException).
- *
* $multipleCallbacks
*/
def onSuccess[U](func: T => U): this.type = onComplete {
@@ -94,15 +87,14 @@ self =>
*
* Will not be called in case that the future is completed with a value.
*
- * Will be called if the future is completed with a FutureTimeoutException.
- *
* $multipleCallbacks
*/
def onFailure[U](callback: PartialFunction[Throwable, U]): this.type = onComplete {
- case Left(t) if t.isInstanceOf[FutureTimeoutException] || isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t)
+ case Left(t) if isFutureThrowable(t) => if (callback.isDefinedAt(t)) callback(t)
case Right(v) => // do nothing
}
+ /* To be removed
/** When this future times out, apply the provided function.
*
* If the future has already timed out,
@@ -114,6 +106,7 @@ self =>
case Left(te: FutureTimeoutException) => callback(te)
case Right(v) => // do nothing
}
+ */
/** When this future is completed, either through an exception, a timeout, or a value,
* apply the provided function.
@@ -136,11 +129,13 @@ self =>
*/
def newPromise[S]: Promise[S] = executionContext promise
+ /*
/** Tests whether this `Future`'s timeout has expired.
*
* $futureTimeout
*/
def isTimedout: Boolean
+ */
/* Projections */
@@ -166,7 +161,6 @@ self =>
}
this
}
- def isTimedout = self.isTimedout
def block()(implicit canblock: CanBlock) = try {
val res = self.block()
throw noSuchElem(res)
@@ -177,6 +171,7 @@ self =>
new NoSuchElementException("Future.failed not completed with a throwable. Instead completed with: " + v)
}
+ /*
/** A timed out projection of this future.
*
* The timed out projection is a future holding a value of type `FutureTimeoutException`.
@@ -215,7 +210,7 @@ self =>
private def noSuchElemThrowable(v: Throwable) =
new NoSuchElementException("Future.timedout didn't time out. Instead failed with: " + v)
}
-
+ */
/* Monadic operations */
diff --git a/src/library/scala/concurrent/default/TaskImpl.scala b/src/library/scala/concurrent/default/TaskImpl.scala
index ca88735266..b4c08ba710 100644
--- a/src/library/scala/concurrent/default/TaskImpl.scala
+++ b/src/library/scala/concurrent/default/TaskImpl.scala
@@ -244,6 +244,11 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
executeTask(action)
}
+ def execute[U](body: () => U) {
+ val action = new RecursiveAction { def compute() { body() } }
+ executeTask(action)
+ }
+
def task[T](body: => T): Task[T] = {
new TaskImpl(this, body)
}
@@ -255,9 +260,10 @@ private[concurrent] final class ExecutionContextImpl extends ExecutionContext {
}
def promise[T]: Promise[T] =
- null
+ null // TODO
- def blockingCall[T](b: Blockable[T]): T = b match {
+ // TODO fix the timeout
+ def blockingCall[T](timeout: Timeout, b: Blockable[T]): T = b match {
case fj: TaskImpl[_] if fj.executionContext.pool eq pool =>
fj.block()
case _ =>
diff --git a/src/library/scala/concurrent/package.scala b/src/library/scala/concurrent/package.scala
index fed7f7caf8..dbe2f90f18 100644
--- a/src/library/scala/concurrent/package.scala
+++ b/src/library/scala/concurrent/package.scala
@@ -12,6 +12,9 @@ package scala
+import scala.util.{ Timeout, Duration }
+
+
/** This package object contains primitives for parallel programming.
*/
@@ -42,43 +45,38 @@ package object concurrent {
/* concurrency constructs */
- def future[T](body: =>T): Future[T] =
- executionContext future body
+ def future[T](body: =>T)(implicit execCtx: ExecutionContext = executionContext): Future[T] =
+ execCtx future body
- def promise[T]: Promise[T] =
- executionContext promise
+ def promise[T](implicit execCtx: ExecutionContext = executionContext): Promise[T] =
+ execCtx promise
- /** The keyword used to block on a piece of code which potentially blocks.
+ /** Used to block on a piece of code which potentially blocks.
*
- * @define mayThrow
+ * @param body A piece of code which contains potentially blocking or long running calls.
+ *
+ * Calling this method may throw the following exceptions:
+ * - CancellationException - if the computation was cancelled
+ * - InterruptedException - in the case that a wait within the blockable object was interrupted
+ * - TimeoutException - in the case that the blockable object timed out
+ */
+ def block[T](timeout: Timeout)(body: =>T): T = block(timeout, new Blockable[T] {
+ def block()(implicit cb: CanBlock) = body
+ })
+
+ /** Blocks on a blockable object.
+ *
+ * @param blockable An object with a `block` method which runs potentially blocking or long running calls.
+ *
* Calling this method may throw the following exceptions:
* - CancellationException - if the computation was cancelled
* - InterruptedException - in the case that a wait within the blockable object was interrupted
* - TimeoutException - in the case that the blockable object timed out
*/
- object block {
-
- /** Blocks on a piece of code.
- *
- * @param body A piece of code which contains potentially blocking or long running calls.
- *
- * $mayThrow
- */
- def on[T](body: =>T): T = on(new Blockable[T] {
- def block()(implicit cb: CanBlock) = body
- })
-
- /** Blocks on a blockable object.
- *
- * @param blockable An object with a `block` method which runs potentially blocking or long running calls.
- *
- * $mayThrow
- */
- def on[T](blockable: Blockable[T]): T = {
- currentExecutionContext.get match {
- case null => blockable.block()(null) // outside
- case x => x.blockingCall(blockable) // inside an execution context thread
- }
+ def block[T](timeout: Timeout, blockable: Blockable[T]): T = {
+ currentExecutionContext.get match {
+ case null => blockable.block()(null) // outside - TODO - fix timeout case
+ case x => x.blockingCall(timeout, blockable) // inside an execution context thread
}
}
diff --git a/test/files/jvm/concurrent-future.scala b/test/files/jvm/concurrent-future.scala
index 9c2f04fb07..8fb237eb0a 100644
--- a/test/files/jvm/concurrent-future.scala
+++ b/test/files/jvm/concurrent-future.scala
@@ -1,5 +1,9 @@
-import scala.concurrent.{ executionContext, FutureTimeoutException, ExecutionException, SyncVar }
-import executionContext._
+
+
+
+import scala.concurrent._
+
+
object Test extends App {
@@ -60,9 +64,10 @@ object Test extends App {
output(4, "onoes")
done()
}
- f onFailure { _ =>
- output(4, "kthxbye")
- done()
+ f onFailure {
+ case _ =>
+ output(4, "kthxbye")
+ done()
}
}