summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-09-24 18:28:17 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-09-24 18:28:17 +0000
commit92fc7b37b01bc871966a49524197c1cbbeec8988 (patch)
tree7fad3d6cff9bf825e7018d26407413988c9a04f4 /src/actors
parent6fea2488af12d942c1cf7ba781d2f60fe6ddc9c0 (diff)
downloadscala-92fc7b37b01bc871966a49524197c1cbbeec8988.tar.gz
scala-92fc7b37b01bc871966a49524197c1cbbeec8988.tar.bz2
scala-92fc7b37b01bc871966a49524197c1cbbeec8988.zip
Introduced actors package object to deprecate a...
Introduced actors package object to deprecate a number of classes. Made ForkJoinScheduler more configurable and let it read ThreadPoolConfig. Clean-ups in TerminationMonitor and ActorGC. Removed DefaultExecutorScheduler. Made DelegatingScheduler and ExecutorScheduler private. Deprecated MessageQueue and MessageQueueElement, so that we can later make them private. Deprecated a number of methods in IScheduler. Tightened access modifiers in Reactor.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala2
-rw-r--r--src/actors/scala/actors/ActorGC.scala56
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala12
-rw-r--r--src/actors/scala/actors/Future.scala11
-rw-r--r--src/actors/scala/actors/IScheduler.scala13
-rw-r--r--src/actors/scala/actors/MessageQueue.scala4
-rw-r--r--src/actors/scala/actors/Reaction.scala4
-rw-r--r--src/actors/scala/actors/Reactor.scala14
-rw-r--r--src/actors/scala/actors/SchedulerAdapter.scala2
-rw-r--r--src/actors/scala/actors/package.scala17
-rw-r--r--src/actors/scala/actors/scheduler/DaemonScheduler.scala2
-rw-r--r--src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala53
-rw-r--r--src/actors/scala/actors/scheduler/DelegatingScheduler.scala2
-rw-r--r--src/actors/scala/actors/scheduler/ExecutorScheduler.scala2
-rw-r--r--src/actors/scala/actors/scheduler/ForkJoinScheduler.scala45
-rw-r--r--src/actors/scala/actors/scheduler/SchedulerService.scala12
-rw-r--r--src/actors/scala/actors/scheduler/TerminationMonitor.scala29
-rw-r--r--src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala2
18 files changed, 128 insertions, 154 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 06b8af788f..82ebe5059f 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -638,7 +638,7 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
}
// guarded by lock of this
- protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ protected[this] override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
}
diff --git a/src/actors/scala/actors/ActorGC.scala b/src/actors/scala/actors/ActorGC.scala
index 1d2349d028..68d8bda8e2 100644
--- a/src/actors/scala/actors/ActorGC.scala
+++ b/src/actors/scala/actors/ActorGC.scala
@@ -12,7 +12,7 @@ package scala.actors
import java.lang.ref.{Reference, WeakReference, ReferenceQueue}
-import scala.collection.mutable.{HashMap, HashSet}
+import scala.collection.mutable.HashSet
import scala.actors.scheduler.TerminationMonitor
/**
@@ -43,16 +43,16 @@ trait ActorGC extends TerminationMonitor {
// registers a reference to the actor with the ReferenceQueue
val wr = new WeakReference[Reactor](a, refQ)
refSet += wr
- pendingReactions += 1
+ activeActors += 1
}
- /** Removes unreachable actors from refSet. */
+ /** Checks for actors that have become garbage. */
protected override def gc() = synchronized {
// check for unreachable actors
def drainRefQ() {
val wr = refQ.poll
if (wr != null) {
- pendingReactions -= 1
+ activeActors -= 1
refSet -= wr
// continue draining
drainRefQ()
@@ -61,50 +61,42 @@ trait ActorGC extends TerminationMonitor {
drainRefQ()
}
+ /** Prints some status information on currently managed actors. */
protected def status() {
println(this+": size of refSet: "+refSet.size)
}
- protected override def allTerminated: Boolean = synchronized {
- pendingReactions <= 0
+ /** Checks whether all actors have terminated. */
+ override def allTerminated: Boolean = synchronized {
+ activeActors <= 0
}
override def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
- termHandlers += (a -> (() => f))
+ terminationHandlers += (a -> (() => f))
}
- /* Called only from <code>Reaction</code>.
- */
- override def terminated(a: Reactor) = synchronized {
- // execute registered termination handler (if any)
- termHandlers.get(a) match {
- case Some(handler) =>
- handler()
- // remove mapping
- termHandlers -= a
- case None =>
- // do nothing
- }
-
- // find the weak reference that points to the terminated actor, if any
- refSet.find((ref: Reference[t] forSome { type t <: Reactor }) => ref.get() == a) match {
- case Some(r) =>
- // invoking clear will not cause r to be enqueued
- r.clear()
- refSet -= r.asInstanceOf[Reference[t] forSome { type t <: Reactor }]
- case None =>
- // do nothing
+ override def terminated(a: Reactor) = {
+ super.terminated(a)
+
+ synchronized {
+ // find the weak reference that points to the terminated actor, if any
+ refSet.find((ref: Reference[t] forSome { type t <: Reactor }) => ref.get() == a) match {
+ case Some(r) =>
+ // invoking clear will not cause r to be enqueued
+ r.clear()
+ refSet -= r.asInstanceOf[Reference[t] forSome { type t <: Reactor }]
+ case None =>
+ // do nothing
+ }
}
-
- pendingReactions -= 1
}
private[actors] def getPendingCount = synchronized {
- pendingReactions
+ activeActors
}
private[actors] def setPendingCount(cnt: Int) = synchronized {
- pendingReactions = cnt
+ activeActors = cnt
}
}
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 9a2832d85f..cf26b33916 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -48,20 +48,8 @@ class FJTaskScheduler2(val initCoreSize: Int, val maxSize: Int, daemon: Boolean)
private var submittedTasks = 0
- def printActorDump {}
-
private val CHECK_FREQ = 100
- def onLockup(handler: () => Unit) =
- lockupHandler = handler
-
- def onLockup(millis: Int)(handler: () => Unit) = {
- //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
- lockupHandler = handler
- }
-
- private var lockupHandler: () => Unit = null
-
private def allWorkersBlocked: Boolean =
executor.threads.forall(t => {
val s = t.getState()
diff --git a/src/actors/scala/actors/Future.scala b/src/actors/scala/actors/Future.scala
index 5ed758a65b..ec07f2a1a8 100644
--- a/src/actors/scala/actors/Future.scala
+++ b/src/actors/scala/actors/Future.scala
@@ -25,9 +25,9 @@ import scheduler.DefaultThreadPoolScheduler
* </p>
*
* @author Philipp Haller
- * @version 0.9.16
*/
abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T] with Function0[T] {
+ @deprecated("this member is going to be removed in a future release")
protected var value: Option[Any] = None
def isSet: Boolean
}
@@ -35,17 +35,14 @@ abstract class Future[+T](val inputChannel: InputChannel[T]) extends Responder[T
/**
* The <code>Futures</code> object contains methods that operate on Futures.
*
- * @version 0.9.8
* @author Philipp Haller
*/
object Futures {
- private lazy val sched = new DefaultThreadPoolScheduler(true)
+ private case object Eval
def future[T](body: => T): Future[T] = {
- case object Eval
- val a = new Actor {
- override def scheduler: IScheduler = sched
+ val a = new DaemonActor {
def act() {
Actor.react {
case Eval => Actor.reply(body)
@@ -139,7 +136,7 @@ object Futures {
results
}
- def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] =
+ private[actors] def fromInputChannel[T](inputChannel: InputChannel[T]): Future[T] =
new Future[T](inputChannel) {
def apply() =
if (isSet) value.get.asInstanceOf[T]
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index f4034fbfab..8afed3aa86 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -67,4 +67,17 @@ trait IScheduler {
def onTerminate(a: Reactor)(f: => Unit): Unit
def managedBlock(blocker: scala.concurrent.ManagedBlocker): Unit
+
+ @deprecated("this member is going to be removed in a future release")
+ def tick(a: Actor) {}
+
+ @deprecated("this member is going to be removed in a future release")
+ def onLockup(handler: () => Unit) {}
+
+ @deprecated("this member is going to be removed in a future release")
+ def onLockup(millis: Int)(handler: () => Unit) {}
+
+ @deprecated("this member is going to be removed in a future release")
+ def printActorDump {}
+
}
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 4435759264..540a992640 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -14,10 +14,10 @@ package scala.actors
* This class is used by our efficient message queue
* implementation.
*
- * @version 0.9.9
* @author Philipp Haller
*/
@serializable @SerialVersionUID(7124278808020037465L)
+@deprecated("this class is going to be removed in a future release")
class MessageQueueElement(val msg: Any, val session: OutputChannel[Any], var next: MessageQueueElement) {
def this() = this(null, null, null)
def this(msg: Any, session: OutputChannel[Any]) = this(msg, session, null)
@@ -29,10 +29,10 @@ class MessageQueueElement(val msg: Any, val session: OutputChannel[Any], var nex
* library. Classes in this package are supposed to be the only
* clients of this class.
*
- * @version 0.9.9
* @author Philipp Haller
*/
@serializable @SerialVersionUID(2168935872884095767L)
+@deprecated("this class is going to be removed in a future release")
class MessageQueue(protected val label: String) {
protected var first: MessageQueueElement = null
protected var last: MessageQueueElement = null // last eq null iff list is empty
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 1c9541546a..80db4b2c1e 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -23,10 +23,10 @@ private[actors] class KillActorException extends Throwable with ControlException
* <code>java.lang.Runnable</code></a>.
* </p>
*
- * @version 0.9.10
+ * @deprecated("this class is going to be removed in a future release")
* @author Philipp Haller
*/
-private[actors] class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, () => {
+class Reaction(a: Actor, f: PartialFunction[Any, Unit], msg: Any) extends ActorTask(a, () => {
if (f == null)
a.act()
else
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 9c8a7a223e..c07612f1eb 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -20,14 +20,14 @@ import scala.collection.mutable.Queue
trait Reactor extends OutputChannel[Any] {
/* The actor's mailbox. */
- protected val mailbox = new MessageQueue("Reactor")
+ protected[this] val mailbox = new MessageQueue("Reactor")
- protected var sendBuffer = new Queue[(Any, OutputChannel[Any])]
+ protected[this] var sendBuffer = new Queue[(Any, OutputChannel[Any])]
/* If the actor waits in a react, continuation holds the
* message handler that react was called with.
*/
- protected var continuation: PartialFunction[Any, Unit] = null
+ protected[this] var continuation: PartialFunction[Any, Unit] = null
/* Whenever this Actor executes on some thread, waitingFor is
* guaranteed to be equal to waitingForNone.
@@ -36,8 +36,8 @@ trait Reactor extends OutputChannel[Any] {
* waitingForNone, this Actor is guaranteed not to execute on some
* thread.
*/
- protected val waitingForNone = (m: Any) => false
- protected var waitingFor: Any => Boolean = waitingForNone
+ protected[this] val waitingForNone = (m: Any) => false
+ protected[this] var waitingFor: Any => Boolean = waitingForNone
/**
* The behavior of an actor is specified by implementing this
@@ -147,7 +147,7 @@ trait Reactor extends OutputChannel[Any] {
/* This method is guaranteed to be executed from inside
an actors act method.
*/
- protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ protected[this] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
scheduler executeFromActor (new LightReaction(this,
if (f eq null) continuation else f,
msg))
@@ -184,7 +184,7 @@ trait Reactor extends OutputChannel[Any] {
throw new KillActorException
}
- protected[actors] def exit(): Nothing = {
+ protected[this] def exit(): Nothing = {
terminated()
throw Actor.suspendException
}
diff --git a/src/actors/scala/actors/SchedulerAdapter.scala b/src/actors/scala/actors/SchedulerAdapter.scala
index ba7822c372..2c5ccd2dd8 100644
--- a/src/actors/scala/actors/SchedulerAdapter.scala
+++ b/src/actors/scala/actors/SchedulerAdapter.scala
@@ -15,7 +15,7 @@ package scala.actors
*
* Providing an implementation for the
* <code>execute(f: => Unit)</code> method is sufficient to
- * obtain a concrete class that extends <code>IScheduler</code>.
+ * obtain a concrete <code>IScheduler</code> implementation.
*
* @author Philipp Haller
*/
diff --git a/src/actors/scala/actors/package.scala b/src/actors/scala/actors/package.scala
new file mode 100644
index 0000000000..d74fb23086
--- /dev/null
+++ b/src/actors/scala/actors/package.scala
@@ -0,0 +1,17 @@
+
+package object actors {
+ @deprecated("use scala.actors.scheduler.ForkJoinScheduler instead")
+ type FJTaskScheduler2 = scala.actors.scheduler.ForkJoinScheduler
+
+ @deprecated("use scala.actors.scheduler.ForkJoinScheduler instead")
+ type TickedScheduler = scala.actors.scheduler.ForkJoinScheduler
+
+ @deprecated("use scala.actors.scheduler.ForkJoinScheduler instead")
+ type WorkerThreadScheduler = scala.actors.scheduler.ForkJoinScheduler
+
+ @deprecated("this class is going to be removed in a future release")
+ type WorkerThread = java.lang.Thread
+
+ @deprecated("this value is going to be removed in a future release")
+ val ActorGC = scala.actors.Scheduler.impl.asInstanceOf[scala.actors.scheduler.ThreadPoolScheduler]
+}
diff --git a/src/actors/scala/actors/scheduler/DaemonScheduler.scala b/src/actors/scala/actors/scheduler/DaemonScheduler.scala
index d748e8fa75..d88842535c 100644
--- a/src/actors/scala/actors/scheduler/DaemonScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DaemonScheduler.scala
@@ -16,6 +16,6 @@ package scheduler
*/
object DaemonScheduler extends DelegatingScheduler {
- def makeNewScheduler(): IScheduler = new DefaultExecutorScheduler(true)
+ def makeNewScheduler(): IScheduler = new DefaultThreadPoolScheduler(true)
}
diff --git a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala b/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
deleted file mode 100644
index 4a8f9d034d..0000000000
--- a/src/actors/scala/actors/scheduler/DefaultExecutorScheduler.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2009, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors
-package scheduler
-
-import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue, ThreadFactory}
-
-/**
- * The <code>DefaultExecutorScheduler</code> class uses a default
- * <code>ThreadPoolExecutor</code> for executing <code>Actor</code>s.
- *
- * It can be configured using the two JVM properties
- * <code>actors.corePoolSize</code> and
- * <code>actors.maxPoolSize</code> that control the initial and
- * maximum size of the thread pool, respectively.
- *
- * @author Philipp Haller
- */
-class DefaultExecutorScheduler(daemon: Boolean)
- extends SchedulerService(daemon) with ExecutorScheduler {
-
- def this() =
- this(false)
-
- private val workQueue = new LinkedBlockingQueue[Runnable]
-
- private val threadFactory = new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- val result = new Thread(r)
- result.setDaemon(daemon)
- result
- }
- }
-
- private val threadPool = new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize,
- ThreadPoolConfig.maxPoolSize,
- 60000L,
- TimeUnit.MILLISECONDS,
- workQueue,
- threadFactory)
-
- val executor = threadPool
-
- override val CHECK_FREQ = 10
-}
diff --git a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
index e72acd09d8..b25e3d26f5 100644
--- a/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/scheduler/DelegatingScheduler.scala
@@ -14,7 +14,7 @@ import scala.concurrent.ManagedBlocker
/**
* @author Erik Engbrecht
*/
-trait DelegatingScheduler extends IScheduler {
+private[actors] trait DelegatingScheduler extends IScheduler {
protected def makeNewScheduler(): IScheduler
protected var sched: IScheduler = null
diff --git a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
index b1c0da256c..52ec977b1f 100644
--- a/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ExecutorScheduler.scala
@@ -20,7 +20,7 @@ import scala.concurrent.ThreadPoolRunner
*
* @author Philipp Haller
*/
-trait ExecutorScheduler extends IScheduler with ThreadPoolRunner {
+private[scheduler] trait ExecutorScheduler extends IScheduler with ThreadPoolRunner {
def execute(task: Runnable) {
super[ThreadPoolRunner].execute(task.asInstanceOf[Task[Unit]])
diff --git a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
index b7f68be3b3..bbcbfa90f4 100644
--- a/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ForkJoinScheduler.scala
@@ -10,19 +10,32 @@ import scala.concurrent.forkjoin._
*
* @author Philipp Haller
*/
-class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor {
+class ForkJoinScheduler(val initCoreSize: Int, val maxSize: Int, daemon: Boolean) extends Runnable with IScheduler with TerminationMonitor {
private var pool = makeNewPool()
private var terminating = false
private var snapshoting = false
+
+ // this has to be a java.util.Collection, since this is what
+ // the ForkJoinPool returns.
private var drainedTasks: Collection[ForkJoinTask[_]] = null
- private val CHECK_FREQ = 10
+ protected val CHECK_FREQ = 10
+
+ def this(d: Boolean) {
+ this(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, d)
+ }
+
+ def this() {
+ this(false)
+ }
private def makeNewPool(): DrainableForkJoinPool = {
val p = new DrainableForkJoinPool()
// enable locally FIFO scheduling mode
p.setAsyncMode(true)
+ p.setParallelism(initCoreSize)
+ p.setMaximumPoolSize(maxSize)
Debug.info(this+": parallelism "+p.getParallelism())
Debug.info(this+": max pool size "+p.getMaximumPoolSize())
p
@@ -31,7 +44,15 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
/** Starts this scheduler.
*/
def start() {
- (new Thread(this)).start()
+ try {
+ val t = new Thread(this)
+ t.setDaemon(daemon)
+ t.setName("ForkJoinScheduler")
+ t.start()
+ } catch {
+ case e: Exception =>
+ Debug.info(this+": could not create scheduler thread: "+e)
+ }
}
private def allWorkersBlocked: Boolean =
@@ -62,6 +83,9 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
}
if (!snapshoting) {
+ gc()
+
+ // check if we need more threads to avoid deadlock
val poolSize = pool.getPoolSize()
if (allWorkersBlocked && (poolSize < ThreadPoolConfig.maxPoolSize)) {
pool.setParallelism(poolSize + 1)
@@ -90,6 +114,7 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
}
}
+ // TODO: when do we pass a task that is not a RecursiveAction?
def execute(task: Runnable) {
pool.execute(task)
}
@@ -111,13 +136,6 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
def run() { fun }
})
- override def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
- ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
- def block = blocker.block()
- def isReleasable() = blocker.isReleasable
- }, true)
- }
-
/** Shuts down the scheduler.
*/
def shutdown(): Unit = synchronized {
@@ -127,6 +145,13 @@ class ForkJoinScheduler extends Runnable with IScheduler with TerminationMonitor
def isActive =
(pool ne null) && !pool.isShutdown()
+ override def managedBlock(blocker: scala.concurrent.ManagedBlocker) {
+ ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker {
+ def block = blocker.block()
+ def isReleasable() = blocker.isReleasable
+ }, true)
+ }
+
/** Suspends the scheduler. All threads that were in use by the
* scheduler and its internal thread pool are terminated.
*/
diff --git a/src/actors/scala/actors/scheduler/SchedulerService.scala b/src/actors/scala/actors/scheduler/SchedulerService.scala
index 16937f52ea..37cfa5ca59 100644
--- a/src/actors/scala/actors/scheduler/SchedulerService.scala
+++ b/src/actors/scala/actors/scheduler/SchedulerService.scala
@@ -32,20 +32,8 @@ abstract class SchedulerService(daemon: Boolean) extends Thread with IScheduler
private var terminating = false
- def printActorDump {}
-
protected val CHECK_FREQ = 100
- def onLockup(handler: () => Unit) =
- lockupHandler = handler
-
- def onLockup(millis: Int)(handler: () => Unit) = {
- //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ
- lockupHandler = handler
- }
-
- private var lockupHandler: () => Unit = null
-
def onShutdown(): Unit
override def run() {
diff --git a/src/actors/scala/actors/scheduler/TerminationMonitor.scala b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
index d26d8e4e3d..875b79a8c1 100644
--- a/src/actors/scala/actors/scheduler/TerminationMonitor.scala
+++ b/src/actors/scala/actors/scheduler/TerminationMonitor.scala
@@ -15,13 +15,13 @@ import scala.collection.mutable.HashMap
trait TerminationMonitor {
- protected var pendingReactions = 0
- protected val termHandlers = new HashMap[Reactor, () => Unit]
+ protected var activeActors = 0
+ protected val terminationHandlers = new HashMap[Reactor, () => Unit]
private var started = false
/** newActor is invoked whenever a new actor is started. */
def newActor(a: Reactor) = synchronized {
- pendingReactions += 1
+ activeActors += 1
if (!started)
started = true
}
@@ -33,16 +33,20 @@ trait TerminationMonitor {
* @param f the closure to be registered
*/
def onTerminate(a: Reactor)(f: => Unit): Unit = synchronized {
- termHandlers += (a -> (() => f))
+ terminationHandlers += (a -> (() => f))
}
- def terminated(a: Reactor) = synchronized {
+ /** Registers that the specified actor has terminated.
+ *
+ * @param a the actor that has terminated
+ */
+ def terminated(a: Reactor) = {
// obtain termination handler (if any)
val todo = synchronized {
- termHandlers.get(a) match {
+ terminationHandlers.get(a) match {
case Some(handler) =>
- termHandlers -= a
- () => handler
+ terminationHandlers -= a
+ handler
case None =>
() => { /* do nothing */ }
}
@@ -52,13 +56,16 @@ trait TerminationMonitor {
todo()
synchronized {
- pendingReactions -= 1
+ activeActors -= 1
}
}
- protected def allTerminated: Boolean = synchronized {
- started && pendingReactions <= 0
+ /** Checks whether all actors have terminated. */
+ @deprecated("this method is going to be removed in a future release")
+ def allTerminated: Boolean = synchronized {
+ started && activeActors <= 0
}
+ /** Checks for actors that have become garbage. */
protected def gc() {}
}
diff --git a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
index 1a68dc669b..e429e89274 100644
--- a/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
+++ b/src/actors/scala/actors/scheduler/ThreadPoolScheduler.scala
@@ -47,7 +47,7 @@ class ThreadPoolScheduler(protected var executor: ThreadPoolExecutor,
this(null, true, d)
}
- protected def adjustCorePoolSize() {
+ private def adjustCorePoolSize() {
val coreSize = executor.getCorePoolSize()
if (coreSize < ThreadPoolConfig.maxPoolSize && (executor.getActiveCount() >= coreSize - 1)) {
executor.setCorePoolSize(coreSize + 1)