summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-05-29 14:30:44 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-05-29 14:30:44 +0000
commitb35a79a93cce5de8872e278586e74dedc53a04a7 (patch)
treeca47f63c17440623f388fe3cbdf4130a3e625200 /src
parent02827fb0819f36d5cebd52ff2c1f5688f7fc2575 (diff)
downloadscala-b35a79a93cce5de8872e278586e74dedc53a04a7.tar.gz
scala-b35a79a93cce5de8872e278586e74dedc53a04a7.tar.bz2
scala-b35a79a93cce5de8872e278586e74dedc53a04a7.zip
Implemented #2009.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala195
-rw-r--r--src/actors/scala/actors/OutputChannelActor.scala101
2 files changed, 186 insertions, 110 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 1b0d258d1b..55cf976c14 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -394,46 +394,21 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
private var onTimeout: Option[TimerTask] = None
- /**
- * Sends <code>msg</code> to this actor (asynchronous) supplying
- * explicit reply destination.
- *
- * @param msg the message to send
- * @param replyTo the reply destination
- */
- override def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- if (waitingFor ne waitingForNone) {
- val savedWaitingFor = waitingFor
- waitingFor = waitingForNone
- scheduler execute {
- if (synchronized { savedWaitingFor(msg) }) {
- synchronized {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- }
-
- if (isSuspended) {
- synchronized {
- senders = replyTo :: senders
- received = Some(msg)
- resumeActor()
- }
- } else {
- synchronized {
- senders = List(replyTo)
- }
- // assert continuation != null
- (new Reaction(this, continuation, msg)).run()
- }
- } else synchronized {
- waitingFor = savedWaitingFor
- mailbox.append(msg, replyTo)
- }
+ protected[this] override def resumeReceiver(item: (Any, OutputChannel[Any])) {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ if (isSuspended) {
+ synchronized {
+ senders = item._2 :: senders
+ received = Some(item._1)
+ resumeActor()
}
} else {
- mailbox.append(msg, replyTo)
+ senders = List(item._2)
+ // assert continuation != null
+ (new Reaction(this, continuation, item._1)).run()
}
}
@@ -445,17 +420,34 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receive[R](f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
+
synchronized {
if (shouldExit) exit() // links
+ drainSendBuffer()
+ }
+
+ var done = false
+ while (!done) {
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
- waitingFor = f.isDefinedAt
- suspendActor()
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer()
+ // keep going
+ } else {
+ waitingFor = f.isDefinedAt
+ suspendActor()
+ done = true
+ }
+ }
} else {
received = Some(qel.msg)
senders = qel.session :: senders
+ done = true
}
}
+
val result = f(received.get)
received = None
senders = senders.tail
@@ -472,41 +464,57 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")
+
synchronized {
if (shouldExit) exit() // links
+ drainSendBuffer()
+ }
- // first, remove spurious TIMEOUT message from mailbox if any
- val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ val receiveTimeout = () => {
+ if (f.isDefinedAt(TIMEOUT)) {
+ received = Some(TIMEOUT)
+ senders = this :: senders
+ } else
+ error("unhandled timeout")
+ }
+
+ var done = false
+ while (!done) {
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
- if (msec == 0) {
- if (f.isDefinedAt(TIMEOUT)) {
- received = Some(TIMEOUT)
- senders = this :: senders
- } else
- error("unhandled timeout")
- } else {
- waitingFor = f.isDefinedAt
- received = None
- suspendActorFor(msec)
- if (received.isEmpty) {
- // actor is not resumed because of new message
- // therefore, waitingFor has not been updated, yet.
- waitingFor = waitingForNone
- if (f.isDefinedAt(TIMEOUT)) {
- received = Some(TIMEOUT)
- senders = this :: senders
+ val todo = synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer()
+ // keep going
+ () => {}
+ } else if (msec == 0) {
+ done = true
+ receiveTimeout
+ } else {
+ waitingFor = f.isDefinedAt
+ received = None
+ suspendActorFor(msec)
+ if (received.isEmpty) {
+ // actor is not resumed because of new message
+ // therefore, waitingFor has not been updated, yet.
+ waitingFor = waitingForNone
}
- else
- error("unhandled timeout")
+ done = true
+ receiveTimeout
}
}
+ todo()
} else {
received = Some(qel.msg)
senders = qel.session :: senders
+ done = true
}
}
+
val result = f(received.get)
received = None
senders = senders.tail
@@ -525,15 +533,9 @@ trait Actor extends OutputChannelActor with AbstractActor {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
synchronized {
if (shouldExit) exit() // links
- val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
- if (null eq qel) {
- waitingFor = f.isDefinedAt
- continuation = f
- } else {
- senders = List(qel.session)
- scheduleActor(f, qel.msg)
- }
+ drainSendBuffer()
}
+ searchMailbox(f)
throw Actor.suspendException
}
@@ -549,35 +551,56 @@ trait Actor extends OutputChannelActor with AbstractActor {
*/
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self(scheduler) == this, "react on channel belonging to other actor")
+
synchronized {
if (shouldExit) exit() // links
+ drainSendBuffer()
+ }
- // first, remove spurious TIMEOUT message from mailbox if any
- val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ // first, remove spurious TIMEOUT message from mailbox if any
+ mailbox.extractFirst((m: Any) => m == TIMEOUT)
+ val receiveTimeout = () => {
+ if (f.isDefinedAt(TIMEOUT)) {
+ senders = List(this)
+ scheduleActor(f, TIMEOUT)
+ } else
+ error("unhandled timeout")
+ }
+
+ var done = false
+ while (!done) {
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
- if (msec == 0) {
- if (f.isDefinedAt(TIMEOUT)) {
- senders = List(this)
- scheduleActor(f, TIMEOUT)
+ val todo = synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer()
+ // keep going
+ () => {}
+ } else if (msec == 0) {
+ done = true
+ receiveTimeout
+ } else {
+ waitingFor = f.isDefinedAt
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() { thisActor.send(TIMEOUT, thisActor) }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ continuation = f
+ done = true
+ () => {}
}
- else
- error("unhandled timeout")
- } else {
- waitingFor = f.isDefinedAt
- val thisActor = this
- onTimeout = Some(new TimerTask {
- def run() { thisActor.send(TIMEOUT, thisActor) }
- })
- Actor.timer.schedule(onTimeout.get, msec)
- continuation = f
}
+ todo()
} else {
senders = List(qel.session)
scheduleActor(f, qel.msg)
+ done = true
}
}
+
throw Actor.suspendException
}
@@ -791,7 +814,7 @@ trait Actor extends OutputChannelActor with AbstractActor {
}
// guarded by lock of this
- private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ protected override def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
}
diff --git a/src/actors/scala/actors/OutputChannelActor.scala b/src/actors/scala/actors/OutputChannelActor.scala
index 2b2a4397a1..28f2948680 100644
--- a/src/actors/scala/actors/OutputChannelActor.scala
+++ b/src/actors/scala/actors/OutputChannelActor.scala
@@ -10,6 +10,8 @@
package scala.actors
+import scala.collection.mutable.Queue
+
trait OutputChannelActor extends OutputChannel[Any] {
@volatile
@@ -18,6 +20,8 @@ trait OutputChannelActor extends OutputChannel[Any] {
/* The actor's mailbox. */
protected val mailbox = new MessageQueue
+ protected var sendBuffer = new Queue[(Any, OutputChannel[Any])]
+
/* A list of the current senders. The head of the list is
* the sender of the message that was received last.
*/
@@ -55,25 +59,52 @@ trait OutputChannelActor extends OutputChannel[Any] {
protected[actors] def mailboxSize: Int =
mailbox.size
- def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
- if (waitingFor ne waitingForNone) {
- val savedWaitingFor = waitingFor
- waitingFor = waitingForNone
- scheduler execute {
- if (synchronized { savedWaitingFor(msg) }) {
- synchronized {
- if (!ignoreSender)
- senders = List(replyTo)
+ /**
+ * Sends <code>msg</code> to this actor (asynchronous) supplying
+ * explicit reply destination.
+ *
+ * @param msg the message to send
+ * @param replyTo the reply destination
+ */
+ def send(msg: Any, replyTo: OutputChannel[Any]) {
+ val todo = synchronized {
+ if (waitingFor ne waitingForNone) {
+ val savedWaitingFor = waitingFor
+ waitingFor = waitingForNone
+ () => scheduler execute {
+ var item: Option[(Any, OutputChannel[Any])] =
+ synchronized { Some(msg, replyTo) }
+ while (!item.isEmpty) {
+ if (savedWaitingFor(item.get._1)) {
+ resumeReceiver(item.get)
+ item = None
+ } else {
+ mailbox.append(item.get._1, item.get._2)
+
+ item = synchronized {
+ if (!sendBuffer.isEmpty)
+ Some(sendBuffer.dequeue())
+ else {
+ waitingFor = savedWaitingFor
+ None
+ }
+ }
+ }
}
- // assert continuation != null
- (new LightReaction(this, continuation, msg)).run()
- } else synchronized {
- waitingFor = savedWaitingFor
- mailbox.append(msg, replyTo)
}
+ } else {
+ sendBuffer.enqueue((msg, replyTo))
+ () => { /* do nothing */ }
}
- } else
- mailbox.append(msg, replyTo)
+ }
+ todo()
+ }
+
+ protected[this] def resumeReceiver(item: (Any, OutputChannel[Any])) {
+ if (!ignoreSender)
+ senders = List(item._2)
+ // assert continuation != null
+ (new LightReaction(this, continuation, item._1)).run()
}
def !(msg: Any) {
@@ -86,19 +117,41 @@ trait OutputChannelActor extends OutputChannel[Any] {
def receiver: Actor = this.asInstanceOf[Actor]
- protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
- synchronized {
+ protected[this] def drainSendBuffer() {
+ while (!sendBuffer.isEmpty) {
+ val item = sendBuffer.dequeue()
+ mailbox.append(item._1, item._2)
+ }
+ }
+
+ protected[this] def searchMailbox(f: PartialFunction[Any, Unit]) {
+ var done = false
+ while (!done) {
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
- waitingFor = f.isDefinedAt
- continuation = f
+ synchronized {
+ // in mean time new stuff might have arrived
+ if (!sendBuffer.isEmpty) {
+ drainSendBuffer()
+ // keep going
+ } else {
+ waitingFor = f.isDefinedAt
+ continuation = f
+ done = true
+ }
+ }
} else {
- if (!ignoreSender)
- senders = List(qel.session)
+ senders = List(qel.session)
scheduleActor(f, qel.msg)
+ done = true
}
}
+ }
+
+ protected[actors] def react(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.rawSelf(scheduler) == this, "react on channel belonging to other actor")
+ synchronized { drainSendBuffer() }
+ searchMailbox(f)
throw Actor.suspendException
}
@@ -111,7 +164,7 @@ trait OutputChannelActor extends OutputChannel[Any] {
sender ! msg
}
- private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
scheduler execute (new LightReaction(this,
if (f eq null) continuation else f,
msg))