diff options
author | Philipp Haller <hallerp@gmail.com> | 2009-10-01 15:58:38 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2009-10-01 15:58:38 +0000 |
commit | 353c843392167b35f4920eafc5acc3e10a4711ff (patch) | |
tree | 73bd876c06ffb2a418d23a1ead87e1f61a4d028b /src/actors | |
parent | f30c0b0dba3889de9a9603761aec8a60a871fc85 (diff) | |
download | scala-353c843392167b35f4920eafc5acc3e10a4711ff.tar.gz scala-353c843392167b35f4920eafc5acc3e10a4711ff.tar.bz2 scala-353c843392167b35f4920eafc5acc3e10a4711ff.zip |
Actors waiting in receive search for messages o...
Actors waiting in receive search for messages on their underlying
thread. Simplified receiveWithin.
Diffstat (limited to 'src/actors')
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 104 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 15 |
2 files changed, 45 insertions, 74 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 057436a514..476bf85411 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -399,40 +399,26 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { */ private var onTimeout: Option[TimerTask] = None - /* Used for notifying scheduler when blocking inside <code>receive</code>. */ - private lazy val blocker = new ActorBlocker(0) - - private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable { - def call() = fun() - def run() = fun() - } + private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) = + if (isSuspended) { + () => synchronized { + mailbox.append(msg, replyTo) + resumeActor() + } + } else super.startSearch(msg, replyTo, handler) - private[actors] override def makeReaction(fun: () => Unit): Runnable = { - if (isSuspended) - new RunCallable(fun) - else - new ActorTask(this, fun) - } + private[actors] override def makeReaction(fun: () => Unit): Runnable = + new ActorTask(this, fun) private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) { - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - if (isSuspended) { - synchronized { - received = Some(item._1) - senders = item._2 :: senders - resumeActor() + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None } - } else { - senders = List(item._2) - // assert continuation != null - if (onSameThread) - continuation(item._1) - else - scheduleActor(continuation, item._1) } + senders = List(item._2) + super.resumeReceiver(item, onSameThread) } /** @@ -462,7 +448,8 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { waitingFor = f.isDefinedAt isSuspended = true scheduler.managedBlock(blocker) - done = true + drainSendBuffer(mailbox) + // keep going } } } else { @@ -522,19 +509,25 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { waitingFor = f.isDefinedAt received = None isSuspended = true - scheduler.managedBlock(new ActorBlocker(msec)) - done = true - if (received.isEmpty) { - // actor is not resumed because of new message - // therefore, waitingFor has not been updated, yet. - waitingFor = waitingForNone - receiveTimeout - } else - () => {} + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor.send(TIMEOUT, thisActor) } + }) + Actor.timer.schedule(onTimeout.get, msec) + scheduler.managedBlock(blocker) + drainSendBuffer(mailbox) + // keep going + () => {} } } todo() } else { + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + } received = Some(qel.msg) senders = qel.session :: senders done = true @@ -650,19 +643,17 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { scheduler executeFromActor task } - private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker { + /* Used for notifying scheduler when blocking inside receive/receiveWithin. */ + private object blocker extends scala.concurrent.ManagedBlocker { def block() = { - if (timeout > 0) - Actor.this.suspendActorFor(timeout) - else - Actor.this.suspendActor() + Actor.this.suspendActor() true } def isReleasable = !Actor.this.isSuspended } - private def suspendActor() { + private def suspendActor() = synchronized { while (isSuspended) { try { wait() @@ -674,29 +665,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor { if (shouldExit) exit() } - private def suspendActorFor(msec: Long) { - val ts = Platform.currentTime - var waittime = msec - var fromExc = false - while (isSuspended) { - try { - fromExc = false - wait(waittime) - } catch { - case _: InterruptedException => { - fromExc = true - val now = Platform.currentTime - val waited = now-ts - waittime = msec-waited - if (waittime < 0) { isSuspended = false } - } - } - if (!fromExc) { isSuspended = false } - } - // links: check if we should exit - if (shouldExit) exit() - } - private def resumeActor() { isSuspended = false notify() diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index a3e44d4d2c..07a59a8c28 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -22,7 +22,7 @@ trait Reactor extends OutputChannel[Any] { /* The actor's mailbox. */ private[actors] val mailbox = new MessageQueue("Reactor") - private[actors] var sendBuffer = new Queue[(Any, OutputChannel[Any])] + private[actors] val sendBuffer = new Queue[(Any, OutputChannel[Any])] /* If the actor waits in a react, continuation holds the * message handler that react was called with. @@ -69,11 +69,7 @@ trait Reactor extends OutputChannel[Any] { if (waitingFor ne waitingForNone) { val savedWaitingFor = waitingFor waitingFor = waitingForNone - () => scheduler execute (makeReaction(() => { - val startMbox = new MessageQueue("Start") - synchronized { startMbox.append(msg, replyTo) } - searchMailbox(startMbox, savedWaitingFor, true) - })) + startSearch(msg, replyTo, savedWaitingFor) } else { sendBuffer.enqueue((msg, replyTo)) () => { /* do nothing */ } @@ -82,6 +78,13 @@ trait Reactor extends OutputChannel[Any] { todo() } + private[actors] def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) = + () => scheduler execute (makeReaction(() => { + val startMbox = new MessageQueue("Start") + synchronized { startMbox.append(msg, replyTo) } + searchMailbox(startMbox, handler, true) + })) + private[actors] def makeReaction(fun: () => Unit): Runnable = new ReactorTask(this, fun) |