diff options
Diffstat (limited to 'src/actors/scala/actors/scheduler')
12 files changed, 0 insertions, 961 deletions
diff --git a/src/actors/scala/actors/scheduler/ActorGC.scala b/src/actors/scala/actors/scheduler/ActorGC.scala deleted file mode 100644 index a27799d132..0000000000 --- a/src/actors/scala/actors/scheduler/ActorGC.scala +++ /dev/null @@ -1,101 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package scheduler - -import java.lang.ref.{Reference, WeakReference, ReferenceQueue} -import scala.collection.mutable - -/** - * ActorGC keeps track of the number of live actors being managed by a - * a scheduler so that it can shutdown when all of the actors it manages have - * either been explicitly terminated or garbage collected. - * - * When an actor is started, it is registered with the ActorGC via the - * `newActor` method, and when an actor is knowingly terminated - * (e.g. act method finishes, exit explicitly called, an exception is thrown), - * the ActorGC is informed via the `terminated` method. - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -trait ActorGC extends TerminationMonitor { - self: IScheduler => - - /** Actors are added to refQ in newActor. */ - private val refQ = new ReferenceQueue[TrackedReactor] - - /** - * This is a set of references to all the actors registered with - * this ActorGC. It is maintained so that the WeakReferences will - * not be GC'd before the actors to which they point. - */ - private val refSet = new mutable.HashSet[Reference[t] forSome { type t <: TrackedReactor }] - - /** newActor is invoked whenever a new actor is started. */ - override def newActor(a: TrackedReactor) = synchronized { - // registers a reference to the actor with the ReferenceQueue - val wr = new WeakReference[TrackedReactor](a, refQ) - refSet += wr - activeActors += 1 - } - - /** Checks for actors that have become garbage. */ - protected override def gc() = synchronized { - // check for unreachable actors - def drainRefQ() { - val wr = refQ.poll - if (wr != null) { - activeActors -= 1 - refSet -= wr - // continue draining - drainRefQ() - } - } - drainRefQ() - } - - /** Prints some status information on currently managed actors. */ - protected def status() { - println(this+": size of refSet: "+refSet.size) - } - - /** Checks whether all actors have terminated. */ - override private[actors] def allActorsTerminated: Boolean = synchronized { - activeActors <= 0 - } - - override def onTerminate(a: TrackedReactor)(f: => Unit): Unit = synchronized { - terminationHandlers += (a -> (() => f)) - } - - override def terminated(a: TrackedReactor) = { - super.terminated(a) - - synchronized { - // find the weak reference that points to the terminated actor, if any - refSet.find((ref: Reference[t] forSome { type t <: TrackedReactor }) => ref.get() == a) match { - case Some(r) => - // invoking clear will not cause r to be enqueued - r.clear() - refSet -= r.asInstanceOf[Reference[t] forSome { type t <: TrackedReactor }] - case None => - // do nothing - } - } - } - - private[actors] def getPendingCount = synchronized { - activeActors - } - - private[actors] def setPendingCount(cnt: Int) = synchronized { - activeActors = cnt - } - -} diff --git a/src/actors/scala/actors/scheduler/DaemonScheduler.scala b/src/actors/scala/actors/scheduler/DaemonScheduler.scala deleted file mode 100644 index b21a1aa3e6..0000000000 --- a/src/actors/scala/actors/scheduler/DaemonScheduler.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors -package scheduler - -/** - * Default scheduler for actors with daemon semantics, such as those backing futures. - * - * @author Erik Engbrecht - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -object DaemonScheduler extends DelegatingScheduler { - - protected def makeNewScheduler(): IScheduler = { - val sched = if (!ThreadPoolConfig.useForkJoin) { - val s = new ResizableThreadPoolScheduler(true) - s.start() - s - } else { - val s = new ForkJoinScheduler(true) - s.start() - s - } - Debug.info(this+": starting new "+sched+" ["+sched.getClass+"]") - sched - } - -} diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala deleted file mode 100644 index b8a81d11a9..0000000000 --- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors -package scheduler - -import scala.concurrent.ManagedBlocker - -/** - * @author Erik Engbrecht - */ -private[actors] trait DelegatingScheduler extends IScheduler { - protected def makeNewScheduler(): IScheduler - - protected var sched: IScheduler = null - - final def impl = synchronized { - if ((sched eq null) || (!sched.isActive)) - sched = makeNewScheduler() - sched - } - - final def impl_= (scheduler: IScheduler): Unit = synchronized { - //TODO: if there is already a scheduler, should it be shutdown? - sched = scheduler - } - - /** - * Always active because it will just make a new scheduler if required - */ - def isActive: Boolean = true - - def execute(fun: => Unit) = impl.execute(fun) - - def execute(task: Runnable) = impl.execute(task) - - override def executeFromActor(task: Runnable) = impl.executeFromActor(task) - - def shutdown(): Unit = synchronized { - if (sched ne null) { - sched.shutdown() - sched = null - } - } - - def newActor(actor: TrackedReactor) = synchronized { - val createNew = if (sched eq null) - true - else sched.synchronized { - if (!sched.isActive) - true - else { - sched.newActor(actor) - false - } - } - if (createNew) { - sched = makeNewScheduler() - sched.newActor(actor) - } - } - - def terminated(actor: TrackedReactor) = impl.terminated(actor) - - def onTerminate(actor: TrackedReactor)(f: => Unit) = impl.onTerminate(actor)(f) - - override def managedBlock(blocker: ManagedBlocker): Unit = - impl.managedBlock(blocker) -} diff --git a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala b/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala deleted file mode 100644 index 37710ec037..0000000000 --- a/src/actors/scala/actors/scheduler/DrainableForkJoinPool.scala +++ /dev/null @@ -1,11 +0,0 @@ -package scala.actors -package scheduler - -import java.util.Collection -import scala.concurrent.forkjoin.{ForkJoinPool, ForkJoinTask} - -private class DrainableForkJoinPool(parallelism: Int, maxPoolSize: Int) extends ForkJoinPool(parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true) { - - override def drainTasksTo(c: Collection[ _ >: ForkJoinTask[_]]): Int = - super.drainTasksTo(c) -} diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala deleted file mode 100644 index 4d3ebc3c04..0000000000 --- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala +++ /dev/null @@ -1,95 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package scheduler - -import java.util.concurrent.{Callable, ExecutorService} -import scala.concurrent.ThreadPoolRunner - -/** - * The <code>ExecutorScheduler</code> object is used to create - * <code>ExecutorScheduler</code> instances. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -object ExecutorScheduler { - - private def start(sched: ExecutorScheduler): ExecutorScheduler = { - sched.start() - sched - } - - /** Creates an <code>ExecutorScheduler</code> using the provided - * <code>ExecutorService</code>. - * - * @param exec the executor to use - * @return the scheduler - */ - def apply(exec: ExecutorService): ExecutorScheduler = - start(new ExecutorScheduler { - val executor: ExecutorService = exec - }) - - /** Creates an <code>ExecutorScheduler</code> using the provided - * <code>ExecutorService</code>. - * - * @param exec the executor to use - * @param term whether the scheduler should automatically terminate - * @return the scheduler - */ - def apply(exec: ExecutorService, term: Boolean): ExecutorScheduler = - start(new ExecutorScheduler { - val executor: ExecutorService = exec - override val terminate = term - }) - -} - -/** - * The <code>ExecutorScheduler</code> class uses an - * <code>ExecutorService</code> to execute <code>Actor</code>s. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -trait ExecutorScheduler extends Thread - with IScheduler with TerminationService - with ThreadPoolRunner { - - def execute(task: Runnable) { - super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]]) - } - - private class RunCallable(fun: => Unit) extends Callable[Unit] with Runnable { - def call() { fun } - def run() { fun } - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - override def execute(fun: => Unit) { - super[ThreadPoolRunner].execute((new RunCallable(fun)).asInstanceOf[Task[Unit]]) - } - - /** This method is called when the scheduler shuts down. - */ - def onShutdown(): Unit = - executor.shutdown() - - /** The scheduler is active if the underlying <code>ExecutorService</code> - * has not been shut down. - */ - def isActive = - (executor ne null) && !executor.isShutdown - -} diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala deleted file mode 100644 index 75a98db6c8..0000000000 --- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala +++ /dev/null @@ -1,174 +0,0 @@ -package scala.actors -package scheduler - -import java.util.{Collection, ArrayList} -import scala.concurrent.forkjoin._ - -/** The <code>ForkJoinScheduler</code> is backed by a lightweight - * fork-join task execution framework. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean, fair: Boolean) - extends Runnable with IScheduler with TerminationMonitor { - - private var pool = makeNewPool() // guarded by this - private var terminating = false // guarded by this - private var snapshoting = false // guarded by this - - // this has to be a java.util.Collection, since this is what - // the ForkJoinPool returns. - private var drainedTasks: Collection[ForkJoinTask[_]] = null - - protected val CHECK_FREQ = 10 - - // this random number generator is only used in fair mode - private lazy val random = new java.util.Random // guarded by random - - def this(d: Boolean, f: Boolean) { - this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, d, f) - } - - def this(d: Boolean) { - this(d, true) // default is fair - } - - def this() { - this(false) // default is non-daemon - } - - private def makeNewPool(): DrainableForkJoinPool = { - val p = new DrainableForkJoinPool(initCoreSize, maxSize) - Debug.info(this+": parallelism "+p.getParallelism()) - p - } - - /** Starts this scheduler. - */ - def start() { - try { - val t = new Thread(this) - t.setDaemon(daemon) - t.setName("ForkJoinScheduler") - t.start() - } catch { - case e: Exception => - Debug.info(this+": could not create scheduler thread: "+e) - } - } - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ.toLong) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitControl - - if (allActorsTerminated) { - Debug.info(this+": all actors terminated") - terminating = true - throw new QuitControl - } - - if (!snapshoting) { - gc() - } else if (pool.isQuiescent()) { - val list = new ArrayList[ForkJoinTask[_]] - val num = pool.drainTasksTo(list) - Debug.info(this+": drained "+num+" tasks") - drainedTasks = list - terminating = true - throw new QuitControl - } - } - } - } catch { - case _: QuitControl => - Debug.info(this+": initiating shutdown...") - while (!pool.isQuiescent()) { - try { - Thread.sleep(10) - } catch { - case ignore: InterruptedException => - } - } - pool.shutdown() - // allow thread to exit - } - } - - // TODO: when do we pass a task that is not a RecursiveAction? - def execute(task: Runnable) { - pool.execute(task) - } - - override def executeFromActor(task: Runnable) { - // in fair mode: 2% chance of submitting to global task queue - if (fair && random.synchronized { random.nextInt(50) == 1 }) - pool.execute(task) - else - task.asInstanceOf[RecursiveAction].fork() - } - - /** Submits a closure for execution. - * - * @param fun the closure to be executed - */ - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def isActive = synchronized { - !terminating && (pool ne null) && !pool.isShutdown() - } - - override def managedBlock(blocker: scala.concurrent.ManagedBlocker) { - ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker { - def block = blocker.block() - def isReleasable() = blocker.isReleasable - }) - } - - /** Suspends the scheduler. All threads that were in use by the - * scheduler and its internal thread pool are terminated. - */ - def snapshot() = synchronized { - snapshoting = true - } - - /** Resumes the execution of the scheduler if it was previously - * suspended using <code>ForkJoinScheduler.snapshot</code>. - */ - def restart() { - synchronized { - if (!snapshoting) - sys.error("snapshot has not been invoked") - else if (isActive) - sys.error("scheduler is still active") - else - snapshoting = false - - pool = makeNewPool() - } - val iter = drainedTasks.iterator() - while (iter.hasNext()) { - pool.execute(iter.next()) - } - start() - } - -} diff --git a/src/actors/scala/actors/scheduler/QuitControl.scala b/src/actors/scala/actors/scheduler/QuitControl.scala deleted file mode 100644 index b3e288aaff..0000000000 --- a/src/actors/scala/actors/scheduler/QuitControl.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.scheduler - -import scala.util.control.ControlThrowable - -/** - * The `QuitControl` class is used to manage control flow of certain - * schedulers. - * - * @author Philipp Haller - */ -private[scheduler] class QuitControl extends ControlThrowable diff --git a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala deleted file mode 100644 index 342579db6c..0000000000 --- a/src/actors/scala/actors/scheduler/ResizableThreadPoolScheduler.scala +++ /dev/null @@ -1,197 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors.scheduler - -import scala.actors.threadpool.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, - ThreadFactory} -import scala.actors.{Debug, IScheduler} -import scala.concurrent.ManagedBlocker - -/** - * This scheduler class uses a `ThreadPoolExecutor` to execute `Actor`s. - * - * The scheduler attempts to shut down itself and the underlying - * `ThreadPoolExecutor` only if `terminate` is set to true. Otherwise, - * the scheduler must be shut down explicitly. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class ResizableThreadPoolScheduler(protected val terminate: Boolean, - protected val daemon: Boolean) - extends Thread with IScheduler with TerminationMonitor { - - setDaemon(daemon) - - // guarded by this - private var terminating = false - // guarded by this - private var suspending = false - - // this has to be a java.util.Collection, since this is what - // the ForkJoinPool returns. - @volatile - private var drainedTasks: java.util.List[_] = null - - // guarded by this - private var coreSize = ThreadPoolConfig.corePoolSize - private val maxSize = ThreadPoolConfig.maxPoolSize - private val numCores = Runtime.getRuntime().availableProcessors() - - protected val CHECK_FREQ = 10 - - private class DaemonThreadFactory extends ThreadFactory { - def newThread(r: Runnable): Thread = { - val t = new Thread(r) - t.setDaemon(daemon) - t - } - } - private val threadFac = new DaemonThreadFactory - - private def makeNewPool(): ThreadPoolExecutor = { - val workQueue = new LinkedBlockingQueue - new ThreadPoolExecutor(coreSize, - maxSize, - 60000L, - TimeUnit.MILLISECONDS, - workQueue, - threadFac, - new ThreadPoolExecutor.CallerRunsPolicy) - } - - // guarded by this - private var executor = makeNewPool() - - Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) - - def this(d: Boolean) { - this(true, d) - } - - def this() { - this(false) - } - - private def numWorkersBlocked = { - executor.mainLock.lock() - val iter = executor.workers.iterator() - var numBlocked = 0 - while (iter.hasNext()) { - val w = iter.next().asInstanceOf[ThreadPoolExecutor#Worker] - if (w.tryLock()) { - // worker is idle - w.unlock() - } else { - val s = w.thread.getState() - if (s == Thread.State.WAITING || s == Thread.State.TIMED_WAITING) - numBlocked += 1 - } - } - executor.mainLock.unlock() - numBlocked - } - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ.toLong) - } catch { - case _: InterruptedException => - } - - if (terminating) - throw new QuitControl - - if (!suspending) { - gc() - - // check if we need more worker threads - val activeBlocked = numWorkersBlocked - if (coreSize - activeBlocked < numCores && coreSize < maxSize) { - coreSize = numCores + activeBlocked - executor.setCorePoolSize(coreSize) - } else if (terminate && allActorsTerminated) { - // if all worker threads idle terminate - if (executor.getActiveCount() == 0) { - Debug.info(this+": initiating shutdown...") - Debug.info(this+": corePoolSize = "+coreSize+", maxPoolSize = "+maxSize) - - terminating = true - throw new QuitControl - } - } - } else { - drainedTasks = executor.shutdownNow() - Debug.info(this+": drained "+drainedTasks.size()+" tasks") - terminating = true - throw new QuitControl - } - } // sync - } - } catch { - case _: QuitControl => - executor.shutdown() - // allow thread to exit - } - } - - def execute(task: Runnable): Unit = - executor execute task - - def execute(fun: => Unit): Unit = - executor.execute(new Runnable { - def run() { fun } - }) - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - - def isActive = synchronized { - !terminating && (executor ne null) && !executor.isShutdown() - } - - def managedBlock(blocker: ManagedBlocker) { - blocker.block() - } - - /** Suspends the scheduler. All threads that were in use by the - * scheduler and its internal thread pool are terminated. - */ - def snapshot() = synchronized { - suspending = true - } - - /** Resumes the execution of the scheduler if it was previously - * suspended using `snapshot`. - */ - def restart() { - synchronized { - if (!suspending) - sys.error("snapshot has not been invoked") - else if (isActive) - sys.error("scheduler is still active") - else - suspending = false - - executor = makeNewPool() - } - val iter = drainedTasks.iterator() - while (iter.hasNext()) { - executor.execute(iter.next().asInstanceOf[Runnable]) - } - start() - } - -} diff --git a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala b/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala deleted file mode 100644 index 03b235fe74..0000000000 --- a/src/actors/scala/actors/scheduler/SingleThreadedScheduler.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package scheduler - -import scala.collection.mutable - -/** - * This scheduler executes actor tasks on the current thread. - * - * @author Philipp Haller - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -class SingleThreadedScheduler extends IScheduler { - - private val tasks = new mutable.Queue[Runnable] - - /** The maximum number of nested tasks that are run - * without unwinding the call stack. - */ - protected val maxNesting = 10 - - private var curNest = 0 - private var isShutdown = false - - def execute(task: Runnable) { - if (curNest < maxNesting) { - curNest += 1 - task.run() - } else { - curNest = 0 - tasks += task - } - } - - def execute(fun: => Unit): Unit = - execute(new Runnable { - def run() { fun } - }) - - def shutdown() { - isShutdown = false - while (!tasks.isEmpty) { - val task = tasks.dequeue() - task.run() - } - isShutdown = true - } - - def newActor(actor: TrackedReactor) {} - def terminated(actor: TrackedReactor) {} - - // TODO: run termination handlers at end of shutdown. - def onTerminate(actor: TrackedReactor)(f: => Unit) {} - - def isActive = - !isShutdown - - def managedBlock(blocker: scala.concurrent.ManagedBlocker) { - blocker.block() - } -} diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala deleted file mode 100644 index 9f26ca8d69..0000000000 --- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala +++ /dev/null @@ -1,69 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors -package scheduler - -import scala.collection.mutable - -private[scheduler] trait TerminationMonitor { - _: IScheduler => - - protected var activeActors = 0 - protected val terminationHandlers = new mutable.HashMap[TrackedReactor, () => Unit] - private var started = false - - /** newActor is invoked whenever a new actor is started. */ - def newActor(a: TrackedReactor) = synchronized { - activeActors += 1 - if (!started) - started = true - } - - /** Registers a closure to be executed when the specified - * actor terminates. - * - * @param a the actor - * @param f the closure to be registered - */ - def onTerminate(a: TrackedReactor)(f: => Unit): Unit = synchronized { - terminationHandlers += (a -> (() => f)) - } - - /** Registers that the specified actor has terminated. - * - * @param a the actor that has terminated - */ - def terminated(a: TrackedReactor) = { - // obtain termination handler (if any) - val todo = synchronized { - terminationHandlers.get(a) match { - case Some(handler) => - terminationHandlers -= a - handler - case None => - () => { /* do nothing */ } - } - } - - // invoke termination handler (if any) - todo() - - synchronized { - activeActors -= 1 - } - } - - /** Checks whether all actors have terminated. */ - private[actors] def allActorsTerminated: Boolean = synchronized { - started && activeActors <= 0 - } - - /** Checks for actors that have become garbage. */ - protected def gc() {} -} diff --git a/src/actors/scala/actors/scheduler/TerminationService.scala b/src/actors/scala/actors/scheduler/TerminationService.scala deleted file mode 100644 index ed1805ee1e..0000000000 --- a/src/actors/scala/actors/scheduler/TerminationService.scala +++ /dev/null @@ -1,68 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package scheduler - -import java.lang.{Thread, InterruptedException} - -/** - * The <code>TerminationService</code> class starts a new thread - * that is used to check regularly if the scheduler can be - * shut down, because all started actors are known to - * have terminated. - * - * @author Philipp Haller - */ -private[scheduler] trait TerminationService extends TerminationMonitor { - _: Thread with IScheduler => - - private var terminating = false - - /** Indicates whether the scheduler should terminate when all - * actors have terminated. - */ - protected val terminate = true - - protected val CHECK_FREQ = 50 - - def onShutdown(): Unit - - override def run() { - try { - while (true) { - this.synchronized { - try { - wait(CHECK_FREQ.toLong) - } catch { - case _: InterruptedException => - } - - if (terminating || (terminate && allActorsTerminated)) - throw new QuitControl - - gc() - } - } - } catch { - case _: QuitControl => - Debug.info(this+": initiating shutdown...") - // invoke shutdown hook - onShutdown() - // allow thread to exit - } - } - - /** Shuts down the scheduler. - */ - def shutdown(): Unit = synchronized { - terminating = true - } - -} diff --git a/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala b/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala deleted file mode 100644 index bfd4e7ac40..0000000000 --- a/src/actors/scala/actors/scheduler/ThreadPoolConfig.scala +++ /dev/null @@ -1,50 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors -package scheduler - -import scala.util.Properties.{ javaVersion, javaVmVendor, isJavaAtLeast, propIsSetTo, propOrNone } - -/** - * @author Erik Engbrecht - * @author Philipp Haller - */ -private[actors] object ThreadPoolConfig { - private val rt = Runtime.getRuntime() - private val minNumThreads = 4 - - private def getIntegerProp(propName: String): Option[Int] = - try propOrNone(propName) map (_.toInt) - catch { case _: SecurityException | _: NumberFormatException => None } - - val corePoolSize = getIntegerProp("actors.corePoolSize") match { - case Some(i) if i > 0 => i - case _ => { - val byCores = rt.availableProcessors() * 2 - if (byCores > minNumThreads) byCores else minNumThreads - } - } - - val maxPoolSize = { - val preMaxSize = getIntegerProp("actors.maxPoolSize") getOrElse 256 - if (preMaxSize >= corePoolSize) preMaxSize else corePoolSize - } - - private[actors] def useForkJoin: Boolean = - try !propIsSetTo("actors.enableForkJoin", "false") && - (propIsSetTo("actors.enableForkJoin", "true") || { - Debug.info(this+": java.version = "+javaVersion) - Debug.info(this+": java.vm.vendor = "+javaVmVendor) - isJavaAtLeast("1.6") - }) - catch { - case _: SecurityException => false - } -} |