summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-10-03 12:50:09 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-10-03 12:50:09 +0000
commit6ce056f31e1fea835b33203dd864f1ee4edf587c (patch)
tree4b452ed52572e54b3f830917e4f24874fc1721e1
parent8d3d085f4bc64bfe019e49675e64073e5a73f60a (diff)
downloadscala-6ce056f31e1fea835b33203dd864f1ee4edf587c.tar.gz
scala-6ce056f31e1fea835b33203dd864f1ee4edf587c.tar.bz2
scala-6ce056f31e1fea835b33203dd864f1ee4edf587c.zip
Added scaladoc comments to most of the public t...
Added scaladoc comments to most of the public types and methods.
-rw-r--r--src/actors/scala/actors/Actor.scala272
-rw-r--r--src/actors/scala/actors/Channel.scala38
-rw-r--r--src/actors/scala/actors/Reactor.scala63
-rw-r--r--src/actors/scala/actors/Scheduler.scala177
-rw-r--r--src/actors/scala/actors/ThreadedActor.scala6
5 files changed, 385 insertions, 171 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index f6dc77d672..157f2631b8 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -2,10 +2,23 @@ package scala.actors
import scala.collection.mutable.HashSet
+/**
+ This object provides functions for the definition of actors and
+ reactors, as well as all actor operations, such as
+ <code>receive</code>, <code>react</code>, <code>reply</code>,
+ etc.
+
+ @author Philipp Haller
+ */
object Actor {
private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)
+ /**
+ Returns the currently executing actor. Should be used instead
+ of <code>this</code> in all blocks of code executed by
+ actors.
+ */
def self: Actor = synchronized {
val t = Thread.currentThread()
if (t.isInstanceOf[ActorThread])
@@ -20,6 +33,10 @@ object Actor {
}
}
+ /**
+ Creates an instance of a thread-based actor executing <code>body</code>,
+ and starts it.
+ */
def actor(body: => Unit): ActorThread = synchronized {
val actor = new ActorThread {
def act() = body
@@ -28,6 +45,11 @@ object Actor {
actor
}
+ /**
+ Creates an instance of a thread-based actor specifying a
+ channel which can be used for typed communication with other
+ actors.
+ */
def actor[a](ch: Channel[a])(body: => Unit): ActorThread = synchronized {
val actor = new ActorThread {
def act() = body
@@ -37,6 +59,10 @@ object Actor {
actor
}
+ /**
+ Creates an instance of an event-based reactor executing
+ <code>body</code>, and starts it.
+ */
def reactor(body: => Unit): Reactor = synchronized {
val reactor = new Reactor {
def act() = body
@@ -45,18 +71,53 @@ object Actor {
reactor
}
+ /**
+ Receives a message from the mailbox of
+ <code>self</code>. Blocks if no message matching any of the
+ cases of <code>f</code> can be received.
+
+ Only (thread-based) actors may call this method. It fails at
+ runtime if executed by a reactor.
+ */
def receive[a](f: PartialFunction[Any, a]): a =
self.in.receive(f)
+ /**
+ Receives a message from the mailbox of
+ <code>self</code>. Blocks at most <code>msec</code>
+ milliseconds if no message matching any of the cases of
+ <code>f</code> can be received. If no message could be
+ received the <code>TIMEOUT</code> action is executed if
+ specified.
+
+ Only (thread-based) actors may call this method. It fails at
+ runtime if executed by a reactor.
+ */
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R =
self.in.receiveWithin(msec)(f)
+ /**
+ <code>receive</code> for event-based reactors.
+
+ Actions in <code>f</code> have to contain the rest of the
+ computation of <code>self</code>, as this method will never
+ return.
+ */
def react(f: PartialFunction[Any, Unit]): Nothing =
self.in.react(f)
+ /**
+ <code>receiveWithin</code> for event-based reactors.
+
+ Actions in <code>f</code> have to contain the rest of the
+ computation of <code>self</code>, as this method will never
+ return.
+ */
+
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing =
self.in.reactWithin(msec)(f)
+ /*
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
self.in.react(new RecursiveProxyHandler(self, f))
@@ -69,7 +130,12 @@ object Actor {
self.in.react(this)
}
}
+ */
+ /**
+ Used for receiving a message from a specific actor.
+ Example: <code>from (a) receive { //... }</code>
+ */
def from(r: Actor): FromReceive =
new FromReceive(r)
@@ -78,14 +144,23 @@ object Actor {
self.in.receiveFrom(r)(f)
}
+ /**
+ Returns the actor which sent the last received message.
+ */
def sender: Actor = self.sender
+ /**
+ Send <code>msg</code> to the actor waiting in a call to
+ <code>!?</code>.
+ */
def reply(msg: Any): Unit = sender.reply ! msg
+ /**
+ Send <code>()</code> to the actor waiting in a call to
+ <code>!?</code>.
+ */
def reply(): Unit = reply(())
- def forward(msg: Any): Unit = self.in.forward(msg)
-
private[actors] trait Body[T] {
def orElse(other: => T): T
def andThen(other: => T): T
@@ -96,7 +171,7 @@ object Actor {
def andThen(other: => Unit): Unit = seq(body, other)
}
- def choose(alt1: => Unit, alt2: => Unit): Unit = {
+ private[actors] def choose(alt1: => Unit, alt2: => Unit): Unit = {
val s = self
// save former custom suspendActor function
// (e.g. from further orElse)
@@ -117,24 +192,63 @@ object Actor {
}
}
- def loop(b: => Unit): Unit = {
+ /**
+ Causes <code>self</code> to repeatedly execute
+ <code>body</code>.
+ */
+ def loop(body: => Unit): Unit = {
val s = self
- s.kill = () => { b; s.kill() }
- b
+ s.kill = () => { body; s.kill() }
+ body
}
- def seq(b1: => Unit, b2: => Unit): Unit = {
+ /**
+ Causes <code>self</code> to execute <code>first</code>
+ followed by <code>next</code>.
+ */
+ def seq(first: => Unit, next: => Unit): Unit = {
val s = self
- s.kill = () => { b2 }
- b1
+ s.kill = () => { next }
+ first
}
+ /**
+ Links <code>self</code> to actor <code>to</code>.
+ */
def link(to: Actor): Actor = self.link(to)
+
+ /**
+ Links <code>self</code> to actor defined by <code>body</code>.
+ */
def link(body: => Unit): Actor = self.link(body)
+
+ /**
+ Unlinks <code>self</code> from actor <code>from</code>.
+ */
def unlink(from: Actor): Unit = self.unlink(from)
+
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
+ */
def exit(reason: String): Unit = self.exit(reason)
}
+/**
+ This trait defines commonalities between thread-based and
+ event-based actors.
+
+ @author Philipp Haller
+ */
trait Actor {
private[actors] val in = new Channel[Any]
@@ -154,12 +268,23 @@ trait Actor {
rc.receiver = this
}
- /*
- Specification of behavior
+ /**
+ The behavior of an actor is specified by implementing this
+ abstract method. Note that the preferred way to create actors
+ is through the <code>actor</code> and <code>reactor</code>
+ methods defined in object <code>Actor</code>.
*/
def act(): Unit
+ /**
+ Sends <code>msg</code> to this actor (asynchronous).
+ */
def !(msg: Any): Unit = in ! msg
+
+ /**
+ Sends <code>msg</code> to this actor and awaits reply
+ (synchronous).
+ */
def !?(msg: Any): Any = in !? msg
private[actors] def sender: Actor
@@ -180,12 +305,18 @@ trait Actor {
private val links = new HashSet[Actor]
+ /**
+ Links <code>self</code> to actor <code>to</code>.
+ */
def link(to: Actor): Actor = {
links += to
to.linkTo(this)
to
}
+ /**
+ Links <code>self</code> to actor defined by <code>body</code>.
+ */
def link(body: => Unit): Actor = {
val actor = new ActorThread {
def act() = body
@@ -198,6 +329,9 @@ trait Actor {
private[actors] def linkTo(to: Actor): Unit =
links += to
+ /**
+ Unlinks <code>self</code> from actor <code>from</code>.
+ */
def unlink(from: Actor): Unit = {
links -= from
from.unlinkFrom(this)
@@ -210,6 +344,19 @@ trait Actor {
private[actors] var exitReason: String = ""
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
+ */
def exit(reason: String): Unit
private[actors] def exit(from: Actor, reason: String): Unit = {
@@ -249,9 +396,27 @@ trait Actor {
}
}
+/**
+ Messages of this type are sent to each actor <code>a</code>
+ that is linked to an actor <code>b</code> whenever
+ <code>b</code> terminates and <code>a</code> has
+ <code>trapExit</code> set to <code>true</code>.
+
+ @author Philipp Haller
+ */
case class Exit(from: Actor, reason: String)
+/**
+ This class provides an implementation for actors based on
+ threads. To be able to create instances of this class, the
+ inherited abstract method <code>act()</code> has to be
+ implemented. Note that the preferred way of creating
+ thread-based actors is through the <code>actor</code> method
+ defined in object <code>Actor</code>.
+
+ @author Philipp Haller
+ */
abstract class ActorThread extends Thread with ThreadedActor {
override def run(): Unit = {
try {
@@ -271,14 +436,46 @@ abstract class ActorThread extends Thread with ThreadedActor {
}
}
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
+ */
def exit(reason: String): Unit = {
exitReason = reason
interrupt()
}
}
-class ActorProxy(t: Thread) extends ThreadedActor {
+/**
+ This class provides a dynamic actor proxy for normal Java
+ threads.
+
+ @author Philipp Haller
+ */
+private[actors] class ActorProxy(t: Thread) extends ThreadedActor {
def act(): Unit = {}
+ /**
+ Terminates execution of <code>self</code> with the following
+ effect on linked actors:
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>true</code>, send message
+ <code>Exit(self, reason)</code> to <code>a</code>.
+
+ For each linked actor <code>a</code> with
+ <code>trapExit</code> set to <code>false</code> (default),
+ call <code>a.exit(reason)</code> if
+ <code>!reason.equals("normal")</code>.
+ */
def exit(reason: String): Unit = {
exitReason = reason
t.interrupt()
@@ -286,18 +483,53 @@ class ActorProxy(t: Thread) extends ThreadedActor {
}
+/**
+ This object provides methods for creating, registering, and
+ selecting remotely accessible actors.
+
+ A remote actor is typically created like this:
+ <pre>
+ actor {
+ alive(9010)
+ register('myName, self)
+
+ // behavior
+ }
+ </pre>
+
+ It can be accessed by an actor running on a (possibly)
+ different node by selecting it in the following way:
+ <pre>
+ actor {
+ // ...
+ val c = select(TcpNode("127.0.0.1", 9010), 'myName)
+ c ! msg
+ // ...
+ }
+ </pre>
+
+ @author Philipp Haller
+ */
object RemoteActor {
import remote.NetKernel
import remote.TcpService
private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel]
+ /**
+ Makes <code>self</code> remotely accessible on TCP port
+ <code>port</code>.
+ */
def alive(port: int): Unit = {
val serv = new TcpService(port)
serv.start()
kernels += Actor.self -> serv.kernel
}
+ /**
+ Registers <code>a</code> under <code>name</code> on this
+ node.
+ */
def register(name: Symbol, a: Actor): Unit = {
val kernel = kernels.get(Actor.self) match {
case None => {
@@ -311,6 +543,10 @@ object RemoteActor {
kernel.register(name, a)
}
+ /**
+ Returns (a proxy for) the actor registered under
+ <code>name</code> on <code>node</code>.
+ */
def select(node: Node, name: Symbol): Actor =
new Reactor {
def act(): Unit = {}
@@ -342,11 +578,23 @@ case class TcpNode(address: String, port: Int) extends Node
case class JxtaNode(group: String) extends Node
+/**
+ This class is used by our efficient message queue
+ implementation.
+ */
private[actors] abstract class MessageQueueResult[Msg] {
def msg: Msg
def sender: Actor
}
+/**
+ The class <code>MessageQueue</code> provides an efficient
+ implementation of a message queue specialized for this actor
+ library. Classes in this package are supposed to be the only
+ clients of this class.
+
+ @author Martin Odersky, Philipp Haller
+ */
private[actors] class MessageQueue[Msg] extends MessageQueueResult[Msg] {
var msg: Msg = _
var sender: Actor = _
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 659e6d2f5d..a7e723f724 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -14,6 +14,13 @@ class SuspendActorException extends Throwable {
}
}
+/**
+ This class provides a means for typed communication among
+ actors. Only the actor creating an instance of a
+ <code>Channel</code> may receive from it.
+
+ @author Philipp Haller
+ */
class Channel[Msg] {
private[actors] var receiver: Actor = synchronized {
@@ -57,8 +64,15 @@ class Channel[Msg] {
}
}
+ /**
+ Sends <code>msg</code> to this <code>Channel</code>.
+ */
def !(msg: Msg): unit = send(msg, self)
+ /**
+ Sends <code>msg</code> to this <code>Channel</code> and
+ awaits reply.
+ */
def !?(msg: Msg): Any = {
self.freshReply()
this ! msg
@@ -67,8 +81,15 @@ class Channel[Msg] {
}
}
+ /**
+ Forwards <code>msg</code> to <code>this</code> keeping the
+ last sender as sender instead of <code>self</code>.
+ */
def forward(msg: Msg): unit = send(msg, receiver.sender)
+ /**
+ Receives a message from this <code>Channel</code>.
+ */
def receive[R](f: PartialFunction[Msg, R]): R = {
assert(self == receiver, "receive from channel belonging to other actor")
assert(receiver.isThreaded, "receive invoked from reactor")
@@ -115,6 +136,12 @@ class Channel[Msg] {
result
}
+ /**
+ Receives a message from this <code>Channel</code>. If no
+ message could be received before <code>msec</code>
+ milliseconds elapsed, the <code>TIMEOUT</code> action is
+ executed if specified.
+ */
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
assert(self == receiver, "receive from channel belonging to other actor")
assert(receiver.isThreaded, "receive invoked from reactor")
@@ -144,6 +171,9 @@ class Channel[Msg] {
result
}
+ /**
+ <code>receive</code> for reactors.
+ */
def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(self == receiver, "react on channel belonging to other actor")
receiver.synchronized {
@@ -154,11 +184,6 @@ class Channel[Msg] {
receiver.pushSender(q.sender)
waitingFor = waitingForNone
receiver.scheduleActor(f, received)
-
- // would like:
- // receiver.continuation = f
- // receiver.message = received
- // receiver.resume()
}
else synchronized {
receiver.detachActor(f)
@@ -167,6 +192,9 @@ class Channel[Msg] {
}
}
+ /**
+ <code>receiveWithin</code> for reactors.
+ */
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(self == receiver, "react on channel belonging to other actor")
receiver.synchronized {
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 939bd4a166..4b6c63180e 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -1,17 +1,27 @@
package scala.actors
+/**
+
+ This class provides (together with <code>Channel</code>) an
+ implementation of event-based actors (aka reactors).
+
+ The main ideas of our approach are explained in the paper<br>
+ <b>Event-Based Programming without Inversion of Control</b>, Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
+
+ @author Philipp Haller
+ */
trait Reactor extends Actor {
private var lastSender: Actor = null
- def sender: Actor = lastSender
- def pushSender(sender: Actor): Unit = lastSender = sender
- def popSender(): Unit = lastSender = null
+ private[actors] def sender: Actor = lastSender
+ private[actors] def pushSender(sender: Actor): Unit = lastSender = sender
+ private[actors] def popSender(): Unit = lastSender = null
- def isThreaded = false
+ private[actors] def isThreaded = false
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
- def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
if (f == null && continuation == null) {
// do nothing (timeout is handled instead)
}
@@ -23,13 +33,13 @@ trait Reactor extends Actor {
}
}
- def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
+ private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
(f: PartialFunction[Any, Unit]) => {
continuation = f
throw new SuspendActorException
}
- def resetActor(): Unit = {
+ private[actors] def resetActor(): Unit = {
detachActor = defaultDetachActor
suspendActor = () => error("suspendActor called on reactor.")
suspendActorFor = (msec: long) => error("suspendActorFor called on reactor.")
@@ -38,21 +48,43 @@ trait Reactor extends Actor {
resetActor()
+ /**
+ Starts this reactor.
+ */
def start(): Unit = {
Scheduler.execute(new StartTask(this))
}
+ /**
+ Terminates this reactor, thereby influencing linked actors
+ (see Actor.exit).
+ */
def exit(reason: String): Unit = {
exitReason = reason
Thread.currentThread().interrupt()
}
}
-abstract class Reaction extends Runnable {
+/**
+ The abstract class <code>Reaction</code> associates an instance
+ of a <code>Reactor</code> with a
+ <code>java.lang.Runnable</code>. It is also the super class of
+ the different kinds of tasks used for the execution of
+ <code>Reactor</code>s.
+
+ @author Philipp Haller
+ */
+private[actors] abstract class Reaction extends Runnable {
def actor: Reactor
}
-class StartTask(a: Reactor) extends Reaction {
+/**
+ This class represents task items used to start the execution
+ of <code>Reactor</code>s.
+
+ @author Philipp Haller
+ */
+private[actors] class StartTask(a: Reactor) extends Reaction {
def actor = a
def run(): Unit = {
@@ -83,9 +115,16 @@ class StartTask(a: Reactor) extends Reaction {
}
}
-class ActorTask(a: Reactor,
- f: PartialFunction[Any, Unit],
- msg: Any) extends Reaction {
+/**
+ This class represents task items used to execute actions
+ specified in arguments of <code>react</code> and
+ <code>reactWithin</code>.
+
+ @author Philipp Haller
+ */
+private[actors] class ActorTask(a: Reactor,
+ f: PartialFunction[Any, Unit],
+ msg: Any) extends Reaction {
def actor = a
def run(): Unit = {
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 2f8bdd1243..429437a6c7 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -1,17 +1,20 @@
package scala.actors
-import scala.collection.mutable._
+import scala.collection.mutable.{Queue,Buffer,ArrayBuffer}
-object Scheduler /*extends java.util.concurrent.Executor*/ {
- private var sched: /*java.util.concurrent.Executor*/ IScheduler =
- //java.util.concurrent.Executors.newFixedThreadPool(4);
- //new FixedWorkersScheduler(2);
- //new SpareWorkerScheduler2;
+/**
+ The <code>Scheduler</code> object is used by
+ <code>Reactor</code> to execute tasks of an execution of a
+ reactor.
+
+ @author Philipp Haller
+ */
+object Scheduler {
+ private var sched: IScheduler =
new SpareWorkerScheduler
- //new SingleThreadedScheduler
def impl = sched
- def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = {
+ def impl_= (scheduler: IScheduler) = {
sched = scheduler
}
@@ -26,7 +29,13 @@ object Scheduler /*extends java.util.concurrent.Executor*/ {
def shutdown(): Unit = sched.shutdown()
}
-abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
+/**
+ This abstract class provides a common interface for all
+ schedulers used to execute reactors.
+
+ @author Philipp Haller
+ */
+abstract class IScheduler {
def execute(task: Reaction): Unit
def getTask(worker: WorkerThread): Runnable
def tick(a: Reactor): Unit
@@ -40,6 +49,12 @@ abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
}
}
+/**
+ This scheduler executes the tasks of a reactor on a single
+ thread (the current thread).
+
+ @author Philipp Haller
+ */
class SingleThreadedScheduler extends IScheduler {
def execute(task: Reaction): Unit = {
// execute task immediately on same thread
@@ -53,136 +68,13 @@ class SingleThreadedScheduler extends IScheduler {
def shutdown(): Unit = {}
}
-abstract class FixedWorkersScheduler(workercnt: int) extends IScheduler {
- private var canQuit = false;
- private val tasks = new Queue[ActorTask];
- private val idle = new Queue[WorkerThread];
-
- private var workers = new Array[WorkerThread](workercnt);
-
- def init = {
- for (val i <- List.range(0, workers.length)) {
- workers(i) = new WorkerThread(this)
- workers(i).start()
- }
- }
- init;
-
- def execute(item: ActorTask): unit = synchronized {
- if (workers.length == 0) item.run
- else {
- canQuit = true
- if (idle.length > 0) idle.dequeue.execute(item)
- else tasks += item
- }
- }
-
- def getTask(worker: WorkerThread) = synchronized {
- if (tasks.length > 0) tasks.dequeue
- else {
- idle += worker
- null
- //if ((idle.length == workers.length) && canQuit) haltExcept(worker)
- //else null
- }
- }
-
- def tick(a: Reactor): unit = {}
-}
-
-abstract class SpareWorkerScheduler2 extends IScheduler {
- private val tasks = new Queue[ActorTask];
- private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
-
- val idle = new Queue[WorkerThread];
- val ticks = new HashMap[WorkerThread, long]
- val executing = new HashMap[Reactor, WorkerThread]
-
- var TICKFREQ = 50
-
- def init = {
- for (val i <- List.range(0, 2)) {
- val worker = new WorkerThread(this)
- workers += worker
- worker.start()
- }
- }
- init;
-
- var maxWorkers = 0;
- var ticksCnt = 0;
-
- def tick(a: Reactor): unit = synchronized {
- ticksCnt = ticksCnt + 1
- executing.get(a) match {
- case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
- case Some(wt) =>
- ticks.update(wt, System.currentTimeMillis)
- }
- }
-
- def execute(item: ActorTask): unit = synchronized {
- if (idle.length > 0) {
- val wt = idle.dequeue
- executing.update(item.actor, wt)
- wt.execute(item)
- }
- else {
- /* only create new worker thread
- when all are blocked according to heuristic
-
- we check time stamps of latest send/receive ops
- of ALL workers
-
- we stop if there is one which is not blocked */
-
- val iter = workers.elements
- var foundBusy = false
- while (iter.hasNext && !foundBusy) {
- val wt = iter.next
- ticks.get(wt) match {
- case None => foundBusy = true // assume not blocked
- case Some(ts) => {
- val currTime = System.currentTimeMillis
- if (currTime - ts < TICKFREQ)
- foundBusy = true
- }
- }
- }
-
- if (!foundBusy) {
- val newWorker = new WorkerThread(this)
- workers += newWorker
- maxWorkers = workers.length // statistics
-
- executing.update(item.actor, newWorker)
-
- newWorker.execute(item)
- newWorker.start()
- }
- else {
- // wait assuming busy thread will be finished soon
- // and ask for more work
- tasks += item
- }
- }
- }
-
- def getTask(worker: WorkerThread) = synchronized {
- if (tasks.length > 0) {
- val item = tasks.dequeue
- executing.update(item.actor, worker)
- item
- }
- else {
- idle += worker
- null
- }
- }
-}
+/**
+ This scheduler creates additional threads whenever there is no
+ idle thread available.
+ @author Philipp Haller
+ */
class SpareWorkerScheduler extends IScheduler {
-
private val tasks = new Queue[Reaction]
private val idle = new Queue[WorkerThread]
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
@@ -228,9 +120,7 @@ class SpareWorkerScheduler extends IScheduler {
def shutdown(): Unit = synchronized {
terminating = true
-
val numNonIdle = workers.length - idle.length
-
for (val i <- List.range(0, numNonIdle))
tasks += QUIT_TASK
val idleThreads = idle.elements
@@ -242,18 +132,21 @@ class SpareWorkerScheduler extends IScheduler {
}
}
+/**
+ This class is used by schedulers to execute reactor tasks on
+ multiple threads.
+ @author Philipp Haller
+ */
class WorkerThread(sched: IScheduler) extends Thread {
-
private var task: Runnable = null
+ private var running = true
def execute(r: Runnable) = synchronized {
task = r
notify()
}
- private var running = true
-
override def run(): Unit = synchronized {
try {
while (running) {
diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala
index 5d3deadae4..2e3e8ecc2b 100644
--- a/src/actors/scala/actors/ThreadedActor.scala
+++ b/src/actors/scala/actors/ThreadedActor.scala
@@ -1,5 +1,11 @@
package scala.actors
+/**
+ This trait is part of the thread-based implementation of
+ actors.
+
+ @author Philipp Haller
+ */
trait ThreadedActor extends Actor {
private val lastSenders = new scala.collection.mutable.Stack[Actor]
def sender: Actor = {