summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/actors/scala/actors/Actor.scala52
-rw-r--r--src/actors/scala/actors/Channel.scala4
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala4
4 files changed, 27 insertions, 35 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 6188067005..9b0422bab2 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -270,12 +270,12 @@ trait Actor extends OutputChannel[Any] {
private var received: Option[Any] = None
- private[actors] val waitingForNone = (m: Any) => false
- private[actors] var waitingFor: Any => Boolean = waitingForNone
- private[actors] var isSuspended = false
+ private val waitingForNone = (m: Any) => false
+ private var waitingFor: Any => Boolean = waitingForNone
+ private var isSuspended = false
private val mailbox = new MessageQueue
- private[actors] var sessions: List[OutputChannel[Any]] = Nil
+ private var sessions: List[OutputChannel[Any]] = Nil
private var session1: Option[OutputChannel[Any]] = None
private[actors] def send(msg: Any, session: OutputChannel[Any]) = synchronized {
@@ -297,8 +297,8 @@ trait Actor extends OutputChannel[Any] {
if (isSuspended)
resumeActor()
- else
- scheduleActor(null, msg)
+ else // continuation != null
+ Scheduler.execute(new Reaction(this, continuation, msg))
} else {
mailbox.append(msg, session)
}
@@ -440,7 +440,7 @@ trait Actor extends OutputChannel[Any] {
* (synchronous).
*/
def !?(msg: Any): Any = {
- val replyCh = new Channel[Any](Actor.self)
+ val replyCh = Actor.self.freshReplyChannel
send(msg, replyCh)
replyCh.receive {
case x => x
@@ -455,7 +455,7 @@ trait Actor extends OutputChannel[Any] {
* <code>value</code> is the reply value.
*/
def !?(msec: Long, msg: Any): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self)
+ val replyCh = Actor.self.freshReplyChannel
send(msg, replyCh)
replyCh.receiveWithin(msec) {
case TIMEOUT => None
@@ -519,9 +519,9 @@ trait Actor extends OutputChannel[Any] {
sender ! msg
}
- private var rc = new Channel[Any](this)
- def getReplyChannel = rc
- def freshReply() = { rc = new Channel[Any]; rc }
+ private var rc: Channel[Any] = null
+ private[actors] def replyChannel = rc
+ private[actors] def freshReplyChannel = { rc = new Channel[Any](this); rc }
/**
* Receives the next message from this actor's mailbox.
@@ -530,29 +530,22 @@ trait Actor extends OutputChannel[Any] {
case x => x
}
-/*
- def sender: Actor = {
- val s = session
- if (null ne s) s.receiver
- else null
- }
-*/
-
def sender: OutputChannel[Any] =
if (sessions.isEmpty) {
session1 match {
case None => null
case Some(s) => s
}
- } else sessions.head//.asInstanceOf[OutputChannel[Any]]
+ } else sessions.head
- private[actors] var continuation: PartialFunction[Any, Unit] = null
- private[actors] var timeoutPending = false
+ private var continuation: PartialFunction[Any, Unit] = null
+ private var timeoutPending = false
+ // accessed in Reaction
private[actors] var isDetached = false
- private[actors] var isWaiting = false
+ private var isWaiting = false
// guarded by lock of this
- private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
+ private def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) =
if ((f eq null) && (continuation eq null)) {
// do nothing (timeout is handled instead)
}
@@ -563,12 +556,12 @@ trait Actor extends OutputChannel[Any] {
Scheduler execute task
}
- private[actors] def tick(): Unit =
+ private def tick(): Unit =
Scheduler tick this
private[actors] var kill: () => unit = () => {}
- def suspendActor() {
+ private def suspendActor() {
isWaiting = true
while(isWaiting) {
try {
@@ -581,7 +574,7 @@ trait Actor extends OutputChannel[Any] {
if (shouldExit) exit()
}
- def suspendActorFor(msec: Long) {
+ private def suspendActorFor(msec: Long) {
val ts = Platform.currentTime
var waittime = msec
var fromExc = false
@@ -605,7 +598,7 @@ trait Actor extends OutputChannel[Any] {
if (shouldExit) exit()
}
- def resumeActor() {
+ private def resumeActor() {
isWaiting = false
notify()
}
@@ -742,9 +735,8 @@ trait Actor extends OutputChannel[Any] {
exitReason = reason
if (isSuspended)
resumeActor()
- else if (isDetached) {
+ else if (isDetached)
scheduleActor(null, null)
- }
}
}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index e3db8dc8ac..e0c2c10960 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -137,7 +137,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
* @return the reply
*/
def !?(msg: Msg): Any = {
- val replyCh = new Channel[Any](Actor.self)
+ val replyCh = Actor.self.freshReplyChannel
receiver.send(scala.actors.!(this, msg), replyCh)
replyCh.receive {
case x => x
@@ -154,7 +154,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
* <code>Some(x)</code> where <code>x</code> is the reply
*/
def !?(msec: Long, msg: Msg): Option[Any] = {
- val replyCh = new Channel[Any](Actor.self)
+ val replyCh = Actor.self.freshReplyChannel
receiver.send(scala.actors.!(this, msg), replyCh)
replyCh.receiveWithin(msec) {
case TIMEOUT => None
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 0862f148df..d28ed56ffb 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -114,7 +114,7 @@ class NetKernel(service: Service) {
val msg = service.serializer.deserialize(data)
val senderProxy = new Actor {
def act() = {
- a.getReplyChannel ! msg
+ a.replyChannel ! msg
}
}
senderProxy.start(); {}
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
index 14dc7b1a91..a38c56ab27 100644
--- a/src/actors/scala/actors/remote/RemoteActor.scala
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -98,9 +98,9 @@ object RemoteActor {
}
override def !?(msg: Any): Any = msg match {
case a: AnyRef =>
- val replyChannel = Actor.self.freshReply()
+ val replyCh = Actor.self.freshReplyChannel
selfKernel.syncSend(node, sym, a)
- replyChannel.receive {
+ replyCh.receive {
case x => x
}
case other =>