summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-10-20 16:21:29 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-10-20 16:21:29 +0000
commit5a817fdbf7652f5ab18c011eea405363507261fe (patch)
treebdb440a85f00d668f855c9b413baf6f03a3bf90e /src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
parentfeb088b2bc8a3e78d1f8ac32b540b78a5b9c01d6 (diff)
downloadscala-5a817fdbf7652f5ab18c011eea405363507261fe.tar.gz
scala-5a817fdbf7652f5ab18c011eea405363507261fe.tar.bz2
scala-5a817fdbf7652f5ab18c011eea405363507261fe.zip
Do not use ForkJoinPool when running on IBM J9;...
Do not use ForkJoinPool when running on IBM J9; in this case use 1.5 ThreadPoolScheduler.
Diffstat (limited to 'src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala')
-rw-r--r--src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala189
1 files changed, 189 insertions, 0 deletions
diff --git a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
new file mode 100644
index 0000000000..3433e51fdc
--- /dev/null
+++ b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
@@ -0,0 +1,189 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id: ThreadPoolScheduler.scala 18948 2009-10-06 17:30:27Z phaller $
+
+package scala.actors.scheduler
+
+import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue}
+import scala.actors.{Debug, IScheduler}
+import scala.concurrent.ManagedBlocker
+
+/**
+ * This scheduler class uses a <code>ThreadPoolExecutor</code>
+ * to execute <code>Actor</code>s.
+ *
+ * The scheduler attempts to shut down itself and the underlying
+ * <code>ThreadPoolExecutor</code> only if <code>terminate</code>
+ * is set to true. Otherwise, the scheduler must be shut down
+ * explicitly.
+ *
+ * @author Philipp Haller
+ */
+class ResizableThreadPoolScheduler(protected val terminate: Boolean,
+ protected val daemon: Boolean)
+ extends Thread with IScheduler with TerminationMonitor {
+
+ setDaemon(daemon)
+
+ // guarded by this
+ private var terminating = false
+ // guarded by this
+ private var suspending = false
+
+ // this has to be a java.util.Collection, since this is what
+ // the ForkJoinPool returns.
+ @volatile
+ private var drainedTasks: java.util.List[_] = null
+
+ // guarded by this
+ private var coreSize = ThreadPoolConfig.corePoolSize
+ private val maxSize = ThreadPoolConfig.maxPoolSize
+ private val numCores = Runtime.getRuntime().availableProcessors()
+
+ protected val CHECK_FREQ = 10
+
+ private def makeNewPool(): ThreadPoolExecutor = {
+ val workQueue = new LinkedBlockingQueue
+ new ThreadPoolExecutor(coreSize,
+ maxSize,
+ 60000L,
+ TimeUnit.MILLISECONDS,
+ workQueue,
+ new ThreadPoolExecutor.CallerRunsPolicy)
+ }
+
+ // guarded by this
+ private var executor = makeNewPool()
+
+ Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
+
+ def this(d: Boolean) {
+ this(true, d)
+ }
+
+ def this() {
+ this(false)
+ }
+
+ private def numWorkersBlocked = {
+ executor.mainLock.lock()
+ val iter = executor.workers.iterator()
+ var numBlocked = 0
+ while (iter.hasNext()) {
+ val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker]
+ if (w.tryLock()) {
+ // worker is idle
+ w.unlock()
+ } else {
+ val s = w.thread.getState()
+ if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING)
+ numBlocked += 1
+ }
+ }
+ executor.mainLock.unlock()
+ numBlocked
+ }
+
+ override def run() {
+ try {
+ while (true) {
+ this.synchronized {
+ try {
+ wait(CHECK_FREQ)
+ } catch {
+ case _: InterruptedException =>
+ }
+
+ if (terminating)
+ throw new QuitException
+
+ if (!suspending) {
+ gc()
+
+ // check if we need more worker threads
+ val activeBlocked = numWorkersBlocked
+ if (coreSize - activeBlocked < numCores && coreSize < maxSize) {
+ coreSize = numCores + activeBlocked
+ executor.setCorePoolSize(coreSize)
+ } else if (terminate && allTerminated) {
+ // if all worker threads idle terminate
+ if (executor.getActiveCount() == 0) {
+ Debug.info(this+": initiating shutdown...")
+ Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
+
+ terminating = true
+ throw new QuitException
+ }
+ }
+ } else {
+ drainedTasks = executor.shutdownNow()
+ Debug.info(this+": drained "+drainedTasks.size()+" tasks")
+ terminating = true
+ throw new QuitException
+ }
+ } // sync
+ }
+ } catch {
+ case _: QuitException =>
+ executor.shutdown()
+ // allow thread to exit
+ }
+ }
+
+ def execute(task: Runnable): Unit =
+ executor execute task
+
+ def execute(fun: => Unit): Unit =
+ executor.execute(new Runnable {
+ def run() { fun }
+ })
+
+ /** Shuts down the scheduler.
+ */
+ def shutdown(): Unit = synchronized {
+ terminating = true
+ }
+
+ def isActive = synchronized {
+ !terminating && (executor ne null) && !executor.isShutdown()
+ }
+
+ def managedBlock(blocker: ManagedBlocker) {
+ blocker.block()
+ }
+
+ /** Suspends the scheduler. All threads that were in use by the
+ * scheduler and its internal thread pool are terminated.
+ */
+ def snapshot() = synchronized {
+ suspending = true
+ }
+
+ /** Resumes the execution of the scheduler if it was previously
+ * suspended using <code>snapshot</code>.
+ */
+ def restart() {
+ synchronized {
+ if (!suspending)
+ error("snapshot has not been invoked")
+ else if (isActive)
+ error("scheduler is still active")
+ else
+ suspending = false
+
+ executor = makeNewPool()
+ }
+ val iter = drainedTasks.iterator()
+ while (iter.hasNext()) {
+ executor.execute(iter.next().asInstanceOf[Runnable])
+ }
+ start()
+ }
+
+}