summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-28 14:55:58 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-28 14:55:58 +0000
commit8434c271e5fb1f6130aaf87a209aa84605dd7919 (patch)
treec0f16d9b256cd38b1e85b76ffee002a299b32d21 /src/actors
parentf34e908054b82c70f1912e67ace2140c8e9af50c (diff)
downloadscala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.gz
scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.bz2
scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.zip
Fixed #2010 by scheduling waitingFor check.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala69
-rw-r--r--src/actors/scala/actors/OutputChannelActor.scala55
2 files changed, 71 insertions, 53 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index ddeeca8c9f..1b0d258d1b 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -10,11 +10,8 @@
package scala.actors
-import scala.collection.mutable.{HashSet, Queue}
import scala.compat.Platform
-
import java.util.{Timer, TimerTask}
-
import java.util.concurrent.ExecutionException
/**
@@ -32,6 +29,8 @@ object Actor {
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
+ private[actors] val suspendException = new SuspendActorException
+
/**
* Returns the currently executing actor. Should be used instead
* of <code>this</code> in all blocks of code executed by
@@ -380,6 +379,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
* suspends by blocking its underlying thread, for example,
* when waiting in a receive or synchronous send.
*/
+ @volatile
private var isSuspended = false
/* This field is used to communicate the received message from
@@ -402,22 +402,35 @@ trait Actor extends OutputChannelActor with AbstractActor {
* @param replyTo the reply destination
*/
override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- if (waitingFor(msg)) {
+ if (waitingFor ne waitingForNone) {
+ val savedWaitingFor = waitingFor
waitingFor = waitingForNone
+ scheduler execute {
+ if (synchronized { savedWaitingFor(msg) }) {
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ }
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
-
- if (isSuspended) {
- senders = replyTo :: senders
- received = Some(msg)
- resumeActor()
- } else {
- senders = List(replyTo)
- // assert continuation != null
- scheduler.execute(new Reaction(this, continuation, msg))
+ if (isSuspended) {
+ synchronized {
+ senders = replyTo :: senders
+ received = Some(msg)
+ resumeActor()
+ }
+ } else {
+ synchronized {
+ senders = List(replyTo)
+ }
+ // assert continuation != null
+ (new Reaction(this, continuation, msg)).run()
+ }
+ } else synchronized {
+ waitingFor = savedWaitingFor
+ mailbox.append(msg, replyTo)
+ }
}
} else {
mailbox.append(msg, replyTo)
@@ -432,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
@@ -459,7 +472,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
// first, remove spurious TIMEOUT message from mailbox if any
@@ -473,8 +486,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = this :: senders
} else
error("unhandled timeout")
- }
- else {
+ } else {
waitingFor = f.isDefinedAt
received = None
suspendActorFor(msec)
@@ -511,7 +523,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
override def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
@@ -521,8 +533,8 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = List(qel.session)
scheduleActor(f, qel.msg)
}
- throw new SuspendActorException
}
+ throw Actor.suspendException
}
/**
@@ -537,7 +549,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
// first, remove spurious TIMEOUT message from mailbox if any
@@ -552,8 +564,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
}
else
error("unhandled timeout")
- }
- else {
+ } else {
waitingFor = f.isDefinedAt
val thisActor = this
onTimeout = Some(new TimerTask {
@@ -566,8 +577,8 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = List(qel.session)
scheduleActor(f, qel.msg)
}
- throw new SuspendActorException
}
+ throw Actor.suspendException
}
/**
@@ -938,7 +949,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
if (!links.isEmpty)
exitLinked()
terminated()
- throw new SuspendActorException
+ throw Actor.suspendException
}
// Assume !links.isEmpty
@@ -966,7 +977,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
this ! Exit(from, reason)
}
else if (reason != 'normal)
- this.synchronized {
+ synchronized {
shouldExit = true
exitReason = reason
// resume this Actor in a way that
diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala
index dc90898ccd..2b2a4397a1 100644
--- a/src/actors/scala/actors/OutputChannelActor.scala
+++ b/src/actors/scala/actors/OutputChannelActor.scala
@@ -12,6 +12,7 @@ package scala.actors
trait OutputChannelActor extends OutputChannel[Any] {
+ @volatile
protected var ignoreSender: Boolean = false
/* The actor's mailbox. */
@@ -45,27 +46,34 @@ trait OutputChannelActor extends OutputChannel[Any] {
*/
def act(): Unit
- protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = Map()
+ protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] =
+ Map()
protected[actors] def scheduler: IScheduler =
Scheduler
- def mailboxSize: Int = synchronized {
+ protected[actors] def mailboxSize: Int =
mailbox.size
- }
def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- if (waitingFor(msg)) {
+ if (waitingFor ne waitingForNone) {
+ val savedWaitingFor = waitingFor
waitingFor = waitingForNone
-
- if (!ignoreSender)
- senders = List(replyTo)
-
- // assert continuation != null
- scheduler.execute(new LightReaction(this, continuation, msg))
- } else {
+ scheduler execute {
+ if (synchronized { savedWaitingFor(msg) }) {
+ synchronized {
+ if (!ignoreSender)
+ senders = List(replyTo)
+ }
+ // assert continuation != null
+ (new LightReaction(this, continuation, msg)).run()
+ } else synchronized {
+ waitingFor = savedWaitingFor
+ mailbox.append(msg, replyTo)
+ }
+ }
+ } else
mailbox.append(msg, replyTo)
- }
}
def !(msg: Any) {
@@ -78,9 +86,9 @@ trait OutputChannelActor extends OutputChannel[Any] {
def receiver: Actor = this.asInstanceOf[Actor]
- def react(f: PartialFunction[Any, Unit]): Nothing = {
+ protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
- this.synchronized {
+ synchronized {
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@@ -90,27 +98,26 @@ trait OutputChannelActor extends OutputChannel[Any] {
senders = List(qel.session)
scheduleActor(f, qel.msg)
}
- throw new SuspendActorException
}
+ throw Actor.suspendException
}
- def sender: OutputChannel[Any] = senders.head
+ protected[actors] def sender: OutputChannel[Any] = senders.head
/**
* Replies with <code>msg</code> to the sender.
*/
- def reply(msg: Any) {
+ protected[actors] def reply(msg: Any) {
sender ! msg
}
private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
- val task = new LightReaction(this,
- if (f eq null) continuation else f,
- msg)
- scheduler execute task
+ scheduler execute (new LightReaction(this,
+ if (f eq null) continuation else f,
+ msg))
}
- def start(): OutputChannelActor = synchronized {
+ def start(): OutputChannelActor = {
scheduler execute {
scheduler.newActor(OutputChannelActor.this)
(new LightReaction(OutputChannelActor.this)).run()
@@ -135,7 +142,7 @@ trait OutputChannelActor extends OutputChannel[Any] {
// instead of directly executing `next`,
// schedule as continuation
scheduleActor({ case _ => next }, 1)
- throw new SuspendActorException
+ throw Actor.suspendException
}
first
throw new KillActorException
@@ -143,7 +150,7 @@ trait OutputChannelActor extends OutputChannel[Any] {
protected[actors] def exit(): Nothing = {
terminated()
- throw new SuspendActorException
+ throw Actor.suspendException
}
protected[actors] def terminated() {