diff options
Diffstat (limited to 'src/actors')
20 files changed, 52 insertions, 56 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 5972588089..f8da909459 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -13,7 +13,7 @@ package scala.actors import scala.compat.Platform import scala.util.control.ControlException import java.util.{Timer, TimerTask} -import java.util.concurrent.ExecutionException +import java.util.concurrent.{ExecutionException, Callable} /** * The <code>Actor</code> object provides functions for the definition of @@ -398,11 +398,14 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { */ private var onTimeout: Option[TimerTask] = None + private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable { + def call() = fun() + def run() = fun() + } + protected[this] override def makeReaction(fun: () => Unit): Runnable = { if (isSuspended) - new Runnable { - def run() { fun() } - } + new RunCallable(fun) else new ActorTask(this, fun) } diff --git a/src/actors/scala/actors/ActorTask.scala b/src/actors/scala/actors/ActorTask.scala index 331c84a3ee..16c04aa34f 100644 --- a/src/actors/scala/actors/ActorTask.scala +++ b/src/actors/scala/actors/ActorTask.scala @@ -12,6 +12,7 @@ package scala.actors import java.lang.Runnable +import java.util.concurrent.Callable /** <p> * The class <code>ActorTask</code>... @@ -19,7 +20,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -private[actors] class ActorTask extends Runnable { +private[actors] class ActorTask extends Callable[Unit] with Runnable { private var a: Actor = null private var fun: () => Unit = null @@ -30,6 +31,8 @@ private[actors] class ActorTask extends Runnable { this.fun = fun } + def call() = run() + def run() { val saved = Actor.tl.get Actor.tl set a diff --git a/src/actors/scala/actors/ReactorTask.scala b/src/actors/scala/actors/ReactorTask.scala index 30a0fc988b..824e95090d 100644 --- a/src/actors/scala/actors/ReactorTask.scala +++ b/src/actors/scala/actors/ReactorTask.scala @@ -12,6 +12,7 @@ package scala.actors import java.lang.Runnable +import java.util.concurrent.Callable /** <p> * The class <code>ReactorTask</code>... @@ -19,7 +20,7 @@ import java.lang.Runnable * * @author Philipp Haller */ -private[actors] class ReactorTask extends Runnable { +private[actors] class ReactorTask extends Callable[Unit] with Runnable { private var reactor: Reactor = null private var fun: () => Unit = null @@ -30,6 +31,8 @@ private[actors] class ReactorTask extends Runnable { this.fun = fun } + def call() = run() + def run() { val saved = Actor.tl.get Actor.tl set reactor diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala index 39d87241a6..62247a6b8e 100644 --- a/src/actors/scala/actors/Replyable.scala +++ b/src/actors/scala/actors/Replyable.scala @@ -16,11 +16,8 @@ package scala.actors * * @author Philipp Haller */ -trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { -/* - def apply(msg: T): this.Future[R] = - this !! msg -*/ +trait Replyable[-T, +R] { + /** * Sends <code>msg</code> to this Replyable and awaits reply * (synchronous). @@ -48,8 +45,8 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { * @param msg the message to be sent * @return the future */ - def !!(msg: T): this.Future[R] -// () => this !? msg + def !!(msg: T): () => R = + () => this !? msg /** * Sends <code>msg</code> to this actor and immediately @@ -62,7 +59,7 @@ trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { * @param f the function to be applied to the response * @return the future */ - def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P] -// () => f(this !? msg) + def !![P](msg: T, f: PartialFunction[R, P]): () => P = + () => f(this !? msg) } diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 6752fbb7b4..84168abe0a 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -20,8 +20,6 @@ package scala.actors trait ReplyableReactor extends Replyable[Any, Any] { thiz: ReplyReactor => - type Future[+S] = scala.actors.Future[S] - /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous). diff --git a/src/actors/scala/actors/forkjoin/ForkJoinPool.java b/src/actors/scala/actors/forkjoin/ForkJoinPool.java index cd4444ea97..ba30f3a161 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinPool.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinPool.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.locks.*; diff --git a/src/actors/scala/actors/forkjoin/ForkJoinTask.java b/src/actors/scala/actors/forkjoin/ForkJoinTask.java index 230c7a0a20..e6c0fa7bb4 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinTask.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinTask.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.io.Serializable; import java.util.*; import java.util.concurrent.*; diff --git a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java index 2d75987f91..941f5ec0cb 100644 --- a/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java +++ b/src/actors/scala/actors/forkjoin/ForkJoinWorkerThread.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; diff --git a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java index d4a9760dfd..3055e3b68f 100644 --- a/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java +++ b/src/actors/scala/actors/forkjoin/LinkedTransferQueue.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.concurrent.*; import java.util.concurrent.locks.*; import java.util.concurrent.atomic.*; diff --git a/src/actors/scala/actors/forkjoin/RecursiveAction.java b/src/actors/scala/actors/forkjoin/RecursiveAction.java index fc813ec8b6..2d36f7eb33 100644 --- a/src/actors/scala/actors/forkjoin/RecursiveAction.java +++ b/src/actors/scala/actors/forkjoin/RecursiveAction.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; /** * Recursive resultless ForkJoinTasks. This class establishes diff --git a/src/actors/scala/actors/forkjoin/RecursiveTask.java b/src/actors/scala/actors/forkjoin/RecursiveTask.java index 5286174fdf..1f3110580b 100644 --- a/src/actors/scala/actors/forkjoin/RecursiveTask.java +++ b/src/actors/scala/actors/forkjoin/RecursiveTask.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; /** * Recursive result-bearing ForkJoinTasks. diff --git a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java index 1fa3bcd71e..34e2e37f37 100644 --- a/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java +++ b/src/actors/scala/actors/forkjoin/ThreadLocalRandom.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.*; /** diff --git a/src/actors/scala/actors/forkjoin/TransferQueue.java b/src/actors/scala/actors/forkjoin/TransferQueue.java index 27ee9f463b..9c7b2289c4 100644 --- a/src/actors/scala/actors/forkjoin/TransferQueue.java +++ b/src/actors/scala/actors/forkjoin/TransferQueue.java @@ -4,7 +4,7 @@ * http://creativecommons.org/licenses/publicdomain */ -package scala.actors.forkjoin; +package scala.concurrent.forkjoin; import java.util.concurrent.*; /** diff --git a/src/actors/scala/actors/forkjoin/package-info.java b/src/actors/scala/actors/forkjoin/package-info.java index 4945bc80fc..b8fa0fad02 100644 --- a/src/actors/scala/actors/forkjoin/package-info.java +++ b/src/actors/scala/actors/forkjoin/package-info.java @@ -26,4 +26,4 @@ * are those that directly implement this algorithmic design pattern. * */ -package jsr166y; +package scala.concurrent.forkjoin; diff --git a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala index af5bc2c595..257fe92a91 100644 --- a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala +++ b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala @@ -2,7 +2,7 @@ package scala.actors package scheduler import java.util.Collection -import forkjoin.{ForkJoinPool, ForkJoinTask} +import scala.concurrent.forkjoin.{ForkJoinPool, ForkJoinTask} private class DrainableForkJoinPool extends ForkJoinPool { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index 07a014f92d..b1c0da256c 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,6 +11,7 @@ package scala.actors package scheduler +import java.util.concurrent.Callable import scala.concurrent.ThreadPoolRunner /** @@ -19,7 +20,24 @@ import scala.concurrent.ThreadPoolRunner * * @author Philipp Haller */ -trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] { +trait ExecutorScheduler extends IScheduler with ThreadPoolRunner { + + def execute(task: Runnable) { + super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]]) + } + + private class RunCallable(fun: => Unit) extends Callable[Unit] with Runnable { + def call() { fun } + def run() { fun } + } + + /** Submits a closure for execution. + * + * @param fun the closure to be executed + */ + override def execute(fun: => Unit) { + super[ThreadPoolRunner].execute((new RunCallable(fun)).asInstanceOf[Task[Unit]]) + } /** This method is called when the scheduler shuts down. */ diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala index 82e8f5c2fd..b7f68be3b3 100644 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -3,7 +3,7 @@ package scheduler import java.lang.Thread.State import java.util.{Collection, ArrayList} -import forkjoin._ +import scala.concurrent.forkjoin._ /** The <code>ForkJoinScheduler</code> is backed by a lightweight * fork-join task execution framework. @@ -95,6 +95,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor } override def executeFromActor(task: Runnable) { + // TODO: only pass RecursiveAction (with Runnable), and cast to it val recAction = new RecursiveAction { def compute() = task.run() } diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala index ab86161dfb..d1528c9e5b 100644 --- a/src/actors/scala/actors/scheduler/SchedulerService.scala +++ b/src/actors/scala/actors/scheduler/SchedulerService.scala @@ -74,15 +74,6 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with ActorGC { } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala index aafa620a08..de63392962 100644 --- a/src/actors/scala/actors/scheduler/TerminationService.scala +++ b/src/actors/scala/actors/scheduler/TerminationService.scala @@ -55,15 +55,6 @@ abstract class TerminationService(terminate: Boolean) } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index fd8a0e6a64..c43f541cbd 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -82,15 +82,6 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, } } - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - /** Shuts down the scheduler. */ def shutdown(): Unit = synchronized { |