diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-07-11 21:07:42 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-07-11 21:07:42 +0000 |
commit | fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa (patch) | |
tree | 2da2ef33c673a5ea2bd02d142ea6aa3c5e852669 /src | |
parent | a25a8c309a6ec7dfa23d039d0f2ed1110eb12652 (diff) | |
download | scala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.tar.gz scala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.tar.bz2 scala-fdd7b82c5a6fe2fb2c9ac1520d28f0dffc7580fa.zip |
Added ThreadPoolScheduler supporting managedBlock.
Diffstat (limited to 'src')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/DelegatingScheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/ForkJoinScheduler.scala | 9 | ||||
-rw-r--r-- | src/actors/scala/actors/IScheduler.scala | 4 | ||||
-rw-r--r-- | src/actors/scala/actors/ManagedBlocker.scala | 20 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/ThreadPoolScheduler.scala | 73 |
7 files changed, 107 insertions, 15 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 64e6e31eab..015ec917b6 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -14,8 +14,6 @@ import scala.compat.Platform import java.util.{Timer, TimerTask} import java.util.concurrent.ExecutionException -import forkjoin.ForkJoinPool - /** * The <code>Actor</code> object provides functions for the definition of * actors, as well as actor operations, such as @@ -644,7 +642,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { scheduler execute task } - class ActorBlocker(timeout: Long) extends ForkJoinPool.ManagedBlocker { + class ActorBlocker(timeout: Long) extends ManagedBlocker { def block() = { if (timeout > 0) Actor.this.suspendActorFor(timeout) diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala index 78c4020bb0..50fad31606 100644 --- a/src/actors/scala/actors/DelegatingScheduler.scala +++ b/src/actors/scala/actors/DelegatingScheduler.scala @@ -8,8 +8,6 @@ package scala.actors -import forkjoin.ForkJoinPool - /** * @author Erik Engbrecht */ @@ -51,6 +49,6 @@ trait DelegatingScheduler extends IScheduler { def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f) - override def managedBlock(blocker: ForkJoinPool.ManagedBlocker): Unit = + override def managedBlock(blocker: ManagedBlocker): Unit = impl.managedBlock(blocker) } diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala index 59cc14fbb4..74595acde6 100644 --- a/src/actors/scala/actors/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/ForkJoinScheduler.scala @@ -16,7 +16,7 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor { private val CHECK_FREQ = 50 - override def managedBlock(blocker: ForkJoinPool.ManagedBlocker) { + override def managedBlock(blocker: ManagedBlocker) { ForkJoinPool.managedBlock(blocker, true) } @@ -41,6 +41,13 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor { } catch { case _: QuitException => Debug.info(this+": initiating shutdown...") + while (!pool.isQuiescent()) { + try { + Thread.sleep(10) + } catch { + case ignore: InterruptedException => + } + } pool.shutdown() // allow thread to exit } diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala index 811fd47ce3..3c87c64b1f 100644 --- a/src/actors/scala/actors/IScheduler.scala +++ b/src/actors/scala/actors/IScheduler.scala @@ -10,8 +10,6 @@ package scala.actors -import forkjoin.ForkJoinPool - /** * The <code>IScheduler</code> trait provides a common interface * for all schedulers used to execute actor tasks. @@ -66,7 +64,7 @@ trait IScheduler { */ def onTerminate(a: Reactor)(f: => Unit): Unit - def managedBlock(blocker: ForkJoinPool.ManagedBlocker) { + def managedBlock(blocker: ManagedBlocker) { blocker.block() } } diff --git a/src/actors/scala/actors/ManagedBlocker.scala b/src/actors/scala/actors/ManagedBlocker.scala new file mode 100644 index 0000000000..f3fd08301b --- /dev/null +++ b/src/actors/scala/actors/ManagedBlocker.scala @@ -0,0 +1,20 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import forkjoin.ForkJoinPool + +/** + * The <code>ManagedBlocker</code> trait... + * + * @author Philipp Haller + */ +trait ManagedBlocker extends ForkJoinPool.ManagedBlocker diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index bd083691e0..d81a9d1bc5 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -25,16 +25,14 @@ object Scheduler extends DelegatingScheduler { Debug.info("initializing "+this+"...") def makeNewScheduler: IScheduler = { -/* val workQueue = new LinkedBlockingQueue[Runnable](100000) val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, 50L, TimeUnit.MILLISECONDS, workQueue) - val s = new SimpleExecutorScheduler(threadPool, true) -*/ - val s = new ForkJoinScheduler + val s = new ThreadPoolScheduler(threadPool, true) + //val s = new ForkJoinScheduler Debug.info(this+": starting new "+s+" ["+s.getClass+"]") s.start() s @@ -45,7 +43,7 @@ object Scheduler extends DelegatingScheduler { /* Assumes <code>sched</code> holds an instance * of <code>FJTaskScheduler2</code>. */ - def snapshot(): Unit = synchronized { + @deprecated def snapshot(): Unit = synchronized { if (sched.isInstanceOf[FJTaskScheduler2]) { val fjts = sched.asInstanceOf[FJTaskScheduler2] tasks = fjts.snapshot() diff --git a/src/actors/scala/actors/ThreadPoolScheduler.scala b/src/actors/scala/actors/ThreadPoolScheduler.scala new file mode 100644 index 0000000000..c76d2c4f5c --- /dev/null +++ b/src/actors/scala/actors/ThreadPoolScheduler.scala @@ -0,0 +1,73 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ + +package scala.actors + +import scala.collection.mutable.HashMap +import java.util.concurrent.{ThreadPoolExecutor, RejectedExecutionException} + +/** + * The <code>ThreadPoolScheduler</code> class uses an + * <code>ExecutorService</code> to execute <code>Actor</code>s. It + * does not start an additional thread. + * + * A <code>ThreadPoolScheduler</code> attempts to shut down + * the underlying <code>ExecutorService</code> only if + * <code>terminate</code> is set to true. + * + * Otherwise, the <code>ExecutorService</code> must be shut down either + * directly or by shutting down the + * <code>ThreadPoolScheduler</code> instance. + * + * @author Philipp Haller + */ +class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor, + protected var terminate: Boolean) extends TerminationService(terminate) { + + /* This constructor (and the var above) is currently only used to work + * around a bug in scaladoc, which cannot deal with early initializers + * (to be used in subclasses such as DefaultExecutorScheduler) properly. + */ + def this() { + this(null, true) + } + + /** Submits a <code>Runnable</code> for execution. + * + * @param task the task to be executed + */ + def execute(task: Runnable) { + try { + executor execute task + } catch { + case ree: RejectedExecutionException => + // run task on current thread + task.run() + } + } + + def onShutdown() { + executor.shutdown() + } + + /** The scheduler is active if the underlying <code>ExecutorService</code> + * has not been shut down. + */ + def isActive = + (executor ne null) && !executor.isShutdown() + + override def managedBlock(blocker: ManagedBlocker) { + val coreSize = executor.getCorePoolSize() + if ((executor.getActiveCount() >= coreSize - 1) && coreSize < ThreadPoolConfig.maxPoolSize) { + executor.setCorePoolSize(coreSize + 1) + } + blocker.block() + } +} |