summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-17 10:40:31 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-17 10:40:31 +0000
commitcf5b53633eb1f99c340feb18bcc83b8af2e8065d (patch)
tree8db3281fc036cf40e118a6d53cef245b0c84c37d /src/actors
parent6093bbedc0c0cf39650c4cd931afb18feb1bcda8 (diff)
downloadscala-cf5b53633eb1f99c340feb18bcc83b8af2e8065d.tar.gz
scala-cf5b53633eb1f99c340feb18bcc83b8af2e8065d.tar.bz2
scala-cf5b53633eb1f99c340feb18bcc83b8af2e8065d.zip
Changed channels. Removed receiveFrom and Request.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala245
-rw-r--r--src/actors/scala/actors/Channel.scala419
-rw-r--r--src/actors/scala/actors/InputChannel.scala2
-rw-r--r--src/actors/scala/actors/Scheduler.scala2
4 files changed, 220 insertions, 448 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 478466f916..4444378a5b 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -10,16 +10,18 @@
package scala.actors
-import scala.collection.mutable.{HashSet, Stack}
+import scala.collection.mutable.{HashSet, Queue}
import compat.Platform
+import java.util.Stack
+
/**
* The <code>Actor</code> object provides functions for the definition of
* actors, as well as all actor operations, such as
* <code>receive</code>, <code>react</code>, <code>reply</code>,
* etc.
*
- * @version 0.9.0
+ * @version 0.9.2
* @author Philipp Haller
*/
object Actor {
@@ -67,9 +69,7 @@ object Actor {
}
*/
- def ? : Any = self.in.?
-
- def poll: Option[Any] = self.in.poll
+ def ? : Any = self.?
/**
* Receives a message from the mailbox of
@@ -80,7 +80,7 @@ object Actor {
* @return ...
*/
def receive[a](f: PartialFunction[Any, a]): a =
- self.in.receive(f)
+ self.receive(f)
/**
* Receives a message from the mailbox of
@@ -95,7 +95,7 @@ object Actor {
* @return ...
*/
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R =
- self.in.receiveWithin(msec)(f)
+ self.receiveWithin(msec)(f)
/**
* <code>receive</code> for event-based reactors.
@@ -108,7 +108,7 @@ object Actor {
* @return ...
*/
def react(f: PartialFunction[Any, Unit]): Nothing =
- self.in.react(f)
+ self.react(f)
/**
* <code>receiveWithin</code> for event-based reactors.
@@ -122,10 +122,10 @@ object Actor {
* @return ...
*/
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing =
- self.in.reactWithin(msec)(f)
+ self.reactWithin(msec)(f)
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
- self.in.react(new RecursiveProxyHandler(self, f))
+ self.react(new RecursiveProxyHandler(self, f))
private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])
extends PartialFunction[Any, Unit] {
@@ -133,26 +133,11 @@ object Actor {
true // events are immediately removed from the mailbox
def apply(m: Any): Unit = {
if (f.isDefinedAt(m)) f(m)
- self.in.react(this)
+ self.react(this)
}
}
/**
- * <p>Used for receiving a message from a specific actor.</p>
- * <p>Example:</p> <code>from (a) receive { //... }</code>
- *
- * @param r ...
- * @return ...
- */
- def from(r: Actor): FromReceive =
- new FromReceive(r)
-
- private[actors] class FromReceive(r: Actor) {
- def receive[a](f: PartialFunction[Any, a]): a =
- self.in.receiveFrom(r)(f)
- }
-
- /**
* Returns the actor which sent the last received message.
*/
def sender: Actor = self.sender
@@ -161,13 +146,13 @@ object Actor {
* Send <code>msg</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(msg: Any): Unit = sender.reply ! msg
+ def reply(msg: Any): Unit = self.reply(msg)
/**
* Send <code>()</code> to the actor waiting in a call to
* <code>!?</code>.
*/
- def reply(): Unit = reply(())
+ def reply(): Unit = self.reply(())
private[actors] trait Body[a] {
def orElse[b >: a](other: => b): b
@@ -189,12 +174,12 @@ object Actor {
// have to get out of the point of suspend in alt1's
// receive
s.suspendActor = () => {
- s.in.isSuspended = false
- s.in.waitingFor = s.in.waitingForNone
+ s.isSuspended = false
+ s.waitingFor = s.waitingForNone
throw new SuspendActorException
}
s.detachActor = f => {
- s.in.waitingFor = s.in.waitingForNone
+ s.waitingFor = s.waitingForNone
Scheduler.unPendReaction
throw new SuspendActorException
}
@@ -284,13 +269,6 @@ object Actor {
def exit(reason: String): Nothing = self.exit(reason)
}
-case class Request[a](msg: a) {
- private[actors] val in = new Channel[a]
- def reply(resp: a): unit = {
- in ! resp
- }
-}
-
/**
* <p>
* This class provides (together with <code>Channel</code>) an
@@ -302,26 +280,148 @@ case class Request[a](msg: a) {
* Philipp Haller, Martin Odersky <i>Proc. JMLC 2006</i>
* </p>
*
- * @version 0.9.0
+ * @version 0.9.2
* @author Philipp Haller
*/
trait Actor extends OutputChannel[Any] {
- private[actors] val in = new Channel[Any]
- in.receiver = this
- private var rc: Channel[Any] = null
+ private var received: Option[Any] = None
+
+ private[actors] val waitingForNone = (m: Any) => false
+ private[actors] var waitingFor: Any => boolean = waitingForNone
+ private[actors] var isSuspended = false
+
+ private val sessions = new Stack//[Channel[Any]]
+
+ private val mailbox = new Queue[Pair[Any, Channel[Any]]]
+
+ private def send(msg: Any, session: Channel[Any]) = synchronized {
+ tick()
+ if (waitingFor(msg)) {
+ received = Some(msg)
+ sessions push session
+ waitingFor = waitingForNone
+
+ if (timeoutPending) {
+ timeoutPending = false
+ TimerThread.trashRequest(this)
+ }
+
+ if (isSuspended)
+ resumeActor()
+ else
+ scheduleActor(null, msg)
+ } else {
+ mailbox += Pair(msg, session)
+ }
+ }
+
+ def receive[R](f: PartialFunction[Any, R]): R = {
+ assert(Actor.self == this, "receive from channel belonging to other actor")
+ this.synchronized {
+ tick()
+ mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
+ f.isDefinedAt(p._1)
+ }) match {
+ case Some(Pair(msg, session)) => {
+ received = Some(msg)
+ sessions push session
+ }
+ case None => {
+ waitingFor = f.isDefinedAt
+ isSuspended = true
+ suspendActor()
+ }
+ }
+ waitingFor = waitingForNone
+ isSuspended = false
+ }
+ val result = f(received.get)
+ sessions.pop
+ result
+ }
+
+ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
+ assert(Actor.self == this, "receive from channel belonging to other actor")
+ this.synchronized {
+ tick()
+ mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
+ f.isDefinedAt(p._1)
+ }) match {
+ case Some(Pair(msg, session)) => {
+ received = Some(msg)
+ sessions push session
+ }
+ case None => {
+ waitingFor = f.isDefinedAt
+ isSuspended = true
+ received = None
+ suspendActorFor(msec)
+ Debug.info("received: "+received)
+ if (received.isEmpty) {
+ Debug.info("no message received after "+msec+" millis")
+ if (f.isDefinedAt(TIMEOUT)) {
+ Debug.info("executing TIMEOUT action")
+ waitingFor = waitingForNone
+ isSuspended = false
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+ }
+ }
+ waitingFor = waitingForNone
+ isSuspended = false
+ }
+ val result = f(received.get)
+ sessions.pop
+ result
+ }
- private[actors] def reply: Channel[Any] = {
- if (rc eq null) {
- rc = new Channel[Any]
- rc.receiver = this
+ def react(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.self == this, "react on channel belonging to other actor")
+ Scheduler.pendReaction
+ this.synchronized {
+ tick()
+ mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
+ f.isDefinedAt(p._1)
+ }) match {
+ case Some(Pair(msg, session)) => {
+ sessions push session
+ scheduleActor(f, msg)
+ }
+ case None => {
+ waitingFor = f.isDefinedAt
+ detachActor(f)
+ }
+ }
+ throw new SuspendActorException
}
- rc
}
- private[actors] def freshReply(): Unit = {
- rc = new Channel[Any]
- rc.receiver = this
+ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(Actor.self == this, "react on channel belonging to other actor")
+ Scheduler.pendReaction
+ this.synchronized {
+ tick()
+ mailbox.dequeueFirst((p: Pair[Any, Channel[Any]]) => {
+ f.isDefinedAt(p._1)
+ }) match {
+ case Some(Pair(msg, session)) => {
+ sessions push session
+ scheduleActor(f, msg)
+ }
+ case None => {
+ waitingFor = f.isDefinedAt
+ TimerThread.requestTimeout(this, f, msec)
+ timeoutPending = true
+ detachActor(f)
+ }
+ }
+ throw new SuspendActorException
+ }
}
/**
@@ -335,32 +435,49 @@ trait Actor extends OutputChannel[Any] {
/**
* Sends <code>msg</code> to this actor (asynchronous).
*/
- def !(msg: Any): Unit = in ! msg
+ def !(msg: Any): Unit = send(msg, Actor.self.reply)
- def forward(msg: Any): Unit = in forward msg
+ def forward(msg: Any): Unit = send(msg, Actor.sender.reply)
/**
* Sends <code>msg</code> to this actor and awaits reply
* (synchronous).
*/
- def !?(msg: Any): Any = in !? msg
+ def !?(msg: Any): Any = {
+ val replyChannel = Actor.self.freshReply()
+ this ! msg
+ replyChannel.receive {
+ case x => x
+ }
+ }
- def rpc[a](msg: a): a = {
- Debug.info("Actor.!? called by "+Actor.self)
- val req = Request(msg)
- in ! req
- req.in.?
+ def !?(msec: long, msg: Any): Option[Any] = {
+ val replyChannel = Actor.self.freshReply()
+ this ! msg
+ replyChannel.receiveWithin(msec) {
+ case TIMEOUT => None
+ case x => Some(x)
+ }
}
- private val lastSenders = new Stack[Actor]
+ def reply(msg: Any): Unit = session ! msg
- private[actors] def sender: Actor = {
- if (lastSenders.isEmpty) null
- else lastSenders.top
+ private var rc = new Channel[Any]
+ def reply = rc
+ def freshReply() = { rc = new Channel[Any]; rc }
+
+ def ? : Any = receive {
+ case x => x
}
- private[actors] def pushSender(s: Actor) = { lastSenders.push(s) }
- private[actors] def popSender(): Unit = { lastSenders.pop }
+ private[actors] def sender: Actor =
+ if (sessions.empty) null
+ else sessions.peek.asInstanceOf[Channel[Any]].receiver
+
+ private[actors] def session: Channel[Any] =
+ if (sessions.empty) null
+ else sessions.peek.asInstanceOf[Channel[Any]]
+
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 29293b3286..3c6bf822fb 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -24,12 +24,14 @@ class SuspendActorException extends Throwable {
}
}
+case class ![a](ch: Channel[a], msg: a)
+
/**
* This class provides a means for typed communication among
* actors. Only the actor creating an instance of a
* <code>Channel</code> may receive from it.
*
- * @version 0.9.0
+ * @version 0.9.2
* @author Philipp Haller
*/
class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
@@ -39,417 +41,70 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
Actor.selfs.get(currentThread).asInstanceOf[Actor]
}
- private var received: Option[Msg] = None
-
- private[actors] val waitingForNone = (m: Msg) => false
- private[actors] var waitingFor: Msg => boolean = waitingForNone
- private[actors] var waitingForSender: Actor = null
-
- private[actors] var isSuspended = false
-
- //private val messageQueue = new MessageQueue[Msg]
- private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]]
-
- private def send(msg: Msg, sender: Actor) = receiver.synchronized {
- receiver.tick()
- if (waitingFor(msg) && ((waitingForSender eq null) ||
- (waitingForSender == sender))) {
- received = Some(msg)
- receiver.pushSender(sender)
- waitingFor = waitingForNone
- waitingForSender = null
-
- if (receiver.timeoutPending) {
- receiver.timeoutPending = false
- TimerThread.trashRequest(receiver)
- }
-
- if (isSuspended)
- receiver.resumeActor()
- else
- receiver.scheduleActor(null, msg)
- } else {
- //messageQueue.append(msg, sender)
- mailbox += Pair(msg, sender)
- }
- }
-
/**
* Sends <code>msg</code> to this <code>Channel</code>.
*/
- def !(msg: Msg): unit = send(msg, Actor.self)
-
- def ? : Msg = receive { case any => any }
-
- def poll: Option[Msg] = {
- Some(?)
- } orElse {
- None.asInstanceOf[Option[Msg]]
- }
-
- /**
- * Sends <code>msg</code> to this <code>Channel</code> and
- * awaits reply.
- */
- def !?(msg: Msg): Any = {
- Actor.self.freshReply()
- this ! msg
- Actor.self.reply.receiveFrom(receiver) {
- case x => x
- }
- }
-
- def !?(msec: long, msg: Msg): Option[Any] = {
- Debug.info("rpc with timeout "+msec)
- Actor.self.freshReply()
- this ! msg
- Actor.self.reply.receiveWithinFrom(msec)(receiver) {
- case TIMEOUT => None
- case x => Some(x)
- }
+ def !(msg: Msg): unit = {
+ receiver ! scala.actors.!(this, msg)
}
/**
* Forwards <code>msg</code> to <code>this</code> keeping the
* last sender as sender instead of <code>self</code>.
*/
- def forward(msg: Msg): unit = send(msg, receiver.sender)
-
- /**
- * Receives a message from this <code>Channel</code>.
- */
- def receive[R](f: PartialFunction[Msg, R]): R = {
- assert(Actor.self == receiver, "receive from channel belonging to other actor")
- receiver.synchronized {
- receiver.tick()
-
-/*
- val q = messageQueue.extractFirst(waitingFor)
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- }
- // acquire lock because we might call wait()
- else synchronized {
- isSuspended = true
- receiver.suspendActor()
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1)
- }) match {
- case Some(Pair(msg, sender)) => {
- received = Some(msg)
- receiver.pushSender(sender)
- }
- case None => {
- // acquire lock because we might call wait()
- this.synchronized {
- waitingFor = f.isDefinedAt
- isSuspended = true
- //val wp = Tuple4(f.isDefinedAt, None, None, Some(receiver))
- //waitingParts = wp :: waitingParts
- receiver.suspendActor()
- }
- }
- }
-
- waitingFor = waitingForNone
- isSuspended = false
- }
- val result = f(received.get)
- receiver.popSender()
- result
+ def forward(msg: Msg): unit = {
+ receiver forward scala.actors.!(this, msg)
}
- private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = {
- assert(Actor.self == receiver, "receive from channel belonging to other actor")
- receiver.synchronized {
- receiver.tick()
-
-/*
- var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => {
- waitingFor(item.msg) && item.sender == r
- })
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- }
- else synchronized {
- isSuspended = true
- receiver.suspendActor()
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1) && p._2 == r
- }) match {
- case Some(Pair(msg, sender)) => {
- received = Some(msg)
- receiver.pushSender(sender)
- }
- case None => {
- // acquire lock because we might call wait()
- this.synchronized {
- waitingFor = f.isDefinedAt
- waitingForSender = r
- isSuspended = true
- receiver.suspendActor()
- }
- }
- }
-
- waitingFor = waitingForNone
- waitingForSender = null
- isSuspended = false
+ def receive[R](f: PartialFunction[Any, R]): R = {
+ val C = this
+ receiver.receive {
+ case C ! msg if (f.isDefinedAt(msg)) => f(msg)
}
- val result = f(received.get)
- receiver.popSender()
- result
}
- /**
- * Receives a message from this <code>Channel</code>. If no
- * message could be received before <code>msec</code>
- * milliseconds elapsed, the <code>TIMEOUT</code> action is
- * executed if specified.
- */
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
- assert(Actor.self == receiver, "receive from channel belonging to other actor")
- receiver.synchronized {
- receiver.tick()
-
-/*
- val q = messageQueue.extractFirst(waitingFor)
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- }
- else synchronized {
- isSuspended = true
- receiver.suspendActorFor(msec)
- if (received eq null)
- if (f.isDefinedAt(TIMEOUT)) {
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
- else
- error("unhandled timeout")
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1)
- }) match {
- case Some(Pair(msg, sender)) => {
- received = Some(msg)
- receiver.pushSender(sender)
- }
- case None => {
- // acquire lock because we might call wait()
- this.synchronized {
- waitingFor = f.isDefinedAt
- isSuspended = true
- received = None
- receiver.suspendActorFor(msec)
- Debug.info("received: "+received)
- if (received.isEmpty) {
- Debug.info("no message received after "+msec+" millis")
- if (f.isDefinedAt(TIMEOUT)) {
- Debug.info("executing TIMEOUT action")
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
- else
- error("unhandled timeout")
- }
- }
- }
- }
-
- waitingFor = waitingForNone
- isSuspended = false
+ val C = this
+ receiver.receiveWithin(msec) {
+ case C ! msg if (f.isDefinedAt(msg)) => f(msg)
+ case TIMEOUT => f(TIMEOUT)
}
- val result = f(received.get)
- receiver.popSender()
- result
}
- def receiveWithinFrom[R](msec: long)(r: Actor)(f: PartialFunction[Any, R]): R = {
- assert(Actor.self == receiver, "receive from channel belonging to other actor")
- receiver.synchronized {
- receiver.tick()
-
-/*
- val q = messageQueue.extractFirst(waitingFor)
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- }
- else synchronized {
- waitingFor = f.isDefinedAt
- waitingForSender = r
- isSuspended = true
- receiver.suspendActorFor(msec)
- if (received eq null)
- if (f.isDefinedAt(TIMEOUT)) {
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
- else
- error("unhandled timeout")
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1) && p._2 == r
- }) match {
- case Some(Pair(msg, sender)) => {
- received = Some(msg)
- receiver.pushSender(sender)
- }
- case None => {
- // acquire lock because we might call wait()
- this.synchronized {
- waitingFor = f.isDefinedAt
- waitingForSender = r
- isSuspended = true
- received = None
- receiver.suspendActorFor(msec)
- Debug.info("received: "+received)
- if (received.isEmpty) {
- Debug.info("no message received after "+msec+" millis")
- if (f.isDefinedAt(TIMEOUT)) {
- Debug.info("executing TIMEOUT action")
- isSuspended = false
- val result = f(TIMEOUT)
- return result
- }
- else
- error("unhandled timeout")
- }
- }
- }
- }
-
- waitingFor = waitingForNone
- waitingForSender = null
- isSuspended = false
+ def react(f: PartialFunction[Any, Unit]): Nothing = {
+ val C = this
+ receiver.react {
+ case C ! msg if (f.isDefinedAt(msg)) => f(msg)
}
- val result = f(received.get)
- receiver.popSender()
- result
}
- /**
- * <code>receive</code> for reactors.
- */
- def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self == receiver, "react on channel belonging to other actor")
- Scheduler.pendReaction
- receiver.synchronized {
- receiver.tick()
-
-/*
- val q = messageQueue.extractFirst(waitingFor)
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- waitingFor = waitingForNone
- receiver.scheduleActor(f, received)
- }
- else synchronized {
- receiver.detachActor(f)
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1)
- }) match {
- case Some(Pair(msg, sender)) => {
- receiver.pushSender(sender)
- receiver.scheduleActor(f, msg)
- }
- case None => {
- this.synchronized {
- //Scheduler.detached(receiver)
- waitingFor = f.isDefinedAt
- //val wp = Tuple4(f.isDefinedAt, None, Some(f), None)
- //waitingParts = wp :: waitingParts
- receiver.detachActor(f)
- }
- }
- }
-
- throw new SuspendActorException
+ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
+ val C = this
+ receiver.reactWithin(msec) {
+ case C ! msg if (f.isDefinedAt(msg)) => f(msg)
+ case TIMEOUT => f(TIMEOUT)
}
}
/**
- * <code>receiveWithin</code> for reactors.
+ * Sends <code>msg</code> to this <code>Channel</code> and
+ * awaits reply.
*/
- def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
- assert(Actor.self == receiver, "react on channel belonging to other actor")
- Scheduler.pendReaction
- receiver.synchronized {
- receiver.tick()
-
-/*
- val q = messageQueue.extractFirst(waitingFor)
- if (q != null) {
- received = q.msg
- receiver.pushSender(q.sender)
- waitingFor = waitingForNone
- receiver.scheduleActor(f, received)
- }
- else synchronized {
- waitingFor = f.isDefinedAt
- TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec)
- receiver.asInstanceOf[Reactor].timeoutPending = true
- receiver.detachActor(f)
- }
-*/
-
- mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
- waitingFor(p._1)
- }) match {
- case Some(Pair(msg, sender)) => {
- received = Some(msg)
- receiver.pushSender(sender)
- waitingFor = waitingForNone
- receiver.scheduleActor(f, received.get)
- }
- case None => {
- this.synchronized {
- waitingFor = f.isDefinedAt
- TimerThread.requestTimeout(receiver, f, msec)
- receiver.timeoutPending = true
- receiver.detachActor(f)
- }
- }
- }
-
- throw new SuspendActorException
+ def !?(msg: Msg): Any = {
+ val replyChannel = Actor.self.freshReply()
+ receiver ! scala.actors.!(this, msg)
+ replyChannel.receive {
+ case x => x
}
}
- /*
- * Prints contents of mailbox to standard out.
- * This is used for printing actor dumps.
- */
- private[actors] def printMailbox = {
- Console.print("[")
- val msgs = mailbox.elements
- if (msgs.hasNext)
- Console.print(msgs.next._1.toString())
- while (msgs.hasNext) {
- Console.print(", "+msgs.next._1.toString())
+ def !?(msec: long, msg: Msg): Option[Any] = {
+ val replyChannel = Actor.self.freshReply()
+ receiver ! scala.actors.!(this, msg)
+ replyChannel.receiveWithin(msec) {
+ case TIMEOUT => None
+ case x => Some(x)
}
- Console.println("]")
}
}
diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala
index 6ddde7e911..c22671fc93 100644
--- a/src/actors/scala/actors/InputChannel.scala
+++ b/src/actors/scala/actors/InputChannel.scala
@@ -16,7 +16,7 @@ package scala.actors
* @author Philipp Haller
*/
trait InputChannel[Msg] {
- def receive[R](f: PartialFunction[Msg, R]): R
+ def receive[R](f: PartialFunction[Any, R]): R
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R
def react(f: PartialFunction[Any, Unit]): Nothing
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index a4b5eb3368..1e78798337 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -132,7 +132,7 @@ class TickedScheduler extends Thread with IScheduler {
if (a.isDetached || a.isWaiting) {
// dump contents of mailbox
Console.println("Waiting with mailbox:")
- a.in.printMailbox
+ //a.printMailbox
}
Console.println