diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-10-06 17:20:56 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-10-06 17:20:56 +0000 |
commit | 8828cd99841f678dd69f3aabea77f6f9ddfeea26 (patch) | |
tree | 484b0484faa41137dd93f0cfee9324917a92c11c /src/actors | |
parent | e3d9ce3e0923077911fbc26df0cac3a9137be966 (diff) | |
download | scala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.tar.gz scala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.tar.bz2 scala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.zip |
ForkJoinScheduler only adjusts pool size when a...
ForkJoinScheduler only adjusts pool size when an actor calls
receive/receiveWithin.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/scheduler/ForkJoinScheduler.scala | 30 |
1 files changed, 8 insertions, 22 deletions
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala index bbcbfa90f4..86fb14db50 100644 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala @@ -1,7 +1,6 @@ package scala.actors package scheduler -import java.lang.Thread.State import java.util.{Collection, ArrayList} import scala.concurrent.forkjoin._ @@ -12,9 +11,9 @@ import scala.concurrent.forkjoin._ */ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Runnable with IScheduler with TerminationMonitor { - private var pool = makeNewPool() - private var terminating = false - private var snapshoting = false + private var pool = makeNewPool() // guarded by this + private var terminating = false // guarded by this + private var snapshoting = false // guarded by this // this has to be a java.util.Collection, since this is what // the ForkJoinPool returns. @@ -55,15 +54,6 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean } } - private def allWorkersBlocked: Boolean = - (pool.workers != null) && - pool.workers.forall(t => { - (t == null) || { - val s = t.getState() - s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING - } - }) - override def run() { try { while (true) { @@ -78,18 +68,12 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean throw new QuitException if (allTerminated) { - //Debug.info(this+": all actors terminated") + Debug.info(this+": all actors terminated") throw new QuitException } if (!snapshoting) { gc() - - // check if we need more threads to avoid deadlock - val poolSize = pool.getPoolSize() - if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) { - pool.setParallelism(poolSize + 1) - } } else if (pool.isQuiescent()) { val list = new ArrayList[ForkJoinTask[_]] val num = pool.drainTasksTo(list) @@ -142,8 +126,9 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean terminating = true } - def isActive = + def isActive = synchronized { (pool ne null) && !pool.isShutdown() + } override def managedBlock(blocker: scala.concurrent.ManagedBlocker) { ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { @@ -170,8 +155,9 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean error("scheduler is still active") else snapshoting = false + + pool = makeNewPool() } - pool = makeNewPool() val iter = drainedTasks.iterator() while (iter.hasNext()) { pool.execute(iter.next()) |