From 4b0a5966df2e4755890ed8e6c2b0e7350cbae1ae Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Thu, 19 Oct 2006 13:52:11 +0000 Subject: Fixed bugs in orElse, more general typing. --- src/actors/scala/actors/Actor.scala | 36 ++++++++++++++++++++++++----------- src/actors/scala/actors/Channel.scala | 21 ++++++++++++++++---- 2 files changed, 42 insertions(+), 15 deletions(-) (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index b44326fe53..370b861bab 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -68,6 +68,8 @@ object Actor { def ? : Any = self.in.? + def poll: Option[Any] = self.in.poll + /** * Receives a message from the mailbox of * self. Blocks if no message matching any of the @@ -168,17 +170,17 @@ object Actor { */ def reply(): Unit = reply(()) - private[actors] trait Body[T] { - def orElse(other: => T): T - def andThen(other: => T): T + private[actors] trait Body[a] { + def orElse[b >: a](other: => b): b + def andThen[b >: a](other: => b): b } - implicit def mkBody(body: => Unit) = new Body[Unit] { - def orElse(other: => Unit): Unit = choose(body, other) - def andThen(other: => Unit): Unit = seq(body, other) + implicit def mkBody[a](body: => a) = new Body[a] { + def orElse[b >: a](other: => b): b = choose(body, other) + def andThen[b >: a](other: => b): b = seq(body, other) } - private[actors] def choose(alt1: => Unit, alt2: => Unit): Unit = { + private[actors] def choose[a, b >: a](alt1: => a, alt2: => b): b = { val s = self // save former custom suspendActor function // (e.g. from further orElse) @@ -187,10 +189,22 @@ object Actor { // have to get out of the point of suspend in alt1's // receive - s.suspendActor = () => { throw new SuspendActorException } - s.detachActor = f => { throw new SuspendActorException } + s.suspendActor = () => { + s.in.isSuspended = false + s.in.waitingFor = s.in.waitingForNone + throw new SuspendActorException + } + s.detachActor = f => { + s.in.waitingFor = s.in.waitingForNone + throw new SuspendActorException + } - try { alt1 } + try { + val res = alt1 + s.suspendActor = suspendNext + s.detachActor = detachNext + res + } catch { case d: SuspendActorException => s.suspendActor = suspendNext @@ -218,7 +232,7 @@ object Actor { * @param first ... * @param next ... */ - def seq(first: => Unit, next: => Unit): Unit = { + def seq[a, b >: a](first: => a, next: => b): b = { val s = self s.kill = () => { next } first diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index c064d96c73..74a5b88916 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -10,6 +10,8 @@ package scala.actors +import Actor._ + case object TIMEOUT class SuspendActorException extends Throwable { @@ -41,9 +43,9 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private var received: Msg = _ - private val waitingForNone = (m: Msg) => false - private var waitingFor: Msg => boolean = waitingForNone - private var waitingForSender: Actor = null + private[actors] val waitingForNone = (m: Msg) => false + private[actors] var waitingFor: Msg => boolean = waitingForNone + private[actors] var waitingForSender: Actor = null //private val messageQueue = new MessageQueue[Msg] private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]] @@ -62,6 +64,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { TimerThread.trashRequest(receiver) } + // TODO: transmit call to protocol + if (isSuspended) receiver.resumeActor() else @@ -79,6 +83,12 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { def ? : Msg = receive { case any => any } + def poll = { + Some(?) + } orElse { + None.asInstanceOf[Option[Msg]] + } + /** * Sends msg to this Channel and * awaits reply. @@ -97,7 +107,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { */ def forward(msg: Msg): unit = send(msg, receiver.sender) - private var isSuspended = false + private[actors] var isSuspended = false /** * Receives a message from this Channel. @@ -126,6 +136,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { }) match { case Some(Pair(msg, sender)) => { received = msg + // TODO: call to delivery protocol action receiver.pushSender(sender) } case None => { @@ -142,6 +153,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { } receiver.resetActor() val result = f(received) + // TODO: call to complete protocol action receiver.popSender() result } @@ -288,6 +300,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { received = msg receiver.pushSender(sender) waitingFor = waitingForNone + // TODO: call to delivery protocol action receiver.scheduleActor(f, received) } case None => { -- cgit v1.2.3