summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-10-01 15:58:38 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-10-01 15:58:38 +0000
commit353c843392167b35f4920eafc5acc3e10a4711ff (patch)
tree73bd876c06ffb2a418d23a1ead87e1f61a4d028b /src/actors
parentf30c0b0dba3889de9a9603761aec8a60a871fc85 (diff)
downloadscala-353c843392167b35f4920eafc5acc3e10a4711ff.tar.gz
scala-353c843392167b35f4920eafc5acc3e10a4711ff.tar.bz2
scala-353c843392167b35f4920eafc5acc3e10a4711ff.zip
Actors waiting in receive search for messages o...
Actors waiting in receive search for messages on their underlying thread. Simplified receiveWithin.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala104
-rw-r--r--src/actors/scala/actors/Reactor.scala15
2 files changed, 45 insertions, 74 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 057436a514..476bf85411 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -399,40 +399,26 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
*/
private var onTimeout: Option[TimerTask] = None
- /* Used for notifying scheduler when blocking inside <code>receive</code>. */
- private lazy val blocker = new ActorBlocker(0)
-
- private class RunCallable(fun: () => Unit) extends Callable[Unit] with Runnable {
- def call() = fun()
- def run() = fun()
- }
+ private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) =
+ if (isSuspended) {
+ () => synchronized {
+ mailbox.append(msg, replyTo)
+ resumeActor()
+ }
+ } else super.startSearch(msg, replyTo, handler)
- private[actors] override def makeReaction(fun: () => Unit): Runnable = {
- if (isSuspended)
- new RunCallable(fun)
- else
- new ActorTask(this, fun)
- }
+ private[actors] override def makeReaction(fun: () => Unit): Runnable =
+ new ActorTask(this, fun)
private[actors] override def resumeReceiver(item: (Any, OutputChannel[Any]), onSameThread: Boolean) {
- if (!onTimeout.isEmpty) {
- onTimeout.get.cancel()
- onTimeout = None
- }
- if (isSuspended) {
- synchronized {
- received = Some(item._1)
- senders = item._2 :: senders
- resumeActor()
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
}
- } else {
- senders = List(item._2)
- // assert continuation != null
- if (onSameThread)
- continuation(item._1)
- else
- scheduleActor(continuation, item._1)
}
+ senders = List(item._2)
+ super.resumeReceiver(item, onSameThread)
}
/**
@@ -462,7 +448,8 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
waitingFor = f.isDefinedAt
isSuspended = true
scheduler.managedBlock(blocker)
- done = true
+ drainSendBuffer(mailbox)
+ // keep going
}
}
} else {
@@ -522,19 +509,25 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
waitingFor = f.isDefinedAt
received = None
isSuspended = true
- scheduler.managedBlock(new ActorBlocker(msec))
- done = true
- if (received.isEmpty) {
- // actor is not resumed because of new message
- // therefore, waitingFor has not been updated, yet.
- waitingFor = waitingForNone
- receiveTimeout
- } else
- () => {}
+ val thisActor = this
+ onTimeout = Some(new TimerTask {
+ def run() { thisActor.send(TIMEOUT, thisActor) }
+ })
+ Actor.timer.schedule(onTimeout.get, msec)
+ scheduler.managedBlock(blocker)
+ drainSendBuffer(mailbox)
+ // keep going
+ () => {}
}
}
todo()
} else {
+ synchronized {
+ if (!onTimeout.isEmpty) {
+ onTimeout.get.cancel()
+ onTimeout = None
+ }
+ }
received = Some(qel.msg)
senders = qel.session :: senders
done = true
@@ -650,19 +643,17 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
scheduler executeFromActor task
}
- private class ActorBlocker(timeout: Long) extends scala.concurrent.ManagedBlocker {
+ /* Used for notifying scheduler when blocking inside receive/receiveWithin. */
+ private object blocker extends scala.concurrent.ManagedBlocker {
def block() = {
- if (timeout > 0)
- Actor.this.suspendActorFor(timeout)
- else
- Actor.this.suspendActor()
+ Actor.this.suspendActor()
true
}
def isReleasable =
!Actor.this.isSuspended
}
- private def suspendActor() {
+ private def suspendActor() = synchronized {
while (isSuspended) {
try {
wait()
@@ -674,29 +665,6 @@ trait Actor extends AbstractActor with ReplyReactor with ReplyableActor {
if (shouldExit) exit()
}
- private def suspendActorFor(msec: Long) {
- val ts = Platform.currentTime
- var waittime = msec
- var fromExc = false
- while (isSuspended) {
- try {
- fromExc = false
- wait(waittime)
- } catch {
- case _: InterruptedException => {
- fromExc = true
- val now = Platform.currentTime
- val waited = now-ts
- waittime = msec-waited
- if (waittime < 0) { isSuspended = false }
- }
- }
- if (!fromExc) { isSuspended = false }
- }
- // links: check if we should exit
- if (shouldExit) exit()
- }
-
private def resumeActor() {
isSuspended = false
notify()
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index a3e44d4d2c..07a59a8c28 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -22,7 +22,7 @@ trait Reactor extends OutputChannel[Any] {
/* The actor's mailbox. */
private[actors] val mailbox = new MessageQueue("Reactor")
- private[actors] var sendBuffer = new Queue[(Any, OutputChannel[Any])]
+ private[actors] val sendBuffer = new Queue[(Any, OutputChannel[Any])]
/* If the actor waits in a react, continuation holds the
* message handler that react was called with.
@@ -69,11 +69,7 @@ trait Reactor extends OutputChannel[Any] {
if (waitingFor ne waitingForNone) {
val savedWaitingFor = waitingFor
waitingFor = waitingForNone
- () => scheduler execute (makeReaction(() => {
- val startMbox = new MessageQueue("Start")
- synchronized { startMbox.append(msg, replyTo) }
- searchMailbox(startMbox, savedWaitingFor, true)
- }))
+ startSearch(msg, replyTo, savedWaitingFor)
} else {
sendBuffer.enqueue((msg, replyTo))
() => { /* do nothing */ }
@@ -82,6 +78,13 @@ trait Reactor extends OutputChannel[Any] {
todo()
}
+ private[actors] def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: Any => Boolean) =
+ () => scheduler execute (makeReaction(() => {
+ val startMbox = new MessageQueue("Start")
+ synchronized { startMbox.append(msg, replyTo) }
+ searchMailbox(startMbox, handler, true)
+ }))
+
private[actors] def makeReaction(fun: () => Unit): Runnable =
new ReactorTask(this, fun)