diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-10-20 16:21:29 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-10-20 16:21:29 +0000 |
commit | 5a817fdbf7652f5ab18c011eea405363507261fe (patch) | |
tree | bdb440a85f00d668f855c9b413baf6f03a3bf90e /src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala | |
parent | feb088b2bc8a3e78d1f8ac32b540b78a5b9c01d6 (diff) | |
download | scala-5a817fdbf7652f5ab18c011eea405363507261fe.tar.gz scala-5a817fdbf7652f5ab18c011eea405363507261fe.tar.bz2 scala-5a817fdbf7652f5ab18c011eea405363507261fe.zip |
Do not use ForkJoinPool when running on IBM J9;...
Do not use ForkJoinPool when running on IBM J9; in this case use 1.5
ThreadPoolScheduler.
Diffstat (limited to 'src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala')
-rw-r--r-- | src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala | 189 |
1 files changed, 189 insertions, 0 deletions
diff --git a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala new file mode 100644 index 0000000000..3433e51fdc --- /dev/null +++ b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala @@ -0,0 +1,189 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: ThreadPoolScheduler.scala 18948 2009-10-06 17:30:27Z phaller $ + +package scala.actors.scheduler + +import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} +import scala.actors.{Debug, IScheduler} +import scala.concurrent.ManagedBlocker + +/** + * This scheduler class uses a <code>ThreadPoolExecutor</code> + * to execute <code>Actor</code>s. + * + * The scheduler attempts to shut down itself and the underlying + * <code>ThreadPoolExecutor</code> only if <code>terminate</code> + * is set to true. Otherwise, the scheduler must be shut down + * explicitly. + * + * @author Philipp Haller + */ +class ResizableThreadPoolScheduler(protected val terminate: Boolean, + protected val daemon: Boolean) + extends Thread with IScheduler with TerminationMonitor { + + setDaemon(daemon) + + // guarded by this + private var terminating = false + // guarded by this + private var suspending = false + + // this has to be a java.util.Collection, since this is what + // the ForkJoinPool returns. + @volatile + private var drainedTasks: java.util.List[_] = null + + // guarded by this + private var coreSize = ThreadPoolConfig.corePoolSize + private val maxSize = ThreadPoolConfig.maxPoolSize + private val numCores = Runtime.getRuntime().availableProcessors() + + protected val CHECK_FREQ = 10 + + private def makeNewPool(): ThreadPoolExecutor = { + val workQueue = new LinkedBlockingQueue + new ThreadPoolExecutor(coreSize, + maxSize, + 60000L, + TimeUnit.MILLISECONDS, + workQueue, + new ThreadPoolExecutor.CallerRunsPolicy) + } + + // guarded by this + private var executor = makeNewPool() + + Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) + + def this(d: Boolean) { + this(true, d) + } + + def this() { + this(false) + } + + private def numWorkersBlocked = { + executor.mainLock.lock() + val iter = executor.workers.iterator() + var numBlocked = 0 + while (iter.hasNext()) { + val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker] + if (w.tryLock()) { + // worker is idle + w.unlock() + } else { + val s = w.thread.getState() + if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING) + numBlocked += 1 + } + } + executor.mainLock.unlock() + numBlocked + } + + override def run() { + try { + while (true) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + } + + if (terminating) + throw new QuitException + + if (!suspending) { + gc() + + // check if we need more worker threads + val activeBlocked = numWorkersBlocked + if (coreSize - activeBlocked < numCores && coreSize < maxSize) { + coreSize = numCores + activeBlocked + executor.setCorePoolSize(coreSize) + } else if (terminate && allTerminated) { + // if all worker threads idle terminate + if (executor.getActiveCount() == 0) { + Debug.info(this+": initiating shutdown...") + Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) + + terminating = true + throw new QuitException + } + } + } else { + drainedTasks = executor.shutdownNow() + Debug.info(this+": drained "+drainedTasks.size()+" tasks") + terminating = true + throw new QuitException + } + } // sync + } + } catch { + case _: QuitException => + executor.shutdown() + // allow thread to exit + } + } + + def execute(task: Runnable): Unit = + executor execute task + + def execute(fun: => Unit): Unit = + executor.execute(new Runnable { + def run() { fun } + }) + + /** Shuts down the scheduler. + */ + def shutdown(): Unit = synchronized { + terminating = true + } + + def isActive = synchronized { + !terminating && (executor ne null) && !executor.isShutdown() + } + + def managedBlock(blocker: ManagedBlocker) { + blocker.block() + } + + /** Suspends the scheduler. All threads that were in use by the + * scheduler and its internal thread pool are terminated. + */ + def snapshot() = synchronized { + suspending = true + } + + /** Resumes the execution of the scheduler if it was previously + * suspended using <code>snapshot</code>. + */ + def restart() { + synchronized { + if (!suspending) + error("snapshot has not been invoked") + else if (isActive) + error("scheduler is still active") + else + suspending = false + + executor = makeNewPool() + } + val iter = drainedTasks.iterator() + while (iter.hasNext()) { + executor.execute(iter.next().asInstanceOf[Runnable]) + } + start() + } + +} |