diff options
Diffstat (limited to 'src/actors/scala/actors/scheduler/ForkJoinScheduler.scala')
-rw-r--r-- | src/actors/scala/actors/scheduler/ForkJoinScheduler.scala | 174 |
1 files changed, 0 insertions, 174 deletions
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala deleted file mode 100644 index 75a98db6c8..0000000000 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ /dev/null @@ -1,174 +0,0 @@ -package scala.actors -package scheduler - -import java.util.{Collection, ArrayList} -import scala.concurrent.forkjoin._ - -/** The <code>ForkJoinScheduler</code> is backed by a lightweight - * fork-join task execution framework. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean, fair: Boolean) - extends Runnable with IScheduler with TerminationMonitor { - - private var pool = makeNewPool() // guarded by this - private var terminating = false // guarded by this - private var snapshoting = false // guarded by this - - // this has to be a java.util.Collection, since this is what - // the ForkJoinPool returns. - private var drainedTasks: Collection[ForkJoinTask[_]] = null - - protected val CHECK_FREQ = 10 - - // this random number generator is only used in fair mode - private lazy val random = new java.util.Random // guarded by random - - def this(d: Boolean, f: Boolean) { - this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, d, f) - } - - def this(d: Boolean) { - this(d, true) // default is fair - } - - def this() { - this(false) // default is non-daemon - } - - private def makeNewPool(): DrainableForkJoinPool = { - val p = new DrainableForkJoinPool(initCoreSize, maxSize) - Debug.info(this+": parallelism "+p.getParallelism()) - p - } - - /** Starts this scheduler. - */ - def start() { - try { - val t = new Thread(this) - t.setDaemon(daemon) - t.setName("ForkJoinScheduler") - t.start() - } catch { - case e: Exception => - Debug.info(this+": could not create scheduler thread: "+e) - } - } - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ.toLong) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitControl - - if (allActorsTerminated) { - Debug.info(this+": all actors terminated") - terminating = true - throw new QuitControl - } - - if (!snapshoting) { - gc() - } else if (pool.isQuiescent()) { - val list = new ArrayList[ForkJoinTask[_]] - val num = pool.drainTasksTo(list) - Debug.info(this+": drained "+num+" tasks") - drainedTasks = list - terminating = true - throw new QuitControl - } - } - } - } catch { - case _: QuitControl => - Debug.info(this+": initiating shutdown...") - while (!pool.isQuiescent()) { - try { - Thread.sleep(10) - } catch { - case ignore: InterruptedException => - } - } - pool.shutdown() - // allow thread to exit - } - } - - // TODO: when do we pass a task that is not a RecursiveAction? - def execute(task: Runnable) { - pool.execute(task) - } - - override def executeFromActor(task: Runnable) { - // in fair mode: 2% chance of submitting to global task queue - if (fair && random.synchronized { random.nextInt(50) == 1 }) - pool.execute(task) - else - task.asInstanceOf[RecursiveAction].fork() - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def isActive = synchronized { - !terminating && (pool ne null) && !pool.isShutdown() - } - - override def managedBlock(blocker: scala.concurrent.ManagedBlocker) { - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - def block = blocker.block() - def isReleasable() = blocker.isReleasable - }) - } - - /** Suspends the scheduler. All threads that were in use by the - * scheduler and its internal thread pool are terminated. - */ - def snapshot() = synchronized { - snapshoting = true - } - - /** Resumes the execution of the scheduler if it was previously - * suspended using <code>ForkJoinScheduler.snapshot</code>. - */ - def restart() { - synchronized { - if (!snapshoting) - sys.error("snapshot has not been invoked") - else if (isActive) - sys.error("scheduler is still active") - else - snapshoting = false - - pool = makeNewPool() - } - val iter = drainedTasks.iterator() - while (iter.hasNext()) { - pool.execute(iter.next()) - } - start() - } - -} |