summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-09-11 15:50:39 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-09-11 15:50:39 +0000
commit514ff83e3983d81f8bf948abebbe5b9141d9690d (patch)
tree28acd359da510bdff60fa15d9917670446b931aa /src
parent61ff261346289f7886350a8a4da5688574070e59 (diff)
downloadscala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.gz
scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.bz2
scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.zip
Split TaskRunner into FutureTaskRunner and Task...
Split TaskRunner into FutureTaskRunner and TaskRunner. FutureTaskRunner has an abstract Future[T] type member and inherits an abstract Task[T] type member from TaskRunner. Implicit conversions enable tasks and futures to be treated as parameter-less functions. This allows TaskRunners to be used by actor schedulers without creating lots of wrapper objects.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala11
-rw-r--r--src/actors/scala/actors/ActorTask.scala5
-rw-r--r--src/actors/scala/actors/ReactorTask.scala5
-rw-r--r--src/actors/scala/actors/Replyable.scala15
-rw-r--r--src/actors/scala/actors/ReplyableReactor.scala2
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinPool.java2
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinTask.java2
-rw-r--r--src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java2
-rw-r--r--src/actors/scala/actors/forkjoin/LinkedTransferQueue.java2
-rw-r--r--src/actors/scala/actors/forkjoin/RecursiveAction.java2
-rw-r--r--src/actors/scala/actors/forkjoin/RecursiveTask.java2
-rw-r--r--src/actors/scala/actors/forkjoin/ThreadLocalRandom.java2
-rw-r--r--src/actors/scala/actors/forkjoin/TransferQueue.java2
-rw-r--r--src/actors/scala/actors/forkjoin/package-info.java2
-rw-r--r--src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala2
-rw-r--r--src/actors/scala/actors/scheduler/ExecutorScheduler.scala20
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala3
-rw-r--r--src/actors/scala/actors/scheduler/SchedulerService.scala9
-rw-r--r--src/actors/scala/actors/scheduler/TerminationService.scala9
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala9
-rw-r--r--src/library/scala/concurrent/FutureTaskRunner.scala17
-rw-r--r--src/library/scala/concurrent/JavaConversions.scala25
-rw-r--r--src/library/scala/concurrent/TaskRunner.scala11
-rw-r--r--src/library/scala/concurrent/TaskRunners.scala6
-rw-r--r--src/library/scala/concurrent/ThreadPoolRunner.scala31
-rw-r--r--src/library/scala/concurrent/ThreadRunner.scala19
-rw-r--r--src/library/scala/concurrent/ops.scala16
27 files changed, 132 insertions, 101 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 5972588089..f8da909459 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -13,7 +13,7 @@ package scala.actors
import scala.compat.Platform
import scala.util.control.ControlException
import java.util.{Timer, TimerTask}
-import java.util.concurrent.ExecutionException
+import java.util.concurrent.{ExecutionException, Callable}
/**
* The <code>Actor</code> object provides functions for the definition of
@@ -398,11 +398,14 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
*/
private var onTimeout: Option[TimerTask] = None
+ private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable {
+ def call() = fun()
+ def run() = fun()
+ }
+
protected[this] override def makeReaction(fun: () => Unit): Runnable = {
if (isSuspended)
- new Runnable {
- def run() { fun() }
- }
+ new RunCallable(fun)
else
new ActorTask(this, fun)
}
diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala
index 331c84a3ee..16c04aa34f 100644
--- a/src/actors/scala/actors/ActorTask.scala
+++ b/src/actors/scala/actors/ActorTask.scala
@@ -12,6 +12,7 @@
package scala.actors
import java.lang.Runnable
+import java.util.concurrent.Callable
/** <p>
* The class <code>ActorTask</code>...
@@ -19,7 +20,7 @@ import java.lang.Runnable
*
* @author Philipp Haller
*/
-private[actors] class ActorTask extends Runnable {
+private[actors] class ActorTask extends Callable[Unit] with Runnable {
private var a: Actor = null
private var fun: () => Unit = null
@@ -30,6 +31,8 @@ private[actors] class ActorTask extends Runnable {
this.fun = fun
}
+ def call() = run()
+
def run() {
val saved = Actor.tl.get
Actor.tl set a
diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala
index 30a0fc988b..824e95090d 100644
--- a/src/actors/scala/actors/ReactorTask.scala
+++ b/src/actors/scala/actors/ReactorTask.scala
@@ -12,6 +12,7 @@
package scala.actors
import java.lang.Runnable
+import java.util.concurrent.Callable
/** <p>
* The class <code>ReactorTask</code>...
@@ -19,7 +20,7 @@ import java.lang.Runnable
*
* @author Philipp Haller
*/
-private[actors] class ReactorTask extends Runnable {
+private[actors] class ReactorTask extends Callable[Unit] with Runnable {
private var reactor: Reactor = null
private var fun: () => Unit = null
@@ -30,6 +31,8 @@ private[actors] class ReactorTask extends Runnable {
this.fun = fun
}
+ def call() = run()
+
def run() {
val saved = Actor.tl.get
Actor.tl set reactor
diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala
index 39d87241a6..62247a6b8e 100644
--- a/src/actors/scala/actors/Replyable.scala
+++ b/src/actors/scala/actors/Replyable.scala
@@ -16,11 +16,8 @@ package scala.actors
*
* @author Philipp Haller
*/
-trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] {
-/*
- def apply(msg: T): this.Future[R] =
- this !! msg
-*/
+trait Replyable[-T, +R] {
+
/**
* Sends <code>msg</code> to this Replyable and awaits reply
* (synchronous).
@@ -48,8 +45,8 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] {
* @param msg the message to be sent
* @return the future
*/
- def !!(msg: T): this.Future[R]
-// () => this !? msg
+ def !!(msg: T): () => R =
+ () => this !? msg
/**
* Sends <code>msg</code> to this actor and immediately
@@ -62,7 +59,7 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] {
* @param f the function to be applied to the response
* @return the future
*/
- def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P]
-// () => f(this !? msg)
+ def !![P](msg: T, f: PartialFunction[R, P]): () => P =
+ () => f(this !? msg)
}
diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala
index 6752fbb7b4..84168abe0a 100644
--- a/src/actors/scala/actors/ReplyableReactor.scala
+++ b/src/actors/scala/actors/ReplyableReactor.scala
@@ -20,8 +20,6 @@ package scala.actors
trait ReplyableReactor extends Replyable[Any, Any] {
thiz: ReplyReactor =>
- type Future[+S] = scala.actors.Future[S]
-
/**
* Sends <code>msg</code> to this actor and awaits reply
* (synchronous).
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
index cd4444ea97..ba30f3a161 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinTask.java b/src/actors/scala/actors/forkjoin/ForkJoinTask.java
index 230c7a0a20..e6c0fa7bb4 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinTask.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinTask.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.io.Serializable;
import java.util.*;
import java.util.concurrent.*;
diff --git a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
index 2d75987f91..941f5ec0cb 100644
--- a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
+++ b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
diff --git a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java
index d4a9760dfd..3055e3b68f 100644
--- a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java
+++ b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.util.concurrent.*;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;
diff --git a/src/actors/scala/actors/forkjoin/RecursiveAction.java b/src/actors/scala/actors/forkjoin/RecursiveAction.java
index fc813ec8b6..2d36f7eb33 100644
--- a/src/actors/scala/actors/forkjoin/RecursiveAction.java
+++ b/src/actors/scala/actors/forkjoin/RecursiveAction.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
/**
* Recursive resultless ForkJoinTasks. This class establishes
diff --git a/src/actors/scala/actors/forkjoin/RecursiveTask.java b/src/actors/scala/actors/forkjoin/RecursiveTask.java
index 5286174fdf..1f3110580b 100644
--- a/src/actors/scala/actors/forkjoin/RecursiveTask.java
+++ b/src/actors/scala/actors/forkjoin/RecursiveTask.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
/**
* Recursive result-bearing ForkJoinTasks.
diff --git a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java
index 1fa3bcd71e..34e2e37f37 100644
--- a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java
+++ b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.util.*;
/**
diff --git a/src/actors/scala/actors/forkjoin/TransferQueue.java b/src/actors/scala/actors/forkjoin/TransferQueue.java
index 27ee9f463b..9c7b2289c4 100644
--- a/src/actors/scala/actors/forkjoin/TransferQueue.java
+++ b/src/actors/scala/actors/forkjoin/TransferQueue.java
@@ -4,7 +4,7 @@
* http://creativecommons.org/licenses/publicdomain
*/
-package scala.actors.forkjoin;
+package scala.concurrent.forkjoin;
import java.util.concurrent.*;
/**
diff --git a/src/actors/scala/actors/forkjoin/package-info.java b/src/actors/scala/actors/forkjoin/package-info.java
index 4945bc80fc..b8fa0fad02 100644
--- a/src/actors/scala/actors/forkjoin/package-info.java
+++ b/src/actors/scala/actors/forkjoin/package-info.java
@@ -26,4 +26,4 @@
* are those that directly implement this algorithmic design pattern.
*
*/
-package jsr166y;
+package scala.concurrent.forkjoin;
diff --git a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala
index af5bc2c595..257fe92a91 100644
--- a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala
+++ b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala
@@ -2,7 +2,7 @@ package scala.actors
package scheduler
import java.util.Collection
-import forkjoin.{ForkJoinPool, ForkJoinTask}
+import scala.concurrent.forkjoin.{ForkJoinPool, ForkJoinTask}
private class DrainableForkJoinPool extends ForkJoinPool {
diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
index 07a014f92d..b1c0da256c 100644
--- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
@@ -11,6 +11,7 @@
package scala.actors
package scheduler
+import java.util.concurrent.Callable
import scala.concurrent.ThreadPoolRunner
/**
@@ -19,7 +20,24 @@ import scala.concurrent.ThreadPoolRunner
*
* @author Philipp Haller
*/
-trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] {
+trait ExecutorScheduler extends IScheduler with ThreadPoolRunner {
+
+ def execute(task: Runnable) {
+ super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]])
+ }
+
+ private class RunCallable(fun: => Unit) extends Callable[Unit] with Runnable {
+ def call() { fun }
+ def run() { fun }
+ }
+
+ /** Submits a closure for execution.
+ *
+ * @param fun the closure to be executed
+ */
+ override def execute(fun: => Unit) {
+ super[ThreadPoolRunner].execute((new RunCallable(fun)).asInstanceOf[Task[Unit]])
+ }
/** This method is called when the scheduler shuts down.
*/
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index 82e8f5c2fd..b7f68be3b3 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -3,7 +3,7 @@ package scheduler
import java.lang.Thread.State
import java.util.{Collection, ArrayList}
-import forkjoin._
+import scala.concurrent.forkjoin._
/** The <code>ForkJoinScheduler</code> is backed by a lightweight
* fork-join task execution framework.
@@ -95,6 +95,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
}
override def executeFromActor(task: Runnable) {
+ // TODO: only pass RecursiveAction (with Runnable), and cast to it
val recAction = new RecursiveAction {
def compute() = task.run()
}
diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala
index ab86161dfb..d1528c9e5b 100644
--- a/src/actors/scala/actors/scheduler/SchedulerService.scala
+++ b/src/actors/scala/actors/scheduler/SchedulerService.scala
@@ -74,15 +74,6 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC {
}
}
- /** Submits a closure for execution.
- *
- * @param fun the closure to be executed
- */
- def execute(fun: => Unit): Unit =
- execute(new Runnable {
- def run() { fun }
- })
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala
index aafa620a08..de63392962 100644
--- a/src/actors/scala/actors/scheduler/TerminationService.scala
+++ b/src/actors/scala/actors/scheduler/TerminationService.scala
@@ -55,15 +55,6 @@ abstract class TerminationService(terminate: Boolean)
}
}
- /** Submits a closure for execution.
- *
- * @param fun the closure to be executed
- */
- def execute(fun: => Unit): Unit =
- execute(new Runnable {
- def run() { fun }
- })
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
index fd8a0e6a64..c43f541cbd 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
@@ -82,15 +82,6 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
}
}
- /** Submits a closure for execution.
- *
- * @param fun the closure to be executed
- */
- def execute(fun: => Unit): Unit =
- execute(new Runnable {
- def run() { fun }
- })
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala
new file mode 100644
index 0000000000..48ad0817a2
--- /dev/null
+++ b/src/library/scala/concurrent/FutureTaskRunner.scala
@@ -0,0 +1,17 @@
+package scala.concurrent
+
+/** The <code>FutureTaskRunner</code> trait...
+ *
+ * @author Philipp Haller
+ */
+trait FutureTaskRunner extends TaskRunner {
+
+ type Future[T]
+
+ implicit def futureAsFunction[S](x: Future[S]): () => S
+
+ def submit[S](task: Task[S]): Future[S]
+
+ def managedBlock(blocker: ManagedBlocker): Unit
+
+}
diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala
index 7ee9096127..7810cee669 100644
--- a/src/library/scala/concurrent/JavaConversions.scala
+++ b/src/library/scala/concurrent/JavaConversions.scala
@@ -8,8 +8,8 @@ import java.util.concurrent.{ExecutorService, Executor}
*/
object JavaConversions {
- implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] =
- new ThreadPoolRunner[Unit] {
+ implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner =
+ new ThreadPoolRunner {
override protected def executor =
exec
@@ -17,17 +17,16 @@ object JavaConversions {
exec.shutdown()
}
- implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] =
- new TaskRunner[Unit] {
- type Future[+R] = () => R
-
- def submit(task: () => Unit): this.Future[Unit] = {
- val result = new SyncVar[Either[Throwable, Unit]]
- val runnable = new Runnable {
- def run() { result set tryCatch(task()) }
- }
- exec.execute(runnable)
- () => ops getOrThrow result.get
+ implicit def asTaskRunner(exec: Executor): TaskRunner =
+ new TaskRunner {
+ type Task[T] = Runnable
+
+ implicit def functionAsTask[T](fun: () => T): Task[T] = new Runnable {
+ def run() { fun() }
+ }
+
+ def execute[S](task: Task[S]) {
+ exec.execute(task)
}
def managedBlock(blocker: ManagedBlocker) {
diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala
index a393b065fa..e9b7c6916b 100644
--- a/src/library/scala/concurrent/TaskRunner.scala
+++ b/src/library/scala/concurrent/TaskRunner.scala
@@ -4,16 +4,15 @@ package scala.concurrent
*
* @author Philipp Haller
*/
-trait TaskRunner[T] extends AsyncInvokable[() => T, T] {
+trait TaskRunner {
- def submit(task: () => T): Future[T]
+ type Task[T]
- def shutdown(): Unit
+ implicit def functionAsTask[S](fun: () => S): Task[S]
- def !!(task: () => T): Future[T] =
- submit(task)
+ def execute[S](task: Task[S]): Unit
- def managedBlock(blocker: ManagedBlocker): Unit
+ def shutdown(): Unit
/** If expression computed successfully return it in <code>Right</code>,
* otherwise return exception in <code>Left</code>.
diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala
index 8219d9d169..cc5c1a5131 100644
--- a/src/library/scala/concurrent/TaskRunners.scala
+++ b/src/library/scala/concurrent/TaskRunners.scala
@@ -8,10 +8,10 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit}
*/
object TaskRunners {
- implicit val threadRunner: TaskRunner[Unit] =
- new ThreadRunner[Unit]
+ implicit val threadRunner: FutureTaskRunner =
+ new ThreadRunner
- implicit val threadPoolRunner: TaskRunner[Unit] = {
+ implicit val threadPoolRunner: FutureTaskRunner = {
val numCores = Runtime.getRuntime().availableProcessors()
val keepAliveTime = 60000L
val workQueue = new LinkedBlockingQueue[Runnable]
diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala
index cbc5ebb293..e532d2bfff 100644
--- a/src/library/scala/concurrent/ThreadPoolRunner.scala
+++ b/src/library/scala/concurrent/ThreadPoolRunner.scala
@@ -2,29 +2,36 @@ package scala.concurrent
import java.util.concurrent.{ExecutorService, Callable, TimeUnit}
-import scala.annotation.unchecked.uncheckedVariance
-
/** The <code>ThreadPoolRunner</code> trait...
*
* @author Philipp Haller
*/
-trait ThreadPoolRunner[T] extends TaskRunner[T] {
+trait ThreadPoolRunner extends FutureTaskRunner {
+
+ type Task[T] = Callable[T] with Runnable
+ type Future[T] = RichFuture[T]
+
+ private class RunCallable[S](fun: () => S) extends Runnable with Callable[S] {
+ def run() = fun()
+ def call() = fun()
+ }
+
+ implicit def functionAsTask[S](fun: () => S): Task[S] =
+ new RunCallable(fun)
- type Future[+R] = RichFuture[R]
+ implicit def futureAsFunction[S](x: Future[S]): () => S =
+ () => x.get()
- trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance]
- with (() => S)
+ trait RichFuture[S] extends java.util.concurrent.Future[S]
+ with (() => S)
protected def executor: ExecutorService
- def submit(task: () => T): this.Future[T] = {
- val callable = new Callable[T] {
- def call() = task()
- }
- toRichFuture(executor.submit[T](callable))
+ def submit[S](task: Task[S]): Future[S] = {
+ toRichFuture(executor.submit[S](task))
}
- def execute(task: Runnable): Unit =
+ def execute[S](task: Task[S]): Unit =
executor execute task
def managedBlock(blocker: ManagedBlocker) {
diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala
index f48e0384b7..16269fa65f 100644
--- a/src/library/scala/concurrent/ThreadRunner.scala
+++ b/src/library/scala/concurrent/ThreadRunner.scala
@@ -6,12 +6,23 @@ import java.lang.Thread
*
* @author Philipp Haller
*/
-class ThreadRunner[T] extends TaskRunner[T] {
+class ThreadRunner extends FutureTaskRunner {
- type Future[+S] = () => S
+ type Task[T] = () => T
+ type Future[T] = () => T
- def submit(task: () => T): this.Future[T] = {
- val result = new SyncVar[Either[Exception, T]]
+ implicit def functionAsTask[S](fun: () => S): Task[S] = fun
+ implicit def futureAsFunction[S](x: Future[S]): () => S = x
+
+ def execute[S](task: Task[S]) {
+ val runnable = new Runnable {
+ def run() { tryCatch(task()) }
+ }
+ (new Thread(runnable)).start()
+ }
+
+ def submit[S](task: Task[S]): Future[S] = {
+ val result = new SyncVar[Either[Exception, S]]
val runnable = new Runnable {
def run() { result set tryCatch(task()) }
}
diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala
index e2fb8f0ceb..4ff26d4465 100644
--- a/src/library/scala/concurrent/ops.scala
+++ b/src/library/scala/concurrent/ops.scala
@@ -20,21 +20,25 @@ import scala.util.control.Exception.allCatch
*/
object ops
{
- implicit val defaultRunner: TaskRunner[Unit] =
+
+ implicit val defaultRunner: FutureTaskRunner =
TaskRunners.threadRunner
/**
* If expression computed successfully return it in <code>Right</code>,
* otherwise return exception in <code>Left</code>.
*/
+ //TODO: make private
def tryCatch[A](body: => A): Either[Throwable, A] =
allCatch[A] either body
+ //TODO: make private
def tryCatchEx[A](body: => A): Either[Exception, A] =
try Right(body) catch {
case ex: Exception => Left(ex)
}
+ //TODO: make private
def getOrThrow[T <: Throwable, A](x: Either[T, A]): A =
x.fold[A](throw _, identity _)
@@ -42,18 +46,16 @@ object ops
*
* @param p the expression to evaluate
*/
- def spawn(p: => Unit)(implicit runner: TaskRunner[Unit] = defaultRunner): Unit = {
- runner submit (() => p)
+ def spawn(p: => Unit)(implicit runner: TaskRunner = defaultRunner): Unit = {
+ runner execute runner.functionAsTask(() => p)
}
/**
* @param p ...
* @return ...
*/
- def future[A](p: => A)(implicit runner: TaskRunner[Unit] = defaultRunner): () => A = {
- val result = new SyncVar[Either[Throwable, A]]
- spawn({ result set tryCatch(p) })(runner)
- () => getOrThrow(result.get)
+ def future[A](p: => A)(implicit runner: FutureTaskRunner = defaultRunner): () => A = {
+ runner.futureAsFunction(runner submit runner.functionAsTask(() => p))
}
/**