From 8434c271e5fb1f6130aaf87a209aa84605dd7919 Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 28 May 2009 14:55:58 +0000 Subject: Fixed #2010 by scheduling waitingFor check. --- src/actors/scala/actors/Actor.scala | 69 ++++++++++++++---------- src/actors/scala/actors/OutputChannelActor.scala | 55 ++++++++++--------- 2 files changed, 71 insertions(+), 53 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index ddeeca8c9f..1b0d258d1b 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -10,11 +10,8 @@ package scala.actors -import scala.collection.mutable.{HashSet, Queue} import scala.compat.Platform - import java.util.{Timer, TimerTask} - import java.util.concurrent.ExecutionException /** @@ -32,6 +29,8 @@ object Actor { // timer thread runs as daemon private[actors] val timer = new Timer(true) + private[actors] val suspendException = new SuspendActorException + /** * Returns the currently executing actor. Should be used instead * of this in all blocks of code executed by @@ -380,6 +379,7 @@ trait Actor extends OutputChannelActor with AbstractActor { * suspends by blocking its underlying thread, for example, * when waiting in a receive or synchronous send. */ + @volatile private var isSuspended = false /* This field is used to communicate the received message from @@ -402,22 +402,35 @@ trait Actor extends OutputChannelActor with AbstractActor { * @param replyTo the reply destination */ override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - if (waitingFor(msg)) { + if (waitingFor ne waitingForNone) { + val savedWaitingFor = waitingFor waitingFor = waitingForNone + scheduler execute { + if (synchronized { savedWaitingFor(msg) }) { + synchronized { + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + } - if (!onTimeout.isEmpty) { - onTimeout.get.cancel() - onTimeout = None - } - - if (isSuspended) { - senders = replyTo :: senders - received = Some(msg) - resumeActor() - } else { - senders = List(replyTo) - // assert continuation != null - scheduler.execute(new Reaction(this, continuation, msg)) + if (isSuspended) { + synchronized { + senders = replyTo :: senders + received = Some(msg) + resumeActor() + } + } else { + synchronized { + senders = List(replyTo) + } + // assert continuation != null + (new Reaction(this, continuation, msg)).run() + } + } else synchronized { + waitingFor = savedWaitingFor + mailbox.append(msg, replyTo) + } } } else { mailbox.append(msg, replyTo) @@ -432,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receive[R](f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { @@ -459,7 +472,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links // first, remove spurious TIMEOUT message from mailbox if any @@ -473,8 +486,7 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = this :: senders } else error("unhandled timeout") - } - else { + } else { waitingFor = f.isDefinedAt received = None suspendActorFor(msec) @@ -511,7 +523,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ override def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { @@ -521,8 +533,8 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = List(qel.session) scheduleActor(f, qel.msg) } - throw new SuspendActorException } + throw Actor.suspendException } /** @@ -537,7 +549,7 @@ trait Actor extends OutputChannelActor with AbstractActor { */ def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self(scheduler) == this, "react on channel belonging to other actor") - this.synchronized { + synchronized { if (shouldExit) exit() // links // first, remove spurious TIMEOUT message from mailbox if any @@ -552,8 +564,7 @@ trait Actor extends OutputChannelActor with AbstractActor { } else error("unhandled timeout") - } - else { + } else { waitingFor = f.isDefinedAt val thisActor = this onTimeout = Some(new TimerTask { @@ -566,8 +577,8 @@ trait Actor extends OutputChannelActor with AbstractActor { senders = List(qel.session) scheduleActor(f, qel.msg) } - throw new SuspendActorException } + throw Actor.suspendException } /** @@ -938,7 +949,7 @@ trait Actor extends OutputChannelActor with AbstractActor { if (!links.isEmpty) exitLinked() terminated() - throw new SuspendActorException + throw Actor.suspendException } // Assume !links.isEmpty @@ -966,7 +977,7 @@ trait Actor extends OutputChannelActor with AbstractActor { this ! Exit(from, reason) } else if (reason != 'normal) - this.synchronized { + synchronized { shouldExit = true exitReason = reason // resume this Actor in a way that diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala index dc90898ccd..2b2a4397a1 100644 --- a/src/actors/scala/actors/OutputChannelActor.scala +++ b/src/actors/scala/actors/OutputChannelActor.scala @@ -12,6 +12,7 @@ package scala.actors trait OutputChannelActor extends OutputChannel[Any] { + @volatile protected var ignoreSender: Boolean = false /* The actor's mailbox. */ @@ -45,27 +46,34 @@ trait OutputChannelActor extends OutputChannel[Any] { */ def act(): Unit - protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map() + protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = + Map() protected[actors] def scheduler: IScheduler = Scheduler - def mailboxSize: Int = synchronized { + protected[actors] def mailboxSize: Int = mailbox.size - } def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { - if (waitingFor(msg)) { + if (waitingFor ne waitingForNone) { + val savedWaitingFor = waitingFor waitingFor = waitingForNone - - if (!ignoreSender) - senders = List(replyTo) - - // assert continuation != null - scheduler.execute(new LightReaction(this, continuation, msg)) - } else { + scheduler execute { + if (synchronized { savedWaitingFor(msg) }) { + synchronized { + if (!ignoreSender) + senders = List(replyTo) + } + // assert continuation != null + (new LightReaction(this, continuation, msg)).run() + } else synchronized { + waitingFor = savedWaitingFor + mailbox.append(msg, replyTo) + } + } + } else mailbox.append(msg, replyTo) - } } def !(msg: Any) { @@ -78,9 +86,9 @@ trait OutputChannelActor extends OutputChannel[Any] { def receiver: Actor = this.asInstanceOf[Actor] - def react(f: PartialFunction[Any, Unit]): Nothing = { + protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor") - this.synchronized { + synchronized { val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) if (null eq qel) { waitingFor = f.isDefinedAt @@ -90,27 +98,26 @@ trait OutputChannelActor extends OutputChannel[Any] { senders = List(qel.session) scheduleActor(f, qel.msg) } - throw new SuspendActorException } + throw Actor.suspendException } - def sender: OutputChannel[Any] = senders.head + protected[actors] def sender: OutputChannel[Any] = senders.head /** * Replies with msg to the sender. */ - def reply(msg: Any) { + protected[actors] def reply(msg: Any) { sender ! msg } private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { - val task = new LightReaction(this, - if (f eq null) continuation else f, - msg) - scheduler execute task + scheduler execute (new LightReaction(this, + if (f eq null) continuation else f, + msg)) } - def start(): OutputChannelActor = synchronized { + def start(): OutputChannelActor = { scheduler execute { scheduler.newActor(OutputChannelActor.this) (new LightReaction(OutputChannelActor.this)).run() @@ -135,7 +142,7 @@ trait OutputChannelActor extends OutputChannel[Any] { // instead of directly executing `next`, // schedule as continuation scheduleActor({ case _ => next }, 1) - throw new SuspendActorException + throw Actor.suspendException } first throw new KillActorException @@ -143,7 +150,7 @@ trait OutputChannelActor extends OutputChannel[Any] { protected[actors] def exit(): Nothing = { terminated() - throw new SuspendActorException + throw Actor.suspendException } protected[actors] def terminated() { -- cgit v1.2.3