diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-09-11 15:50:39 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-09-11 15:50:39 +0000 |
commit | 514ff83e3983d81f8bf948abebbe5b9141d9690d (patch) | |
tree | 28acd359da510bdff60fa15d9917670446b931aa /src | |
parent | 61ff261346289f7886350a8a4da5688574070e59 (diff) | |
download | scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.gz scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.tar.bz2 scala-514ff83e3983d81f8bf948abebbe5b9141d9690d.zip |
Split TaskRunner into FutureTaskRunner and Task...
Split TaskRunner into FutureTaskRunner and TaskRunner. FutureTaskRunner
has an abstract Future[T] type member and inherits an abstract Task[T]
type member from TaskRunner. Implicit conversions enable tasks and
futures to be treated as parameter-less functions. This allows
TaskRunners to be used by actor schedulers without creating lots of
wrapper objects.
Diffstat (limited to 'src')
27 files changed, 132 insertions, 101 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 5972588089..f8da909459 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -13,7 +13,7 @@ package scala.actors import scala.compat.Platform import scala.util.control.ControlException import java.util.{Timer, TimerTask} -import java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, Callable} /** * The <code>Actor</code> object provides functions for the definition of @@ -398,11 +398,14 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { */ private var onTimeout: Option[TimerTask] = None + private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable { + def call() = fun() + def run() = fun() + } + protected[this] override def makeReaction(fun: () => Unit): Runnable = { if (isSuspended) - new Runnable { - def run() { fun() } - } + new RunCallable(fun) else new ActorTask(this, fun) } diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index 331c84a3ee..16c04aa34f 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -12,6 +12,7 @@ package scala.actors import java.lang.Runnable +import java.util.concurrent.Callable /** <p> * The class <code>ActorTask</code>... @@ -19,7 +20,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -private[actors] class ActorTask extends Runnable { +private[actors] class ActorTask extends Callable[Unit] with Runnable { private var a: Actor = null private var fun: () => Unit = null @@ -30,6 +31,8 @@ private[actors] class ActorTask extends Runnable { this.fun = fun } + def call() = run() + def run() { val saved = Actor.tl.get Actor.tl set a diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala index 30a0fc988b..824e95090d 100644 --- a/src/actors/scala/actors/ReactorTask.scala +++ b/src/actors/scala/actors/ReactorTask.scala @@ -12,6 +12,7 @@ package scala.actors import java.lang.Runnable +import java.util.concurrent.Callable /** <p> * The class <code>ReactorTask</code>... @@ -19,7 +20,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -private[actors] class ReactorTask extends Runnable { +private[actors] class ReactorTask extends Callable[Unit] with Runnable { private var reactor: Reactor = null private var fun: () => Unit = null @@ -30,6 +31,8 @@ private[actors] class ReactorTask extends Runnable { this.fun = fun } + def call() = run() + def run() { val saved = Actor.tl.get Actor.tl set reactor diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala index 39d87241a6..62247a6b8e 100644 --- a/src/actors/scala/actors/Replyable.scala +++ b/src/actors/scala/actors/Replyable.scala @@ -16,11 +16,8 @@ package scala.actors * * @author Philipp Haller */ -trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { -/* - def apply(msg: T): this.Future[R] = - this !! msg -*/ +trait Replyable[-T, +R] { + /** * Sends <code>msg</code> to this Replyable and awaits reply * (synchronous). @@ -48,8 +45,8 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { * @param msg the message to be sent * @return the future */ - def !!(msg: T): this.Future[R] -// () => this !? msg + def !!(msg: T): () => R = + () => this !? msg /** * Sends <code>msg</code> to this actor and immediately @@ -62,7 +59,7 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { * @param f the function to be applied to the response * @return the future */ - def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P] -// () => f(this !? msg) + def !![P](msg: T, f: PartialFunction[R, P]): () => P = + () => f(this !? msg) } diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 6752fbb7b4..84168abe0a 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -20,8 +20,6 @@ package scala.actors trait ReplyableReactor extends Replyable[Any, Any] { thiz: ReplyReactor => - type Future[+S] = scala.actors.Future[S] - /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous). diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java index cd4444ea97..ba30f3a161 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; diff --git a/src/actors/scala/actors/forkjoin/ForkJoinTask.java b/src/actors/scala/actors/forkjoin/ForkJoinTask.java index 230c7a0a20..e6c0fa7bb4 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinTask.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinTask.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.io.Serializable; import java.util.*; import java.util.concurrent.*; diff --git a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java index 2d75987f91..941f5ec0cb 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; diff --git a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java index d4a9760dfd..3055e3b68f 100644 --- a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java +++ b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; diff --git a/src/actors/scala/actors/forkjoin/RecursiveAction.java b/src/actors/scala/actors/forkjoin/RecursiveAction.java index fc813ec8b6..2d36f7eb33 100644 --- a/src/actors/scala/actors/forkjoin/RecursiveAction.java +++ b/src/actors/scala/actors/forkjoin/RecursiveAction.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; /** * Recursive resultless ForkJoinTasks. This class establishes diff --git a/src/actors/scala/actors/forkjoin/RecursiveTask.java b/src/actors/scala/actors/forkjoin/RecursiveTask.java index 5286174fdf..1f3110580b 100644 --- a/src/actors/scala/actors/forkjoin/RecursiveTask.java +++ b/src/actors/scala/actors/forkjoin/RecursiveTask.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; /** * Recursive result-bearing ForkJoinTasks. diff --git a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java index 1fa3bcd71e..34e2e37f37 100644 --- a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java +++ b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; /** diff --git a/src/actors/scala/actors/forkjoin/TransferQueue.java b/src/actors/scala/actors/forkjoin/TransferQueue.java index 27ee9f463b..9c7b2289c4 100644 --- a/src/actors/scala/actors/forkjoin/TransferQueue.java +++ b/src/actors/scala/actors/forkjoin/TransferQueue.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.concurrent.*; /** diff --git a/src/actors/scala/actors/forkjoin/package-info.java b/src/actors/scala/actors/forkjoin/package-info.java index 4945bc80fc..b8fa0fad02 100644 --- a/src/actors/scala/actors/forkjoin/package-info.java +++ b/src/actors/scala/actors/forkjoin/package-info.java @@ -26,4 +26,4 @@ * are those that directly implement this algorithmic design pattern. * */ -package jsr166y; +package scala.concurrent.forkjoin; diff --git a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala index af5bc2c595..257fe92a91 100644 --- a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala +++ b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala @@ -2,7 +2,7 @@ package scala.actors package scheduler import java.util.Collection -import forkjoin.{ForkJoinPool, ForkJoinTask} +import scala.concurrent.forkjoin.{ForkJoinPool, ForkJoinTask} private class DrainableForkJoinPool extends ForkJoinPool { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index 07a014f92d..b1c0da256c 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,6 +11,7 @@ package scala.actors package scheduler +import java.util.concurrent.Callable import scala.concurrent.ThreadPoolRunner /** @@ -19,7 +20,24 @@ import scala.concurrent.ThreadPoolRunner * * @author Philipp Haller */ -trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] { +trait ExecutorScheduler extends IScheduler with ThreadPoolRunner { + + def execute(task: Runnable) { + super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]]) + } + + private class RunCallable(fun: => Unit) extends Callable[Unit] with Runnable { + def call() { fun } + def run() { fun } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + override def execute(fun: => Unit) { + super[ThreadPoolRunner].execute((new RunCallable(fun)).asInstanceOf[Task[Unit]]) + } /** This method is called when the scheduler shuts down. */ diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala index 82e8f5c2fd..b7f68be3b3 100644 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -3,7 +3,7 @@ package scheduler import java.lang.Thread.State import java.util.{Collection, ArrayList} -import forkjoin._ +import scala.concurrent.forkjoin._ /** The <code>ForkJoinScheduler</code> is backed by a lightweight * fork-join task execution framework. @@ -95,6 +95,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor } override def executeFromActor(task: Runnable) { + // TODO: only pass RecursiveAction (with Runnable), and cast to it val recAction = new RecursiveAction { def compute() = task.run() } diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala index ab86161dfb..d1528c9e5b 100644 --- a/src/actors/scala/actors/scheduler/SchedulerService.scala +++ b/src/actors/scala/actors/scheduler/SchedulerService.scala @@ -74,15 +74,6 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC { } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala index aafa620a08..de63392962 100644 --- a/src/actors/scala/actors/scheduler/TerminationService.scala +++ b/src/actors/scala/actors/scheduler/TerminationService.scala @@ -55,15 +55,6 @@ abstract class TerminationService(terminate: Boolean) } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index fd8a0e6a64..c43f541cbd 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -82,15 +82,6 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { diff --git a/src/library/scala/concurrent/FutureTaskRunner.scala b/src/library/scala/concurrent/FutureTaskRunner.scala new file mode 100644 index 0000000000..48ad0817a2 --- /dev/null +++ b/src/library/scala/concurrent/FutureTaskRunner.scala @@ -0,0 +1,17 @@ +package scala.concurrent + +/** The <code>FutureTaskRunner</code> trait... + * + * @author Philipp Haller + */ +trait FutureTaskRunner extends TaskRunner { + + type Future[T] + + implicit def futureAsFunction[S](x: Future[S]): () => S + + def submit[S](task: Task[S]): Future[S] + + def managedBlock(blocker: ManagedBlocker): Unit + +} diff --git a/src/library/scala/concurrent/JavaConversions.scala b/src/library/scala/concurrent/JavaConversions.scala index 7ee9096127..7810cee669 100644 --- a/src/library/scala/concurrent/JavaConversions.scala +++ b/src/library/scala/concurrent/JavaConversions.scala @@ -8,8 +8,8 @@ import java.util.concurrent.{ExecutorService, Executor} */ object JavaConversions { - implicit def asTaskRunner(exec: ExecutorService): TaskRunner[Unit] = - new ThreadPoolRunner[Unit] { + implicit def asTaskRunner(exec: ExecutorService): FutureTaskRunner = + new ThreadPoolRunner { override protected def executor = exec @@ -17,17 +17,16 @@ object JavaConversions { exec.shutdown() } - implicit def asTaskRunner(exec: Executor): TaskRunner[Unit] = - new TaskRunner[Unit] { - type Future[+R] = () => R - - def submit(task: () => Unit): this.Future[Unit] = { - val result = new SyncVar[Either[Throwable, Unit]] - val runnable = new Runnable { - def run() { result set tryCatch(task()) } - } - exec.execute(runnable) - () => ops getOrThrow result.get + implicit def asTaskRunner(exec: Executor): TaskRunner = + new TaskRunner { + type Task[T] = Runnable + + implicit def functionAsTask[T](fun: () => T): Task[T] = new Runnable { + def run() { fun() } + } + + def execute[S](task: Task[S]) { + exec.execute(task) } def managedBlock(blocker: ManagedBlocker) { diff --git a/src/library/scala/concurrent/TaskRunner.scala b/src/library/scala/concurrent/TaskRunner.scala index a393b065fa..e9b7c6916b 100644 --- a/src/library/scala/concurrent/TaskRunner.scala +++ b/src/library/scala/concurrent/TaskRunner.scala @@ -4,16 +4,15 @@ package scala.concurrent * * @author Philipp Haller */ -trait TaskRunner[T] extends AsyncInvokable[() => T, T] { +trait TaskRunner { - def submit(task: () => T): Future[T] + type Task[T] - def shutdown(): Unit + implicit def functionAsTask[S](fun: () => S): Task[S] - def !!(task: () => T): Future[T] = - submit(task) + def execute[S](task: Task[S]): Unit - def managedBlock(blocker: ManagedBlocker): Unit + def shutdown(): Unit /** If expression computed successfully return it in <code>Right</code>, * otherwise return exception in <code>Left</code>. diff --git a/src/library/scala/concurrent/TaskRunners.scala b/src/library/scala/concurrent/TaskRunners.scala index 8219d9d169..cc5c1a5131 100644 --- a/src/library/scala/concurrent/TaskRunners.scala +++ b/src/library/scala/concurrent/TaskRunners.scala @@ -8,10 +8,10 @@ import java.util.concurrent.{ThreadPoolExecutor, LinkedBlockingQueue, TimeUnit} */ object TaskRunners { - implicit val threadRunner: TaskRunner[Unit] = - new ThreadRunner[Unit] + implicit val threadRunner: FutureTaskRunner = + new ThreadRunner - implicit val threadPoolRunner: TaskRunner[Unit] = { + implicit val threadPoolRunner: FutureTaskRunner = { val numCores = Runtime.getRuntime().availableProcessors() val keepAliveTime = 60000L val workQueue = new LinkedBlockingQueue[Runnable] diff --git a/src/library/scala/concurrent/ThreadPoolRunner.scala b/src/library/scala/concurrent/ThreadPoolRunner.scala index cbc5ebb293..e532d2bfff 100644 --- a/src/library/scala/concurrent/ThreadPoolRunner.scala +++ b/src/library/scala/concurrent/ThreadPoolRunner.scala @@ -2,29 +2,36 @@ package scala.concurrent import java.util.concurrent.{ExecutorService, Callable, TimeUnit} -import scala.annotation.unchecked.uncheckedVariance - /** The <code>ThreadPoolRunner</code> trait... * * @author Philipp Haller */ -trait ThreadPoolRunner[T] extends TaskRunner[T] { +trait ThreadPoolRunner extends FutureTaskRunner { + + type Task[T] = Callable[T] with Runnable + type Future[T] = RichFuture[T] + + private class RunCallable[S](fun: () => S) extends Runnable with Callable[S] { + def run() = fun() + def call() = fun() + } + + implicit def functionAsTask[S](fun: () => S): Task[S] = + new RunCallable(fun) - type Future[+R] = RichFuture[R] + implicit def futureAsFunction[S](x: Future[S]): () => S = + () => x.get() - trait RichFuture[+S] extends java.util.concurrent.Future[S @uncheckedVariance] - with (() => S) + trait RichFuture[S] extends java.util.concurrent.Future[S] + with (() => S) protected def executor: ExecutorService - def submit(task: () => T): this.Future[T] = { - val callable = new Callable[T] { - def call() = task() - } - toRichFuture(executor.submit[T](callable)) + def submit[S](task: Task[S]): Future[S] = { + toRichFuture(executor.submit[S](task)) } - def execute(task: Runnable): Unit = + def execute[S](task: Task[S]): Unit = executor execute task def managedBlock(blocker: ManagedBlocker) { diff --git a/src/library/scala/concurrent/ThreadRunner.scala b/src/library/scala/concurrent/ThreadRunner.scala index f48e0384b7..16269fa65f 100644 --- a/src/library/scala/concurrent/ThreadRunner.scala +++ b/src/library/scala/concurrent/ThreadRunner.scala @@ -6,12 +6,23 @@ import java.lang.Thread * * @author Philipp Haller */ -class ThreadRunner[T] extends TaskRunner[T] { +class ThreadRunner extends FutureTaskRunner { - type Future[+S] = () => S + type Task[T] = () => T + type Future[T] = () => T - def submit(task: () => T): this.Future[T] = { - val result = new SyncVar[Either[Exception, T]] + implicit def functionAsTask[S](fun: () => S): Task[S] = fun + implicit def futureAsFunction[S](x: Future[S]): () => S = x + + def execute[S](task: Task[S]) { + val runnable = new Runnable { + def run() { tryCatch(task()) } + } + (new Thread(runnable)).start() + } + + def submit[S](task: Task[S]): Future[S] = { + val result = new SyncVar[Either[Exception, S]] val runnable = new Runnable { def run() { result set tryCatch(task()) } } diff --git a/src/library/scala/concurrent/ops.scala b/src/library/scala/concurrent/ops.scala index e2fb8f0ceb..4ff26d4465 100644 --- a/src/library/scala/concurrent/ops.scala +++ b/src/library/scala/concurrent/ops.scala @@ -20,21 +20,25 @@ import scala.util.control.Exception.allCatch */ object ops { - implicit val defaultRunner: TaskRunner[Unit] = + + implicit val defaultRunner: FutureTaskRunner = TaskRunners.threadRunner /** * If expression computed successfully return it in <code>Right</code>, * otherwise return exception in <code>Left</code>. */ + //TODO: make private def tryCatch[A](body: => A): Either[Throwable, A] = allCatch[A] either body + //TODO: make private def tryCatchEx[A](body: => A): Either[Exception, A] = try Right(body) catch { case ex: Exception => Left(ex) } + //TODO: make private def getOrThrow[T <: Throwable, A](x: Either[T, A]): A = x.fold[A](throw _, identity _) @@ -42,18 +46,16 @@ object ops * * @param p the expression to evaluate */ - def spawn(p: => Unit)(implicit runner: TaskRunner[Unit] = defaultRunner): Unit = { - runner submit (() => p) + def spawn(p: => Unit)(implicit runner: TaskRunner = defaultRunner): Unit = { + runner execute runner.functionAsTask(() => p) } /** * @param p ... * @return ... */ - def future[A](p: => A)(implicit runner: TaskRunner[Unit] = defaultRunner): () => A = { - val result = new SyncVar[Either[Throwable, A]] - spawn({ result set tryCatch(p) })(runner) - () => getOrThrow(result.get) + def future[A](p: => A)(implicit runner: FutureTaskRunner = defaultRunner): () => A = { + runner.futureAsFunction(runner submit runner.functionAsTask(() => p)) } /** |