summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala')
-rw-r--r--src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala197
1 files changed, 0 insertions, 197 deletions
diff --git a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
deleted file mode 100644
index 342579db6c..0000000000
--- a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala
+++ /dev/null
@@ -1,197 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-package scala.actors.scheduler
-
-import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue,
- ThreadFactory}
-import scala.actors.{Debug, IScheduler}
-import scala.concurrent.ManagedBlocker
-
-/**
- * This scheduler class uses a `ThreadPoolExecutor` to execute `Actor`s.
- *
- * The scheduler attempts to shut down itself and the underlying
- * `ThreadPoolExecutor` only if `terminate` is set to true. Otherwise,
- * the scheduler must be shut down explicitly.
- *
- * @author Philipp Haller
- */
-@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0")
-class ResizableThreadPoolScheduler(protected val terminate: Boolean,
- protected val daemon: Boolean)
- extends Thread with IScheduler with TerminationMonitor {
-
- setDaemon(daemon)
-
- // guarded by this
- private var terminating = false
- // guarded by this
- private var suspending = false
-
- // this has to be a java.util.Collection, since this is what
- // the ForkJoinPool returns.
- @volatile
- private var drainedTasks: java.util.List[_] = null
-
- // guarded by this
- private var coreSize = ThreadPoolConfig.corePoolSize
- private val maxSize = ThreadPoolConfig.maxPoolSize
- private val numCores = Runtime.getRuntime().availableProcessors()
-
- protected val CHECK_FREQ = 10
-
- private class DaemonThreadFactory extends ThreadFactory {
- def newThread(r: Runnable): Thread = {
- val t = new Thread(r)
- t.setDaemon(daemon)
- t
- }
- }
- private val threadFac = new DaemonThreadFactory
-
- private def makeNewPool(): ThreadPoolExecutor = {
- val workQueue = new LinkedBlockingQueue
- new ThreadPoolExecutor(coreSize,
- maxSize,
- 60000L,
- TimeUnit.MILLISECONDS,
- workQueue,
- threadFac,
- new ThreadPoolExecutor.CallerRunsPolicy)
- }
-
- // guarded by this
- private var executor = makeNewPool()
-
- Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
-
- def this(d: Boolean) {
- this(true, d)
- }
-
- def this() {
- this(false)
- }
-
- private def numWorkersBlocked = {
- executor.mainLock.lock()
- val iter = executor.workers.iterator()
- var numBlocked = 0
- while (iter.hasNext()) {
- val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker]
- if (w.tryLock()) {
- // worker is idle
- w.unlock()
- } else {
- val s = w.thread.getState()
- if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING)
- numBlocked += 1
- }
- }
- executor.mainLock.unlock()
- numBlocked
- }
-
- override def run() {
- try {
- while (true) {
- this.synchronized {
- try {
- wait(CHECK_FREQ.toLong)
- } catch {
- case _: InterruptedException =>
- }
-
- if (terminating)
- throw new QuitControl
-
- if (!suspending) {
- gc()
-
- // check if we need more worker threads
- val activeBlocked = numWorkersBlocked
- if (coreSize - activeBlocked < numCores && coreSize < maxSize) {
- coreSize = numCores + activeBlocked
- executor.setCorePoolSize(coreSize)
- } else if (terminate && allActorsTerminated) {
- // if all worker threads idle terminate
- if (executor.getActiveCount() == 0) {
- Debug.info(this+": initiating shutdown...")
- Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize)
-
- terminating = true
- throw new QuitControl
- }
- }
- } else {
- drainedTasks = executor.shutdownNow()
- Debug.info(this+": drained "+drainedTasks.size()+" tasks")
- terminating = true
- throw new QuitControl
- }
- } // sync
- }
- } catch {
- case _: QuitControl =>
- executor.shutdown()
- // allow thread to exit
- }
- }
-
- def execute(task: Runnable): Unit =
- executor execute task
-
- def execute(fun: => Unit): Unit =
- executor.execute(new Runnable {
- def run() { fun }
- })
-
- /** Shuts down the scheduler.
- */
- def shutdown(): Unit = synchronized {
- terminating = true
- }
-
- def isActive = synchronized {
- !terminating && (executor ne null) && !executor.isShutdown()
- }
-
- def managedBlock(blocker: ManagedBlocker) {
- blocker.block()
- }
-
- /** Suspends the scheduler. All threads that were in use by the
- * scheduler and its internal thread pool are terminated.
- */
- def snapshot() = synchronized {
- suspending = true
- }
-
- /** Resumes the execution of the scheduler if it was previously
- * suspended using `snapshot`.
- */
- def restart() {
- synchronized {
- if (!suspending)
- sys.error("snapshot has not been invoked")
- else if (isActive)
- sys.error("scheduler is still active")
- else
- suspending = false
-
- executor = makeNewPool()
- }
- val iter = drainedTasks.iterator()
- while (iter.hasNext()) {
- executor.execute(iter.next().asInstanceOf[Runnable])
- }
- start()
- }
-
-}