From bf18c3732071dee637284f50c5d979e924c3b1ad Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Tue, 10 Oct 2006 08:42:42 +0000 Subject: Event-based actors now allowed to call receive. --- src/actors/scala/actors/Actor.scala | 205 ++--------------------- src/actors/scala/actors/Channel.scala | 140 ++++++++++++++-- src/actors/scala/actors/Reactor.scala | 19 ++- src/actors/scala/actors/ThreadedActor.scala | 1 + src/actors/scala/actors/remote/NetKernel.scala | 2 +- src/actors/scala/actors/remote/RemoteActor.scala | 97 +++++++++++ 6 files changed, 255 insertions(+), 209 deletions(-) create mode 100644 src/actors/scala/actors/remote/RemoteActor.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 1caa6d6d8e..b1f8bcde23 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -31,24 +31,16 @@ object Actor { */ def self: Actor = synchronized { val t = Thread.currentThread() - if (t.isInstanceOf[ActorThread]) - t.asInstanceOf[ActorThread] - else { - var a = selfs.get(t).asInstanceOf[Actor] - if (a == null) { - a = new ActorProxy(t) - selfs.put(t, a) - } - a + var a = selfs.get(t).asInstanceOf[Actor] + if (a == null) { + a = new ActorProxy(t) + selfs.put(t, a) } + a } - /** - * Creates an instance of a thread-based actor executing body, - * and starts it. - */ - def actor(body: => Unit): ActorThread = synchronized { - val actor = new ActorThread { + def actor(body: => Unit): Actor = synchronized { + val actor = new Reactor { def act() = body } actor.start() @@ -60,8 +52,8 @@ object Actor { * channel which can be used for typed communication with other * actors. */ - def actor[a](ch: Channel[a])(body: => Unit): ActorThread = synchronized { - val actor = new ActorThread { + def actor[a](ch: Channel[a])(body: => Unit): Actor = synchronized { + val actor = new Reactor { def act() = body } ch.receiver = actor @@ -69,25 +61,10 @@ object Actor { actor } - /** - * Creates an instance of an event-based reactor executing - * body, and starts it. - */ - def reactor(body: => Unit): Reactor = synchronized { - val reactor = new Reactor { - def act() = body - } - reactor.start() - reactor - } - /** * Receives a message from the mailbox of * self. Blocks if no message matching any of the * cases of f can be received. - * - * Only (thread-based) actors may call this method. It fails at - * runtime if executed by a reactor. */ def receive[a](f: PartialFunction[Any, a]): a = self.in.receive(f) @@ -99,9 +76,6 @@ object Actor { f can be received. If no message could be received the TIMEOUT action is executed if specified. - - Only (thread-based) actors may call this method. It fails at - runtime if executed by a reactor. */ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = self.in.receiveWithin(msec)(f) @@ -303,14 +277,14 @@ trait Actor extends OutputChannel[Any] { private[actors] def pushSender(sender: Actor): unit private[actors] def popSender(): unit - private[actors] var suspendActor: () => unit = _ - private[actors] var suspendActorFor: long => unit = _ - private[actors] var detachActor: PartialFunction[Any, unit] => unit = _ + private[actors] var suspendActor: () => Unit = _ + private[actors] var suspendActorFor: long => Unit = _ + private[actors] var resumeActor: () => Unit = _ + private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _ private[actors] var kill: () => Unit = _ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) private[actors] def tick(): Unit - private[actors] def isThreaded: boolean private[actors] def resetActor(): unit resetActor() @@ -330,7 +304,7 @@ trait Actor extends OutputChannel[Any] { Links self to actor defined by body. */ def link(body: => Unit): Actor = { - val actor = new ActorThread { + val actor = new Reactor { def act() = body } link(actor) @@ -419,61 +393,13 @@ trait Actor extends OutputChannel[Any] { case class Exit(from: Actor, reason: String) -/** - * This class provides an implementation for actors based on - * threads. To be able to create instances of this class, the - * inherited abstract method act() has to be - * implemented. Note that the preferred way of creating - * thread-based actors is through the actor method - * defined in object Actor. - * - * @author Philipp Haller - */ -abstract class ActorThread extends Thread with ThreadedActor { - override def run(): Unit = { - try { - act() - if (isInterrupted()) - throw new InterruptedException - kill() - if (isInterrupted()) - throw new InterruptedException - exit("normal") - } - catch { - case ie: InterruptedException => - exitLinked() - case t: Throwable => - exitLinked(t.toString()) - } - } - - /** - Terminates execution of self with the following - effect on linked actors: - - For each linked actor a with - trapExit set to true, send message - Exit(self, reason) to a. - - For each linked actor a with - trapExit set to false (default), - call a.exit(reason) if - !reason.equals("normal"). - */ - def exit(reason: String): Unit = { - exitReason = reason - interrupt() - } -} - /** * This class provides a dynamic actor proxy for normal Java * threads. * * @author Philipp Haller */ -private[actors] class ActorProxy(t: Thread) extends ThreadedActor { +private[actors] class ActorProxy(t: Thread) extends Reactor { def act(): Unit = {} /** Terminates execution of self with the following @@ -488,112 +414,13 @@ private[actors] class ActorProxy(t: Thread) extends ThreadedActor { call a.exit(reason) if !reason.equals("normal"). */ - def exit(reason: String): Unit = { + override def exit(reason: String): Unit = { exitReason = reason t.interrupt() } } -/** - This object provides methods for creating, registering, and - selecting remotely accessible actors. - - A remote actor is typically created like this: -
- actor {
-   alive(9010)
-   register('myName, self)
-
-   // behavior
- }
- 
- - It can be accessed by an actor running on a (possibly) - different node by selecting it in the following way: -
- actor {
-   // ...
-   val c = select(TcpNode("127.0.0.1", 9010), 'myName)
-   c ! msg
-   // ...
- }
- 
- - @author Philipp Haller - */ -object RemoteActor { - import remote.NetKernel - import remote.TcpService - - private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel] - - /** - * Makes self remotely accessible on TCP port - * port. - */ - def alive(port: int): Unit = { - val serv = new TcpService(port) - serv.start() - kernels += Actor.self -> serv.kernel - } - - /** - * Registers a under name on this - * node. - */ - def register(name: Symbol, a: Actor): Unit = { - val kernel = kernels.get(Actor.self) match { - case None => { - val serv = new TcpService(TcpService.generatePort) - serv.start() - kernels += Actor.self -> serv.kernel - serv.kernel - } - case Some(k) => k - } - kernel.register(name, a) - } - - /** - * Returns (a proxy for) the actor registered under - * name on node. - */ - def select(node: Node, name: Symbol): Actor = - new Reactor { - def act(): Unit = {} - override def !(msg: Any): Unit = msg match { - case a: AnyRef => { - // establish remotely accessible - // return path (sender) - val kernel = kernels.get(Actor.self) match { - case None => { - val serv = new TcpService(TcpService.generatePort) - serv.start() - kernels += Actor.self -> serv.kernel - serv.kernel - } - case Some(k) => k - } - kernel.send(node, name, a) - } - case other => - error("Cannot send non-AnyRef value remotely.") - } - override def !?(msg: Any): Any = - error("!? not implemented for remote actors.") - } -} - - -/** - * This class represents a machine node on a TCP network. - * - * @author Philipp Haller - */ -case class Node(address: String, port: Int) - - /** *

* This class is used by our efficient message queue diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 4bb4205614..e7c800a8f2 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -34,12 +34,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private[actors] var receiver: Actor = synchronized { // basically Actor.self, but can be null val t = Thread.currentThread() - if (t.isInstanceOf[ActorThread]) - t.asInstanceOf[ActorThread] - else { - val a = Actor.selfs.get(t).asInstanceOf[Actor] - a - } + val a = Actor.selfs.get(t).asInstanceOf[Actor] + a } private var received: Msg = _ @@ -48,7 +44,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private var waitingFor: Msg => boolean = waitingForNone private var waitingForSender: Actor = null - private val messageQueue = new MessageQueue[Msg] + //private val messageQueue = new MessageQueue[Msg] + private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]] private def send(msg: Msg, sender: Actor) = receiver.synchronized { receiver.tick() @@ -67,9 +64,13 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } } - receiver.scheduleActor(null, msg) + if (isSuspended) + receiver.resumeActor() + else + receiver.scheduleActor(null, msg) } else { - messageQueue.append(msg, sender) + //messageQueue.append(msg, sender) + mailbox += Pair(msg, sender) } } @@ -96,15 +97,18 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { */ def forward(msg: Msg): unit = send(msg, receiver.sender) + private var isSuspended = false + /** * Receives a message from this Channel. */ def receive[R](f: PartialFunction[Msg, R]): R = { assert(Actor.self == receiver, "receive from channel belonging to other actor") - assert(receiver.isThreaded, "receive invoked from reactor") + //assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt +/* val q = messageQueue.extractFirst(waitingFor) if (q != null) { received = q.msg @@ -112,8 +116,28 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } // acquire lock because we might call wait() else synchronized { + isSuspended = true receiver.suspendActor() } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) + }) match { + case Some(Pair(msg, sender)) => { + received = msg + receiver.pushSender(sender) + } + case None => { + // acquire lock because we might call wait() + this.synchronized { + isSuspended = true + receiver.suspendActor() + } + } + } + + isSuspended = false waitingFor = waitingForNone } receiver.resetActor() @@ -124,11 +148,12 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = { assert(Actor.self == receiver, "receive from channel belonging to other actor") - assert(receiver.isThreaded, "receive invoked from reactor") + //assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt waitingForSender = r +/* var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => { waitingFor(item.msg) && item.sender == r }) @@ -137,8 +162,28 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { receiver.pushSender(q.sender) } else synchronized { + isSuspended = true receiver.suspendActor() } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) && p._2 == r + }) match { + case Some(Pair(msg, sender)) => { + received = msg + receiver.pushSender(sender) + } + case None => { + // acquire lock because we might call wait() + this.synchronized { + isSuspended = true + receiver.suspendActor() + } + } + } + + isSuspended = false waitingFor = waitingForNone waitingForSender = null } @@ -156,19 +201,22 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { */ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { assert(Actor.self == receiver, "receive from channel belonging to other actor") - assert(receiver.isThreaded, "receive invoked from reactor") + //assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt +/* val q = messageQueue.extractFirst(waitingFor) if (q != null) { received = q.msg receiver.pushSender(q.sender) } else synchronized { + isSuspended = true receiver.suspendActorFor(msec) if (received == null) if (f.isDefinedAt(TIMEOUT)) { + isSuspended = false receiver.resetActor() val result = f(TIMEOUT) return result @@ -176,6 +224,34 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { else error("unhandled timeout") } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) + }) match { + case Some(Pair(msg, sender)) => { + received = msg + receiver.pushSender(sender) + } + case None => { + // acquire lock because we might call wait() + this.synchronized { + isSuspended = true + receiver.suspendActorFor(msec) + if (received == null) + if (f.isDefinedAt(TIMEOUT)) { + isSuspended = false + receiver.resetActor() + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } + } + } + + isSuspended = false waitingFor = waitingForNone } receiver.resetActor() @@ -192,6 +268,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt +/* val q = messageQueue.extractFirst(waitingFor) if (q != null) { received = q.msg @@ -202,6 +279,24 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { else synchronized { receiver.detachActor(f) } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) + }) match { + case Some(Pair(msg, sender)) => { + received = msg + receiver.pushSender(sender) + waitingFor = waitingForNone + receiver.scheduleActor(f, received) + } + case None => { + this.synchronized { + receiver.detachActor(f) + } + } + } + throw new SuspendActorException } } @@ -214,6 +309,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { receiver.synchronized { receiver.tick() waitingFor = f.isDefinedAt +/* val q = messageQueue.extractFirst(waitingFor) if (q != null) { received = q.msg @@ -226,6 +322,26 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { receiver.asInstanceOf[Reactor].timeoutPending = true receiver.detachActor(f) } +*/ + + mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { + waitingFor(p._1) + }) match { + case Some(Pair(msg, sender)) => { + received = msg + receiver.pushSender(sender) + waitingFor = waitingForNone + receiver.scheduleActor(f, received) + } + case None => { + this.synchronized { + TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec) + receiver.asInstanceOf[Reactor].timeoutPending = true + receiver.detachActor(f) + } + } + } + throw new SuspendActorException } } diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 015dbd82f2..aa16eea7ac 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -20,12 +20,16 @@ package scala.actors * @author Philipp Haller */ trait Reactor extends Actor { - private var lastSender: Actor = null - private[actors] def sender: Actor = lastSender - private[actors] def pushSender(sender: Actor): Unit = lastSender = sender - private[actors] def popSender(): Unit = lastSender = null - private[actors] def isThreaded = false + private val lastSenders = new scala.collection.mutable.Stack[Actor] + + private[actors] def sender: Actor = { + if (lastSenders.isEmpty) null + else lastSenders.top + } + + private[actors] def pushSender(s: Actor) = { lastSenders.push(s) } + private[actors] def popSender(): Unit = { lastSenders.pop } private[actors] var continuation: PartialFunction[Any, Unit] = null private[actors] var timeoutPending = false @@ -51,9 +55,10 @@ trait Reactor extends Actor { } private[actors] def resetActor(): Unit = { + suspendActor = () => wait() + suspendActorFor = (msec: long) => wait(msec) + resumeActor = () => notify() detachActor = defaultDetachActor - suspendActor = () => error("suspendActor called on reactor.") - suspendActorFor = (msec: long) => error("suspendActorFor called on reactor.") kill = () => {} } diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala index 7059b70f3e..8f6f53d025 100644 --- a/src/actors/scala/actors/ThreadedActor.scala +++ b/src/actors/scala/actors/ThreadedActor.scala @@ -36,6 +36,7 @@ trait ThreadedActor extends Actor { private[actors] def resetActor() = { suspendActor = () => wait() suspendActorFor = (msec: long) => wait(msec) + resumeActor = () => notify() detachActor = (f: PartialFunction[Any, Unit]) => wait() kill = () => {} } diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala index 1a621005b7..d6705fccd7 100644 --- a/src/actors/scala/actors/remote/NetKernel.scala +++ b/src/actors/scala/actors/remote/NetKernel.scala @@ -45,7 +45,7 @@ class NetKernel(service: Service) { actors.get(receiver) match { case Some(a) => { val msg = service.serializer.deserialize(data) - val senderProxy = new ActorThread { + val senderProxy = new Reactor { def act() = { a ! msg } override def !(msg: Any): Unit = { msg match { diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala new file mode 100644 index 0000000000..0984834623 --- /dev/null +++ b/src/actors/scala/actors/remote/RemoteActor.scala @@ -0,0 +1,97 @@ +package scala.actors.remote + +/** + This object provides methods for creating, registering, and + selecting remotely accessible actors. + + A remote actor is typically created like this: +

+ actor {
+   alive(9010)
+   register('myName, self)
+
+   // behavior
+ }
+ 
+ + It can be accessed by an actor running on a (possibly) + different node by selecting it in the following way: +
+ actor {
+   // ...
+   val c = select(TcpNode("127.0.0.1", 9010), 'myName)
+   c ! msg
+   // ...
+ }
+ 
+ + @author Philipp Haller + */ +object RemoteActor { + + private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel] + + /** + * Makes self remotely accessible on TCP port + * port. + */ + def alive(port: int): Unit = { + val serv = new TcpService(port) + serv.start() + kernels += Actor.self -> serv.kernel + } + + /** + * Registers a under name on this + * node. + */ + def register(name: Symbol, a: Actor): Unit = { + val kernel = kernels.get(Actor.self) match { + case None => { + val serv = new TcpService(TcpService.generatePort) + serv.start() + kernels += Actor.self -> serv.kernel + serv.kernel + } + case Some(k) => k + } + kernel.register(name, a) + } + + /** + * Returns (a proxy for) the actor registered under + * name on node. + */ + def select(node: Node, name: Symbol): Actor = + new Reactor { + def act(): Unit = {} + override def !(msg: Any): Unit = msg match { + case a: AnyRef => { + // establish remotely accessible + // return path (sender) + val kernel = kernels.get(Actor.self) match { + case None => { + val serv = new TcpService(TcpService.generatePort) + serv.start() + kernels += Actor.self -> serv.kernel + serv.kernel + } + case Some(k) => k + } + kernel.send(node, name, a) + } + case other => + error("Cannot send non-AnyRef value remotely.") + } + override def !?(msg: Any): Any = + error("!? not implemented for remote actors.") + } +} + + +/** + * This class represents a machine node on a TCP network. + * + * @author Philipp Haller + */ +case class Node(address: String, port: Int) -- cgit v1.2.3