From cf5b53633eb1f99c340feb18bcc83b8af2e8065d Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 17 Jan 2007 10:40:31 +0000 Subject: Changed channels. Removed receiveFrom and Request. --- src/actors/scala/actors/Actor.scala | 245 ++++++++++++----- src/actors/scala/actors/Channel.scala | 419 +++-------------------------- src/actors/scala/actors/InputChannel.scala | 2 +- src/actors/scala/actors/Scheduler.scala | 2 +- 4 files changed, 220 insertions(+), 448 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 478466f916..4444378a5b 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -10,16 +10,18 @@ package scala.actors -import scala.collection.mutable.{HashSet, Stack} +import scala.collection.mutable.{HashSet, Queue} import compat.Platform +import java.util.Stack + /** * The Actor object provides functions for the definition of * actors, as well as all actor operations, such as * receive, react, reply, * etc. * - * @version 0.9.0 + * @version 0.9.2 * @author Philipp Haller */ object Actor { @@ -67,9 +69,7 @@ object Actor { } */ - def ? : Any = self.in.? - - def poll: Option[Any] = self.in.poll + def ? : Any = self.? /** * Receives a message from the mailbox of @@ -80,7 +80,7 @@ object Actor { * @return ... */ def receive[a](f: PartialFunction[Any, a]): a = - self.in.receive(f) + self.receive(f) /** * Receives a message from the mailbox of @@ -95,7 +95,7 @@ object Actor { * @return ... */ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = - self.in.receiveWithin(msec)(f) + self.receiveWithin(msec)(f) /** * receive for event-based reactors. @@ -108,7 +108,7 @@ object Actor { * @return ... */ def react(f: PartialFunction[Any, Unit]): Nothing = - self.in.react(f) + self.react(f) /** * receiveWithin for event-based reactors. @@ -122,10 +122,10 @@ object Actor { * @return ... */ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = - self.in.reactWithin(msec)(f) + self.reactWithin(msec)(f) def eventloop(f: PartialFunction[Any, Unit]): Nothing = - self.in.react(new RecursiveProxyHandler(self, f)) + self.react(new RecursiveProxyHandler(self, f)) private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit]) extends PartialFunction[Any, Unit] { @@ -133,25 +133,10 @@ object Actor { true // events are immediately removed from the mailbox def apply(m: Any): Unit = { if (f.isDefinedAt(m)) f(m) - self.in.react(this) + self.react(this) } } - /** - *

Used for receiving a message from a specific actor.

- *

Example:

from (a) receive { //... } - * - * @param r ... - * @return ... - */ - def from(r: Actor): FromReceive = - new FromReceive(r) - - private[actors] class FromReceive(r: Actor) { - def receive[a](f: PartialFunction[Any, a]): a = - self.in.receiveFrom(r)(f) - } - /** * Returns the actor which sent the last received message. */ @@ -161,13 +146,13 @@ object Actor { * Send msg to the actor waiting in a call to * !?. */ - def reply(msg: Any): Unit = sender.reply ! msg + def reply(msg: Any): Unit = self.reply(msg) /** * Send () to the actor waiting in a call to * !?. */ - def reply(): Unit = reply(()) + def reply(): Unit = self.reply(()) private[actors] trait Body[a] { def orElse[b >: a](other: => b): b @@ -189,12 +174,12 @@ object Actor { // have to get out of the point of suspend in alt1's // receive s.suspendActor = () => { - s.in.isSuspended = false - s.in.waitingFor = s.in.waitingForNone + s.isSuspended = false + s.waitingFor = s.waitingForNone throw new SuspendActorException } s.detachActor = f => { - s.in.waitingFor = s.in.waitingForNone + s.waitingFor = s.waitingForNone Scheduler.unPendReaction throw new SuspendActorException } @@ -284,13 +269,6 @@ object Actor { def exit(reason: String): Nothing = self.exit(reason) } -case class Request[a](msg: a) { - private[actors] val in = new Channel[a] - def reply(resp: a): unit = { - in ! resp - } -} - /** *

* This class provides (together with Channel) an @@ -302,26 +280,148 @@ case class Request[a](msg: a) { * Philipp Haller, Martin Odersky Proc. JMLC 2006 *

* - * @version 0.9.0 + * @version 0.9.2 * @author Philipp Haller */ trait Actor extends OutputChannel[Any] { - private[actors] val in = new Channel[Any] - in.receiver = this - private var rc: Channel[Any] = null + private var received: Option[Any] = None + + private[actors] val waitingForNone = (m: Any) => false + private[actors] var waitingFor: Any => boolean = waitingForNone + private[actors] var isSuspended = false + + private val sessions = new Stack//[Channel[Any]] + + private val mailbox = new Queue[Pair[Any, Channel[Any]]] + + private def send(msg: Any, session: Channel[Any]) = synchronized { + tick() + if (waitingFor(msg)) { + received = Some(msg) + sessions push session + waitingFor = waitingForNone + + if (timeoutPending) { + timeoutPending = false + TimerThread.trashRequest(this) + } + + if (isSuspended) + resumeActor() + else + scheduleActor(null, msg) + } else { + mailbox += Pair(msg, session) + } + } + + def receive[R](f: PartialFunction[Any, R]): R = { + assert(Actor.self == this, "receive from channel belonging to other actor") + this.synchronized { + tick() + mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { + f.isDefinedAt(p._1) + }) match { + case Some(Pair(msg, session)) => { + received = Some(msg) + sessions push session + } + case None => { + waitingFor = f.isDefinedAt + isSuspended = true + suspendActor() + } + } + waitingFor = waitingForNone + isSuspended = false + } + val result = f(received.get) + sessions.pop + result + } + + def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { + assert(Actor.self == this, "receive from channel belonging to other actor") + this.synchronized { + tick() + mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { + f.isDefinedAt(p._1) + }) match { + case Some(Pair(msg, session)) => { + received = Some(msg) + sessions push session + } + case None => { + waitingFor = f.isDefinedAt + isSuspended = true + received = None + suspendActorFor(msec) + Debug.info("received: "+received) + if (received.isEmpty) { + Debug.info("no message received after "+msec+" millis") + if (f.isDefinedAt(TIMEOUT)) { + Debug.info("executing TIMEOUT action") + waitingFor = waitingForNone + isSuspended = false + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } + } + } + waitingFor = waitingForNone + isSuspended = false + } + val result = f(received.get) + sessions.pop + result + } - private[actors] def reply: Channel[Any] = { - if (rc eq null) { - rc = new Channel[Any] - rc.receiver = this + def react(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.self == this, "react on channel belonging to other actor") + Scheduler.pendReaction + this.synchronized { + tick() + mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { + f.isDefinedAt(p._1) + }) match { + case Some(Pair(msg, session)) => { + sessions push session + scheduleActor(f, msg) + } + case None => { + waitingFor = f.isDefinedAt + detachActor(f) + } + } + throw new SuspendActorException } - rc } - private[actors] def freshReply(): Unit = { - rc = new Channel[Any] - rc.receiver = this + def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.self == this, "react on channel belonging to other actor") + Scheduler.pendReaction + this.synchronized { + tick() + mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => { + f.isDefinedAt(p._1) + }) match { + case Some(Pair(msg, session)) => { + sessions push session + scheduleActor(f, msg) + } + case None => { + waitingFor = f.isDefinedAt + TimerThread.requestTimeout(this, f, msec) + timeoutPending = true + detachActor(f) + } + } + throw new SuspendActorException + } } /** @@ -335,32 +435,49 @@ trait Actor extends OutputChannel[Any] { /** * Sends msg to this actor (asynchronous). */ - def !(msg: Any): Unit = in ! msg + def !(msg: Any): Unit = send(msg, Actor.self.reply) - def forward(msg: Any): Unit = in forward msg + def forward(msg: Any): Unit = send(msg, Actor.sender.reply) /** * Sends msg to this actor and awaits reply * (synchronous). */ - def !?(msg: Any): Any = in !? msg + def !?(msg: Any): Any = { + val replyChannel = Actor.self.freshReply() + this ! msg + replyChannel.receive { + case x => x + } + } - def rpc[a](msg: a): a = { - Debug.info("Actor.!? called by "+Actor.self) - val req = Request(msg) - in ! req - req.in.? + def !?(msec: long, msg: Any): Option[Any] = { + val replyChannel = Actor.self.freshReply() + this ! msg + replyChannel.receiveWithin(msec) { + case TIMEOUT => None + case x => Some(x) + } } - private val lastSenders = new Stack[Actor] + def reply(msg: Any): Unit = session ! msg - private[actors] def sender: Actor = { - if (lastSenders.isEmpty) null - else lastSenders.top + private var rc = new Channel[Any] + def reply = rc + def freshReply() = { rc = new Channel[Any]; rc } + + def ? : Any = receive { + case x => x } - private[actors] def pushSender(s: Actor) = { lastSenders.push(s) } - private[actors] def popSender(): Unit = { lastSenders.pop } + private[actors] def sender: Actor = + if (sessions.empty) null + else sessions.peek.asInstanceOf[Channel[Any]].receiver + + private[actors] def session: Channel[Any] = + if (sessions.empty) null + else sessions.peek.asInstanceOf[Channel[Any]] + private[actors] var continuation: PartialFunction[Any, Unit] = null private[actors] var timeoutPending = false diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index 29293b3286..3c6bf822fb 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -24,12 +24,14 @@ class SuspendActorException extends Throwable { } } +case class ![a](ch: Channel[a], msg: a) + /** * This class provides a means for typed communication among * actors. Only the actor creating an instance of a * Channel may receive from it. * - * @version 0.9.0 + * @version 0.9.2 * @author Philipp Haller */ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { @@ -39,417 +41,70 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { Actor.selfs.get(currentThread).asInstanceOf[Actor] } - private var received: Option[Msg] = None - - private[actors] val waitingForNone = (m: Msg) => false - private[actors] var waitingFor: Msg => boolean = waitingForNone - private[actors] var waitingForSender: Actor = null - - private[actors] var isSuspended = false - - //private val messageQueue = new MessageQueue[Msg] - private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]] - - private def send(msg: Msg, sender: Actor) = receiver.synchronized { - receiver.tick() - if (waitingFor(msg) && ((waitingForSender eq null) || - (waitingForSender == sender))) { - received = Some(msg) - receiver.pushSender(sender) - waitingFor = waitingForNone - waitingForSender = null - - if (receiver.timeoutPending) { - receiver.timeoutPending = false - TimerThread.trashRequest(receiver) - } - - if (isSuspended) - receiver.resumeActor() - else - receiver.scheduleActor(null, msg) - } else { - //messageQueue.append(msg, sender) - mailbox += Pair(msg, sender) - } - } - /** * Sends msg to this Channel. */ - def !(msg: Msg): unit = send(msg, Actor.self) - - def ? : Msg = receive { case any => any } - - def poll: Option[Msg] = { - Some(?) - } orElse { - None.asInstanceOf[Option[Msg]] - } - - /** - * Sends msg to this Channel and - * awaits reply. - */ - def !?(msg: Msg): Any = { - Actor.self.freshReply() - this ! msg - Actor.self.reply.receiveFrom(receiver) { - case x => x - } - } - - def !?(msec: long, msg: Msg): Option[Any] = { - Debug.info("rpc with timeout "+msec) - Actor.self.freshReply() - this ! msg - Actor.self.reply.receiveWithinFrom(msec)(receiver) { - case TIMEOUT => None - case x => Some(x) - } + def !(msg: Msg): unit = { + receiver ! scala.actors.!(this, msg) } /** * Forwards msg to this keeping the * last sender as sender instead of self. */ - def forward(msg: Msg): unit = send(msg, receiver.sender) - - /** - * Receives a message from this Channel. - */ - def receive[R](f: PartialFunction[Msg, R]): R = { - assert(Actor.self == receiver, "receive from channel belonging to other actor") - receiver.synchronized { - receiver.tick() - -/* - val q = messageQueue.extractFirst(waitingFor) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - } - // acquire lock because we might call wait() - else synchronized { - isSuspended = true - receiver.suspendActor() - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) - }) match { - case Some(Pair(msg, sender)) => { - received = Some(msg) - receiver.pushSender(sender) - } - case None => { - // acquire lock because we might call wait() - this.synchronized { - waitingFor = f.isDefinedAt - isSuspended = true - //val wp = Tuple4(f.isDefinedAt, None, None, Some(receiver)) - //waitingParts = wp :: waitingParts - receiver.suspendActor() - } - } - } - - waitingFor = waitingForNone - isSuspended = false - } - val result = f(received.get) - receiver.popSender() - result + def forward(msg: Msg): unit = { + receiver forward scala.actors.!(this, msg) } - private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = { - assert(Actor.self == receiver, "receive from channel belonging to other actor") - receiver.synchronized { - receiver.tick() - -/* - var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => { - waitingFor(item.msg) && item.sender == r - }) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - } - else synchronized { - isSuspended = true - receiver.suspendActor() - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) && p._2 == r - }) match { - case Some(Pair(msg, sender)) => { - received = Some(msg) - receiver.pushSender(sender) - } - case None => { - // acquire lock because we might call wait() - this.synchronized { - waitingFor = f.isDefinedAt - waitingForSender = r - isSuspended = true - receiver.suspendActor() - } - } - } - - waitingFor = waitingForNone - waitingForSender = null - isSuspended = false + def receive[R](f: PartialFunction[Any, R]): R = { + val C = this + receiver.receive { + case C ! msg if (f.isDefinedAt(msg)) => f(msg) } - val result = f(received.get) - receiver.popSender() - result } - /** - * Receives a message from this Channel. If no - * message could be received before msec - * milliseconds elapsed, the TIMEOUT action is - * executed if specified. - */ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { - assert(Actor.self == receiver, "receive from channel belonging to other actor") - receiver.synchronized { - receiver.tick() - -/* - val q = messageQueue.extractFirst(waitingFor) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - } - else synchronized { - isSuspended = true - receiver.suspendActorFor(msec) - if (received eq null) - if (f.isDefinedAt(TIMEOUT)) { - isSuspended = false - val result = f(TIMEOUT) - return result - } - else - error("unhandled timeout") - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) - }) match { - case Some(Pair(msg, sender)) => { - received = Some(msg) - receiver.pushSender(sender) - } - case None => { - // acquire lock because we might call wait() - this.synchronized { - waitingFor = f.isDefinedAt - isSuspended = true - received = None - receiver.suspendActorFor(msec) - Debug.info("received: "+received) - if (received.isEmpty) { - Debug.info("no message received after "+msec+" millis") - if (f.isDefinedAt(TIMEOUT)) { - Debug.info("executing TIMEOUT action") - isSuspended = false - val result = f(TIMEOUT) - return result - } - else - error("unhandled timeout") - } - } - } - } - - waitingFor = waitingForNone - isSuspended = false + val C = this + receiver.receiveWithin(msec) { + case C ! msg if (f.isDefinedAt(msg)) => f(msg) + case TIMEOUT => f(TIMEOUT) } - val result = f(received.get) - receiver.popSender() - result } - def receiveWithinFrom[R](msec: long)(r: Actor)(f: PartialFunction[Any, R]): R = { - assert(Actor.self == receiver, "receive from channel belonging to other actor") - receiver.synchronized { - receiver.tick() - -/* - val q = messageQueue.extractFirst(waitingFor) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - } - else synchronized { - waitingFor = f.isDefinedAt - waitingForSender = r - isSuspended = true - receiver.suspendActorFor(msec) - if (received eq null) - if (f.isDefinedAt(TIMEOUT)) { - isSuspended = false - val result = f(TIMEOUT) - return result - } - else - error("unhandled timeout") - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) && p._2 == r - }) match { - case Some(Pair(msg, sender)) => { - received = Some(msg) - receiver.pushSender(sender) - } - case None => { - // acquire lock because we might call wait() - this.synchronized { - waitingFor = f.isDefinedAt - waitingForSender = r - isSuspended = true - received = None - receiver.suspendActorFor(msec) - Debug.info("received: "+received) - if (received.isEmpty) { - Debug.info("no message received after "+msec+" millis") - if (f.isDefinedAt(TIMEOUT)) { - Debug.info("executing TIMEOUT action") - isSuspended = false - val result = f(TIMEOUT) - return result - } - else - error("unhandled timeout") - } - } - } - } - - waitingFor = waitingForNone - waitingForSender = null - isSuspended = false + def react(f: PartialFunction[Any, Unit]): Nothing = { + val C = this + receiver.react { + case C ! msg if (f.isDefinedAt(msg)) => f(msg) } - val result = f(received.get) - receiver.popSender() - result } - /** - * receive for reactors. - */ - def react(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self == receiver, "react on channel belonging to other actor") - Scheduler.pendReaction - receiver.synchronized { - receiver.tick() - -/* - val q = messageQueue.extractFirst(waitingFor) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - waitingFor = waitingForNone - receiver.scheduleActor(f, received) - } - else synchronized { - receiver.detachActor(f) - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) - }) match { - case Some(Pair(msg, sender)) => { - receiver.pushSender(sender) - receiver.scheduleActor(f, msg) - } - case None => { - this.synchronized { - //Scheduler.detached(receiver) - waitingFor = f.isDefinedAt - //val wp = Tuple4(f.isDefinedAt, None, Some(f), None) - //waitingParts = wp :: waitingParts - receiver.detachActor(f) - } - } - } - - throw new SuspendActorException + def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { + val C = this + receiver.reactWithin(msec) { + case C ! msg if (f.isDefinedAt(msg)) => f(msg) + case TIMEOUT => f(TIMEOUT) } } /** - * receiveWithin for reactors. + * Sends msg to this Channel and + * awaits reply. */ - def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.self == receiver, "react on channel belonging to other actor") - Scheduler.pendReaction - receiver.synchronized { - receiver.tick() - -/* - val q = messageQueue.extractFirst(waitingFor) - if (q != null) { - received = q.msg - receiver.pushSender(q.sender) - waitingFor = waitingForNone - receiver.scheduleActor(f, received) - } - else synchronized { - waitingFor = f.isDefinedAt - TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec) - receiver.asInstanceOf[Reactor].timeoutPending = true - receiver.detachActor(f) - } -*/ - - mailbox.dequeueFirst((p: Pair[Msg, Actor]) => { - waitingFor(p._1) - }) match { - case Some(Pair(msg, sender)) => { - received = Some(msg) - receiver.pushSender(sender) - waitingFor = waitingForNone - receiver.scheduleActor(f, received.get) - } - case None => { - this.synchronized { - waitingFor = f.isDefinedAt - TimerThread.requestTimeout(receiver, f, msec) - receiver.timeoutPending = true - receiver.detachActor(f) - } - } - } - - throw new SuspendActorException + def !?(msg: Msg): Any = { + val replyChannel = Actor.self.freshReply() + receiver ! scala.actors.!(this, msg) + replyChannel.receive { + case x => x } } - /* - * Prints contents of mailbox to standard out. - * This is used for printing actor dumps. - */ - private[actors] def printMailbox = { - Console.print("[") - val msgs = mailbox.elements - if (msgs.hasNext) - Console.print(msgs.next._1.toString()) - while (msgs.hasNext) { - Console.print(", "+msgs.next._1.toString()) + def !?(msec: long, msg: Msg): Option[Any] = { + val replyChannel = Actor.self.freshReply() + receiver ! scala.actors.!(this, msg) + replyChannel.receiveWithin(msec) { + case TIMEOUT => None + case x => Some(x) } - Console.println("]") } } diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala index 6ddde7e911..c22671fc93 100644 --- a/src/actors/scala/actors/InputChannel.scala +++ b/src/actors/scala/actors/InputChannel.scala @@ -16,7 +16,7 @@ package scala.actors * @author Philipp Haller */ trait InputChannel[Msg] { - def receive[R](f: PartialFunction[Msg, R]): R + def receive[R](f: PartialFunction[Any, R]): R def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R def react(f: PartialFunction[Any, Unit]): Nothing def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index a4b5eb3368..1e78798337 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -132,7 +132,7 @@ class TickedScheduler extends Thread with IScheduler { if (a.isDetached || a.isWaiting) { // dump contents of mailbox Console.println("Waiting with mailbox:") - a.in.printMailbox + //a.printMailbox } Console.println -- cgit v1.2.3