summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-10-06 17:20:56 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-10-06 17:20:56 +0000
commit8828cd99841f678dd69f3aabea77f6f9ddfeea26 (patch)
tree484b0484faa41137dd93f0cfee9324917a92c11c /src/actors
parente3d9ce3e0923077911fbc26df0cac3a9137be966 (diff)
downloadscala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.tar.gz
scala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.tar.bz2
scala-8828cd99841f678dd69f3aabea77f6f9ddfeea26.zip
ForkJoinScheduler only adjusts pool size when a...
ForkJoinScheduler only adjusts pool size when an actor calls receive/receiveWithin.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala30
1 files changed, 8 insertions, 22 deletions
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index bbcbfa90f4..86fb14db50 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -1,7 +1,6 @@
package scala.actors
package scheduler
-import java.lang.Thread.State
import java.util.{Collection, ArrayList}
import scala.concurrent.forkjoin._
@@ -12,9 +11,9 @@ import scala.concurrent.forkjoin._
*/
class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Runnable with IScheduler with TerminationMonitor {
- private var pool = makeNewPool()
- private var terminating = false
- private var snapshoting = false
+ private var pool = makeNewPool() // guarded by this
+ private var terminating = false // guarded by this
+ private var snapshoting = false // guarded by this
// this has to be a java.util.Collection, since this is what
// the ForkJoinPool returns.
@@ -55,15 +54,6 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
}
}
- private def allWorkersBlocked: Boolean =
- (pool.workers != null) &&
- pool.workers.forall(t => {
- (t == null) || {
- val s = t.getState()
- s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
- }
- })
-
override def run() {
try {
while (true) {
@@ -78,18 +68,12 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
throw new QuitException
if (allTerminated) {
- //Debug.info(this+": all actors terminated")
+ Debug.info(this+": all actors terminated")
throw new QuitException
}
if (!snapshoting) {
gc()
-
- // check if we need more threads to avoid deadlock
- val poolSize = pool.getPoolSize()
- if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) {
- pool.setParallelism(poolSize + 1)
- }
} else if (pool.isQuiescent()) {
val list = new ArrayList[ForkJoinTask[_]]
val num = pool.drainTasksTo(list)
@@ -142,8 +126,9 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
terminating = true
}
- def isActive =
+ def isActive = synchronized {
(pool ne null) && !pool.isShutdown()
+ }
override def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
@@ -170,8 +155,9 @@ class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean
error("scheduler is still active")
else
snapshoting = false
+
+ pool = makeNewPool()
}
- pool = makeNewPool()
val iter = drainedTasks.iterator()
while (iter.hasNext()) {
pool.execute(iter.next())