From b35a79a93cce5de8872e278586e74dedc53a04a7 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Fri, 29 May 2009 14:30:44 +0000 Subject: Implemented #2009. --- src/actors/scala/actors/Actor.scala | 195 +++++++++++++---------- src/actors/scala/actors/OutputChannelActor.scala | 101 +++++++++--- 2 files changed, 186 insertions(+), 110 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 1b0d258d1b..55cf976c14 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -394,46 +394,21 @@ trait Actor extends OutputChannelActor with AbstractActor { */ private var onTimeout: Option[TimerTask] = None - /** - * Sends msg to this actor (asynchronous) supplying - * explicit reply destination. - * - * @param msg the message to send - * @param replyTo the reply destination - */ - override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - if (waitingFor ne waitingForNone) { - val savedWaitingFor = waitingFor - waitingFor = waitingForNone - scheduler execute { - if (synchronized { savedWaitingFor(msg) }) { - synchronized { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - } - - if (isSuspended) { - synchronized { - senders = replyTo :: senders - received = Some(msg) - resumeActor() - } - } else { - synchronized { - senders = List(replyTo) - } - // assert continuation != null - (new Reaction(this, continuation, msg)).run() - } - } else synchronized { - waitingFor = savedWaitingFor - mailbox.append(msg, replyTo) - } + protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any])) { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + if (isSuspended) { + synchronized { + senders = item._2 :: senders + received = Some(item._1) + resumeActor() } } else { - mailbox.append(msg, replyTo) + senders = List(item._2) + // assert continuation != null + (new Reaction(this, continuation, item._1)).run() } } @@ -445,17 +420,34 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") + synchronized { if (shouldExit) exit() // links + drainSendBuffer() + } + + var done = false + while (!done) { val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { - waitingFor = f.isDefinedAt - suspendActor() + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer() + // keep going + } else { + waitingFor = f.isDefinedAt + suspendActor() + done = true + } + } } else { received = Some(qel.msg) senders = qel.session :: senders + done = true } } + val result = f(received.get) received = None senders = senders.tail @@ -472,41 +464,57 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") + synchronized { if (shouldExit) exit() // links + drainSendBuffer() + } - // first, remove spurious TIMEOUT message from mailbox if any - val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) + // first, remove spurious TIMEOUT message from mailbox if any + mailbox.extractFirst((m: Any) => m == TIMEOUT) + val receiveTimeout = () => { + if (f.isDefinedAt(TIMEOUT)) { + received = Some(TIMEOUT) + senders = this :: senders + } else + error("unhandled timeout") + } + + var done = false + while (!done) { val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { - if (msec == 0) { - if (f.isDefinedAt(TIMEOUT)) { - received = Some(TIMEOUT) - senders = this :: senders - } else - error("unhandled timeout") - } else { - waitingFor = f.isDefinedAt - received = None - suspendActorFor(msec) - if (received.isEmpty) { - // actor is not resumed because of new message - // therefore, waitingFor has not been updated, yet. - waitingFor = waitingForNone - if (f.isDefinedAt(TIMEOUT)) { - received = Some(TIMEOUT) - senders = this :: senders + val todo = synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer() + // keep going + () => {} + } else if (msec == 0) { + done = true + receiveTimeout + } else { + waitingFor = f.isDefinedAt + received = None + suspendActorFor(msec) + if (received.isEmpty) { + // actor is not resumed because of new message + // therefore, waitingFor has not been updated, yet. + waitingFor = waitingForNone } - else - error("unhandled timeout") + done = true + receiveTimeout } } + todo() } else { received = Some(qel.msg) senders = qel.session :: senders + done = true } } + val result = f(received.get) received = None senders = senders.tail @@ -525,15 +533,9 @@ trait Actor extends OutputChannelActor with AbstractActor { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") synchronized { if (shouldExit) exit() // links - val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) - if (null eq qel) { - waitingFor = f.isDefinedAt - continuation = f - } else { - senders = List(qel.session) - scheduleActor(f, qel.msg) - } + drainSendBuffer() } + searchMailbox(f) throw Actor.suspendException } @@ -549,35 +551,56 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") + synchronized { if (shouldExit) exit() // links + drainSendBuffer() + } - // first, remove spurious TIMEOUT message from mailbox if any - val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) + // first, remove spurious TIMEOUT message from mailbox if any + mailbox.extractFirst((m: Any) => m == TIMEOUT) + val receiveTimeout = () => { + if (f.isDefinedAt(TIMEOUT)) { + senders = List(this) + scheduleActor(f, TIMEOUT) + } else + error("unhandled timeout") + } + + var done = false + while (!done) { val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { - if (msec == 0) { - if (f.isDefinedAt(TIMEOUT)) { - senders = List(this) - scheduleActor(f, TIMEOUT) + val todo = synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer() + // keep going + () => {} + } else if (msec == 0) { + done = true + receiveTimeout + } else { + waitingFor = f.isDefinedAt + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor.send(TIMEOUT, thisActor) } + }) + Actor.timer.schedule(onTimeout.get, msec) + continuation = f + done = true + () => {} } - else - error("unhandled timeout") - } else { - waitingFor = f.isDefinedAt - val thisActor = this - onTimeout = Some(new TimerTask { - def run() { thisActor.send(TIMEOUT, thisActor) } - }) - Actor.timer.schedule(onTimeout.get, msec) - continuation = f } + todo() } else { senders = List(qel.session) scheduleActor(f, qel.msg) + done = true } } + throw Actor.suspendException } @@ -791,7 +814,7 @@ trait Actor extends OutputChannelActor with AbstractActor { } // guarded by lock of this - private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = + protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = if ((f eq null) && (continuation eq null)) { // do nothing (timeout is handled instead) } diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala index 2b2a4397a1..28f2948680 100644 --- a/src/actors/scala/actors/OutputChannelActor.scala +++ b/src/actors/scala/actors/OutputChannelActor.scala @@ -10,6 +10,8 @@ package scala.actors +import scala.collection.mutable.Queue + trait OutputChannelActor extends OutputChannel[Any] { @volatile @@ -18,6 +20,8 @@ trait OutputChannelActor extends OutputChannel[Any] { /* The actor's mailbox. */ protected val mailbox = new MessageQueue + protected var sendBuffer = new Queue[(Any, OutputChannel[Any])] + /* A list of the current senders. The head of the list is * the sender of the message that was received last. */ @@ -55,25 +59,52 @@ trait OutputChannelActor extends OutputChannel[Any] { protected[actors] def mailboxSize: Int = mailbox.size - def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - if (waitingFor ne waitingForNone) { - val savedWaitingFor = waitingFor - waitingFor = waitingForNone - scheduler execute { - if (synchronized { savedWaitingFor(msg) }) { - synchronized { - if (!ignoreSender) - senders = List(replyTo) + /** + * Sends msg to this actor (asynchronous) supplying + * explicit reply destination. + * + * @param msg the message to send + * @param replyTo the reply destination + */ + def send(msg: Any, replyTo: OutputChannel[Any]) { + val todo = synchronized { + if (waitingFor ne waitingForNone) { + val savedWaitingFor = waitingFor + waitingFor = waitingForNone + () => scheduler execute { + var item: Option[(Any, OutputChannel[Any])] = + synchronized { Some(msg, replyTo) } + while (!item.isEmpty) { + if (savedWaitingFor(item.get._1)) { + resumeReceiver(item.get) + item = None + } else { + mailbox.append(item.get._1, item.get._2) + + item = synchronized { + if (!sendBuffer.isEmpty) + Some(sendBuffer.dequeue()) + else { + waitingFor = savedWaitingFor + None + } + } + } } - // assert continuation != null - (new LightReaction(this, continuation, msg)).run() - } else synchronized { - waitingFor = savedWaitingFor - mailbox.append(msg, replyTo) } + } else { + sendBuffer.enqueue((msg, replyTo)) + () => { /* do nothing */ } } - } else - mailbox.append(msg, replyTo) + } + todo() + } + + protected[this] def resumeReceiver(item: (Any, OutputChannel[Any])) { + if (!ignoreSender) + senders = List(item._2) + // assert continuation != null + (new LightReaction(this, continuation, item._1)).run() } def !(msg: Any) { @@ -86,19 +117,41 @@ trait OutputChannelActor extends OutputChannel[Any] { def receiver: Actor = this.asInstanceOf[Actor] - protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = { - assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - synchronized { + protected[this] def drainSendBuffer() { + while (!sendBuffer.isEmpty) { + val item = sendBuffer.dequeue() + mailbox.append(item._1, item._2) + } + } + + protected[this] def searchMailbox(f: PartialFunction[Any, Unit]) { + var done = false + while (!done) { val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { - waitingFor = f.isDefinedAt - continuation = f + synchronized { + // in mean time new stuff might have arrived + if (!sendBuffer.isEmpty) { + drainSendBuffer() + // keep going + } else { + waitingFor = f.isDefinedAt + continuation = f + done = true + } + } } else { - if (!ignoreSender) - senders = List(qel.session) + senders = List(qel.session) scheduleActor(f, qel.msg) + done = true } } + } + + protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") + synchronized { drainSendBuffer() } + searchMailbox(f) throw Actor.suspendException } @@ -111,7 +164,7 @@ trait OutputChannelActor extends OutputChannel[Any] { sender ! msg } - private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { + protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { scheduler execute (new LightReaction(this, if (f eq null) continuation else f, msg)) -- cgit v1.2.3