summaryrefslogtreecommitdiff
path: root/src/actors/scala/actors/Actor.scala
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-28 14:55:58 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-28 14:55:58 +0000
commit8434c271e5fb1f6130aaf87a209aa84605dd7919 (patch)
treec0f16d9b256cd38b1e85b76ffee002a299b32d21 /src/actors/scala/actors/Actor.scala
parentf34e908054b82c70f1912e67ace2140c8e9af50c (diff)
downloadscala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.gz
scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.tar.bz2
scala-8434c271e5fb1f6130aaf87a209aa84605dd7919.zip
Fixed #2010 by scheduling waitingFor check.
Diffstat (limited to 'src/actors/scala/actors/Actor.scala')
-rw-r--r--src/actors/scala/actors/Actor.scala69
1 files changed, 40 insertions, 29 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index ddeeca8c9f..1b0d258d1b 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -10,11 +10,8 @@
package scala.actors
-import scala.collection.mutable.{HashSet, Queue}
import scala.compat.Platform
-
import java.util.{Timer, TimerTask}
-
import java.util.concurrent.ExecutionException
/**
@@ -32,6 +29,8 @@ object Actor {
// timer thread runs as daemon
private[actors] val timer = new Timer(true)
+ private[actors] val suspendException = new SuspendActorException
+
/**
* Returns the currently executing actor. Should be used instead
* of <code>this</code> in all blocks of code executed by
@@ -380,6 +379,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
* suspends by blocking its underlying thread, for example,
* when waiting in a receive or synchronous send.
*/
+ @volatile
private var isSuspended = false
/* This field is used to communicate the received message from
@@ -402,22 +402,35 @@ trait Actor extends OutputChannelActor with AbstractActor {
* @param replyTo the reply destination
*/
override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- if (waitingFor(msg)) {
+ if (waitingFor ne waitingForNone) {
+ val savedWaitingFor = waitingFor
waitingFor = waitingForNone
+ scheduler execute {
+ if (synchronized { savedWaitingFor(msg) }) {
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ }
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
-
- if (isSuspended) {
- senders = replyTo :: senders
- received = Some(msg)
- resumeActor()
- } else {
- senders = List(replyTo)
- // assert continuation != null
- scheduler.execute(new Reaction(this, continuation, msg))
+ if (isSuspended) {
+ synchronized {
+ senders = replyTo :: senders
+ received = Some(msg)
+ resumeActor()
+ }
+ } else {
+ synchronized {
+ senders = List(replyTo)
+ }
+ // assert continuation != null
+ (new Reaction(this, continuation, msg)).run()
+ }
+ } else synchronized {
+ waitingFor = savedWaitingFor
+ mailbox.append(msg, replyTo)
+ }
}
} else {
mailbox.append(msg, replyTo)
@@ -432,7 +445,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
@@ -459,7 +472,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
// first, remove spurious TIMEOUT message from mailbox if any
@@ -473,8 +486,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = this :: senders
} else
error("unhandled timeout")
- }
- else {
+ } else {
waitingFor = f.isDefinedAt
received = None
suspendActorFor(msec)
@@ -511,7 +523,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
override def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
@@ -521,8 +533,8 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = List(qel.session)
scheduleActor(f, qel.msg)
}
- throw new SuspendActorException
}
+ throw Actor.suspendException
}
/**
@@ -537,7 +549,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
- this.synchronized {
+ synchronized {
if (shouldExit) exit() // links
// first, remove spurious TIMEOUT message from mailbox if any
@@ -552,8 +564,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
}
else
error("unhandled timeout")
- }
- else {
+ } else {
waitingFor = f.isDefinedAt
val thisActor = this
onTimeout = Some(new TimerTask {
@@ -566,8 +577,8 @@ trait Actor extends OutputChannelActor with AbstractActor {
senders = List(qel.session)
scheduleActor(f, qel.msg)
}
- throw new SuspendActorException
}
+ throw Actor.suspendException
}
/**
@@ -938,7 +949,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
if (!links.isEmpty)
exitLinked()
terminated()
- throw new SuspendActorException
+ throw Actor.suspendException
}
// Assume !links.isEmpty
@@ -966,7 +977,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
this ! Exit(from, reason)
}
else if (reason != 'normal)
- this.synchronized {
+ synchronized {
shouldExit = true
exitReason = reason
// resume this Actor in a way that