From ac89702827559cab8835edecb35cc09a1ca3fe10 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Mon, 3 Aug 2009 15:56:44 +0000 Subject: Added the scala.concurrent.TaskRunner and scala... Added the scala.concurrent.TaskRunner and scala.concurrent.AsyncInvokable abstractions with corresponding refactorings in scala.actors and scala.concurrent. --- src/actors/scala/actors/Actor.scala | 4 ++-- src/actors/scala/actors/FJTaskScheduler2.scala | 6 ++--- src/actors/scala/actors/IScheduler.scala | 10 ++++---- src/actors/scala/actors/ManagedBlocker.scala | 20 ---------------- src/actors/scala/actors/Replyable.scala | 15 +++++++----- src/actors/scala/actors/ReplyableActor.scala | 1 + src/actors/scala/actors/ReplyableReactor.scala | 2 ++ src/actors/scala/actors/Scheduler.scala | 7 +++--- src/actors/scala/actors/SchedulerAdapter.scala | 7 +++--- src/actors/scala/actors/remote/Proxy.scala | 2 ++ .../scheduler/DefaultExecutorScheduler.scala | 4 ++-- .../actors/scheduler/DelegatingScheduler.scala | 4 +++- .../scala/actors/scheduler/ExecutorScheduler.scala | 27 ++++------------------ .../scala/actors/scheduler/ForkJoinScheduler.scala | 9 +++++--- .../actors/scheduler/SingleThreadedScheduler.scala | 7 +++--- .../actors/scheduler/ThreadPoolScheduler.scala | 5 ++-- 16 files changed, 53 insertions(+), 77 deletions(-) delete mode 100644 src/actors/scala/actors/ManagedBlocker.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index f9e79ce98f..aa65f5a6ac 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -643,7 +643,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { scheduler executeFromActor task } - private class ActorBlocker(timeout: Long) extends ManagedBlocker { + private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker { def block() = { if (timeout > 0) Actor.this.suspendActorFor(timeout) @@ -651,7 +651,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { Actor.this.suspendActor() true } - def isReleasable() = + def isReleasable = !Actor.this.isSuspended } diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index e789e96be8..b12ef2d925 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -121,9 +121,6 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) def execute(task: Runnable): Unit = executor execute task - def executeFromActor(task: Runnable) = - execute(task) - def execute(fun: => Unit): Unit = executor.execute(new Runnable { def run() { fun } @@ -142,4 +139,7 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) def isActive = !terminating && !suspending + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index 8f3b243b71..f4034fbfab 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -16,8 +16,7 @@ package scala.actors * * Subclasses of Actor that override its * scheduler member must provide - * an implementation of the IScheduler - * trait. + * an IScheduler implementation. * * @author Philipp Haller */ @@ -35,7 +34,8 @@ trait IScheduler { */ def execute(task: Runnable): Unit - def executeFromActor(task: Runnable): Unit + def executeFromActor(task: Runnable): Unit = + execute(task) /** Shuts down the scheduler. */ @@ -66,7 +66,5 @@ trait IScheduler { */ def onTerminate(a: Reactor)(f: => Unit): Unit - def managedBlock(blocker: ManagedBlocker) { - blocker.block() - } + def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit } diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala deleted file mode 100644 index f3fd08301b..0000000000 --- a/src/actors/scala/actors/ManagedBlocker.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -import forkjoin.ForkJoinPool - -/** - * The ManagedBlocker trait... - * - * @author Philipp Haller - */ -trait ManagedBlocker extends ForkJoinPool.ManagedBlocker diff --git a/src/actors/scala/actors/Replyable.scala b/src/actors/scala/actors/Replyable.scala index 99c65d29f3..39d87241a6 100644 --- a/src/actors/scala/actors/Replyable.scala +++ b/src/actors/scala/actors/Replyable.scala @@ -16,8 +16,11 @@ package scala.actors * * @author Philipp Haller */ -trait Replyable[-T, +R] { - +trait Replyable[-T, +R] extends scala.concurrent.AsyncInvokable[T, R] { +/* + def apply(msg: T): this.Future[R] = + this !! msg +*/ /** * Sends msg to this Replyable and awaits reply * (synchronous). @@ -45,8 +48,8 @@ trait Replyable[-T, +R] { * @param msg the message to be sent * @return the future */ - def !!(msg: T): (() => R) = - () => this !? msg + def !!(msg: T): this.Future[R] +// () => this !? msg /** * Sends msg to this actor and immediately @@ -59,7 +62,7 @@ trait Replyable[-T, +R] { * @param f the function to be applied to the response * @return the future */ - def !![P](msg: T, f: PartialFunction[R, P]): (() => P) = - () => f(this !? msg) + def !![P](msg: T, f: PartialFunction[R, P]): this.Future[P] +// () => f(this !? msg) } diff --git a/src/actors/scala/actors/ReplyableActor.scala b/src/actors/scala/actors/ReplyableActor.scala index 1e1487bf39..8ba4f13842 100644 --- a/src/actors/scala/actors/ReplyableActor.scala +++ b/src/actors/scala/actors/ReplyableActor.scala @@ -84,6 +84,7 @@ trait ReplyableActor extends ReplyableReactor { override def !!(msg: Any): Future[Any] = { val ftch = new Channel[Any](Actor.self(thiz.scheduler)) val linkedChannel = new AbstractActor { + type Future[+R] = scala.actors.Future[R] def !(msg: Any) = ftch ! msg def send(msg: Any, replyTo: OutputChannel[Any]) = diff --git a/src/actors/scala/actors/ReplyableReactor.scala b/src/actors/scala/actors/ReplyableReactor.scala index 84168abe0a..6752fbb7b4 100644 --- a/src/actors/scala/actors/ReplyableReactor.scala +++ b/src/actors/scala/actors/ReplyableReactor.scala @@ -20,6 +20,8 @@ package scala.actors trait ReplyableReactor extends Replyable[Any, Any] { thiz: ReplyReactor => + type Future[+S] = scala.actors.Future[S] + /** * Sends msg to this actor and awaits reply * (synchronous). diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index cb764e963b..894dbc93e8 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -24,12 +24,13 @@ object Scheduler extends DelegatingScheduler { Debug.info("initializing "+this+"...") def makeNewScheduler: IScheduler = { - val workQueue = new LinkedBlockingQueue[Runnable](100000) + val workQueue = new LinkedBlockingQueue[Runnable] val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, - 50L, + 60000L, TimeUnit.MILLISECONDS, - workQueue) + workQueue, + new ThreadPoolExecutor.CallerRunsPolicy) val s = new ThreadPoolScheduler(threadPool, true) //val s = new ForkJoinScheduler Debug.info(this+": starting new "+s+" ["+s.getClass+"]") diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala index 6355ee1ace..ba7822c372 100644 --- a/src/actors/scala/actors/SchedulerAdapter.scala +++ b/src/actors/scala/actors/SchedulerAdapter.scala @@ -28,9 +28,6 @@ trait SchedulerAdapter extends IScheduler { def execute(task: Runnable): Unit = execute { task.run() } - def executeFromActor(task: Runnable): Unit = - execute(task) - /** Shuts down the scheduler. */ def shutdown(): Unit = @@ -64,4 +61,8 @@ trait SchedulerAdapter extends IScheduler { */ def onTerminate(a: Reactor)(f: => Unit) = Scheduler.onTerminate(a)(f) + + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/remote/Proxy.scala b/src/actors/scala/actors/remote/Proxy.scala index d99a8d3c1c..60bcd34221 100644 --- a/src/actors/scala/actors/remote/Proxy.scala +++ b/src/actors/scala/actors/remote/Proxy.scala @@ -20,6 +20,8 @@ import scala.collection.mutable.HashMap private[remote] class Proxy(node: Node, name: Symbol, @transient var kernel: NetKernel) extends AbstractActor { import java.io.{IOException, ObjectOutputStream, ObjectInputStream} + type Future[+R] = scala.actors.Future[R] + @transient private[remote] var del: Actor = null startDelegate() diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala index 88ccb6dce8..4a8f9d034d 100644 --- a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala @@ -42,12 +42,12 @@ class DefaultExecutorScheduler(daemon: Boolean) private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, - 50L, + 60000L, TimeUnit.MILLISECONDS, workQueue, threadFactory) val executor = threadPool - override val CHECK_FREQ = 50 + override val CHECK_FREQ = 10 } diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala index 4bd2d09bd0..e72acd09d8 100644 --- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala +++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala @@ -9,6 +9,8 @@ package scala.actors package scheduler +import scala.concurrent.ManagedBlocker + /** * @author Erik Engbrecht */ @@ -37,7 +39,7 @@ trait DelegatingScheduler extends IScheduler { def execute(task: Runnable) = impl.execute(task) - def executeFromActor(task: Runnable) = impl.executeFromActor(task) + override def executeFromActor(task: Runnable) = impl.executeFromActor(task) def shutdown(): Unit = synchronized { if (sched ne null) { diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala index 8e032242f6..07a014f92d 100644 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala @@ -11,7 +11,7 @@ package scala.actors package scheduler -import java.util.concurrent.{ExecutorService, RejectedExecutionException} +import scala.concurrent.ThreadPoolRunner /** * The ExecutorScheduler class uses an @@ -19,29 +19,9 @@ import java.util.concurrent.{ExecutorService, RejectedExecutionException} * * @author Philipp Haller */ -trait ExecutorScheduler extends IScheduler { +trait ExecutorScheduler extends IScheduler with ThreadPoolRunner[Unit] { - protected def executor: ExecutorService - - /** Submits a Runnable for execution. - * - * @param task the task to be executed - */ - def execute(task: Runnable) { - try { - executor execute task - } catch { - case ree: RejectedExecutionException => - // run task on current thread - task.run() - } - } - - def executeFromActor(task: Runnable) = - execute(task) - - /** This method is called when the SchedulerService - * shuts down. + /** This method is called when the scheduler shuts down. */ def onShutdown(): Unit = executor.shutdown() @@ -51,4 +31,5 @@ trait ExecutorScheduler extends IScheduler { */ def isActive = (executor ne null) && !executor.isShutdown + } diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala index 5d6d676b1a..82e8f5c2fd 100644 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -94,7 +94,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor pool.execute(task) } - def executeFromActor(task: Runnable) { + override def executeFromActor(task: Runnable) { val recAction = new RecursiveAction { def compute() = task.run() } @@ -110,8 +110,11 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor def run() { fun } }) - override def managedBlock(blocker: ManagedBlocker) { - ForkJoinPool.managedBlock(blocker, true) + override def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { + def block = blocker.block() + def isReleasable() = blocker.isReleasable + }, true) } /** Shuts down the scheduler. diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala index c7e588f2ed..4ad865a15d 100644 --- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala +++ b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala @@ -24,9 +24,6 @@ class SingleThreadedScheduler extends IScheduler { task.run() } - def executeFromActor(task: Runnable) = - execute(task) - def execute(fun: => Unit): Unit = execute(new Runnable { def run() { fun } @@ -39,4 +36,8 @@ class SingleThreadedScheduler extends IScheduler { def onTerminate(actor: Reactor)(f: => Unit) {} def isActive = true + + def managedBlock(blocker: scala.concurrent.ManagedBlocker) { + blocker.block() + } } diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala index 4826d44383..fd8a0e6a64 100644 --- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala +++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala @@ -12,6 +12,7 @@ package scala.actors package scheduler import java.util.concurrent.ThreadPoolExecutor +import scala.concurrent.ManagedBlocker /** * The ThreadPoolScheduler class uses an @@ -44,7 +45,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, override def managedBlock(blocker: ManagedBlocker) { val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) { executor.setCorePoolSize(coreSize + 1) } blocker.block() @@ -67,7 +68,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, throw new QuitException val coreSize = executor.getCorePoolSize() - if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) { executor.setCorePoolSize(coreSize + 1) } } -- cgit v1.2.3