diff options
Diffstat (limited to 'src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala')
-rw-r--r-- | src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala deleted file mode 100644 index 342579db6c..0000000000 --- a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.scheduler - -import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, - ThreadFactory} -import scala.actors.{Debug, IScheduler} -import scala.concurrent.ManagedBlocker - -/** - * This scheduler class uses a `ThreadPoolExecutor` to execute `Actor`s. - * - * The scheduler attempts to shut down itself and the underlying - * `ThreadPoolExecutor` only if `terminate` is set to true. Otherwise, - * the scheduler must be shut down explicitly. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class ResizableThreadPoolScheduler(protected val terminate: Boolean, - protected val daemon: Boolean) - extends Thread with IScheduler with TerminationMonitor { - - setDaemon(daemon) - - // guarded by this - private var terminating = false - // guarded by this - private var suspending = false - - // this has to be a java.util.Collection, since this is what - // the ForkJoinPool returns. - @volatile - private var drainedTasks: java.util.List[_] = null - - // guarded by this - private var coreSize = ThreadPoolConfig.corePoolSize - private val maxSize = ThreadPoolConfig.maxPoolSize - private val numCores = Runtime.getRuntime().availableProcessors() - - protected val CHECK_FREQ = 10 - - private class DaemonThreadFactory extends ThreadFactory { - def newThread(r: Runnable): Thread = { - val t = new Thread(r) - t.setDaemon(daemon) - t - } - } - private val threadFac = new DaemonThreadFactory - - private def makeNewPool(): ThreadPoolExecutor = { - val workQueue = new LinkedBlockingQueue - new ThreadPoolExecutor(coreSize, - maxSize, - 60000L, - TimeUnit.MILLISECONDS, - workQueue, - threadFac, - new ThreadPoolExecutor.CallerRunsPolicy) - } - - // guarded by this - private var executor = makeNewPool() - - Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) - - def this(d: Boolean) { - this(true, d) - } - - def this() { - this(false) - } - - private def numWorkersBlocked = { - executor.mainLock.lock() - val iter = executor.workers.iterator() - var numBlocked = 0 - while (iter.hasNext()) { - val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker] - if (w.tryLock()) { - // worker is idle - w.unlock() - } else { - val s = w.thread.getState() - if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING) - numBlocked += 1 - } - } - executor.mainLock.unlock() - numBlocked - } - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ.toLong) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitControl - - if (!suspending) { - gc() - - // check if we need more worker threads - val activeBlocked = numWorkersBlocked - if (coreSize - activeBlocked < numCores && coreSize < maxSize) { - coreSize = numCores + activeBlocked - executor.setCorePoolSize(coreSize) - } else if (terminate && allActorsTerminated) { - // if all worker threads idle terminate - if (executor.getActiveCount() == 0) { - Debug.info(this+": initiating shutdown...") - Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) - - terminating = true - throw new QuitControl - } - } - } else { - drainedTasks = executor.shutdownNow() - Debug.info(this+": drained "+drainedTasks.size()+" tasks") - terminating = true - throw new QuitControl - } - } // sync - } - } catch { - case _: QuitControl => - executor.shutdown() - // allow thread to exit - } - } - - def execute(task: Runnable): Unit = - executor execute task - - def execute(fun: => Unit): Unit = - executor.execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def isActive = synchronized { - !terminating && (executor ne null) && !executor.isShutdown() - } - - def managedBlock(blocker: ManagedBlocker) { - blocker.block() - } - - /** Suspends the scheduler. All threads that were in use by the - * scheduler and its internal thread pool are terminated. - */ - def snapshot() = synchronized { - suspending = true - } - - /** Resumes the execution of the scheduler if it was previously - * suspended using `snapshot`. - */ - def restart() { - synchronized { - if (!suspending) - sys.error("snapshot has not been invoked") - else if (isActive) - sys.error("scheduler is still active") - else - suspending = false - - executor = makeNewPool() - } - val iter = drainedTasks.iterator() - while (iter.hasNext()) { - executor.execute(iter.next().asInstanceOf[Runnable]) - } - start() - } - -} |