summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-10-10 08:42:42 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-10-10 08:42:42 +0000
commitbf18c3732071dee637284f50c5d979e924c3b1ad (patch)
tree03e3dda5726200e40d99313088b8dbf27fc67c9a /src/actors
parent7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb (diff)
downloadscala-bf18c3732071dee637284f50c5d979e924c3b1ad.tar.gz
scala-bf18c3732071dee637284f50c5d979e924c3b1ad.tar.bz2
scala-bf18c3732071dee637284f50c5d979e924c3b1ad.zip
Event-based actors now allowed to call receive.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala205
-rw-r--r--src/actors/scala/actors/Channel.scala140
-rw-r--r--src/actors/scala/actors/Reactor.scala19
-rw-r--r--src/actors/scala/actors/ThreadedActor.scala1
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala2
-rw-r--r--src/actors/scala/actors/remote/RemoteActor.scala97
6 files changed, 255 insertions, 209 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 1caa6d6d8e..b1f8bcde23 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -31,24 +31,16 @@ object Actor {
*/
def self: Actor = synchronized {
val t = Thread.currentThread()
- if (t.isInstanceOf[ActorThread])
- t.asInstanceOf[ActorThread]
- else {
- var a = selfs.get(t).asInstanceOf[Actor]
- if (a == null) {
- a = new ActorProxy(t)
- selfs.put(t, a)
- }
- a
+ var a = selfs.get(t).asInstanceOf[Actor]
+ if (a == null) {
+ a = new ActorProxy(t)
+ selfs.put(t, a)
}
+ a
}
- /**
- * Creates an instance of a thread-based actor executing <code>body</code>,
- * and starts it.
- */
- def actor(body: => Unit): ActorThread = synchronized {
- val actor = new ActorThread {
+ def actor(body: => Unit): Actor = synchronized {
+ val actor = new Reactor {
def act() = body
}
actor.start()
@@ -60,8 +52,8 @@ object Actor {
* channel which can be used for typed communication with other
* actors.
*/
- def actor[a](ch: Channel[a])(body: => Unit): ActorThread = synchronized {
- val actor = new ActorThread {
+ def actor[a](ch: Channel[a])(body: => Unit): Actor = synchronized {
+ val actor = new Reactor {
def act() = body
}
ch.receiver = actor
@@ -70,24 +62,9 @@ object Actor {
}
/**
- * Creates an instance of an event-based reactor executing
- * <code>body</code>, and starts it.
- */
- def reactor(body: => Unit): Reactor = synchronized {
- val reactor = new Reactor {
- def act() = body
- }
- reactor.start()
- reactor
- }
-
- /**
* Receives a message from the mailbox of
* <code>self</code>. Blocks if no message matching any of the
* cases of <code>f</code> can be received.
- *
- * Only (thread-based) actors may call this method. It fails at
- * runtime if executed by a reactor.
*/
def receive[a](f: PartialFunction[Any, a]): a =
self.in.receive(f)
@@ -99,9 +76,6 @@ object Actor {
<code>f</code> can be received. If no message could be
received the <code>TIMEOUT</code> action is executed if
specified.
-
- Only (thread-based) actors may call this method. It fails at
- runtime if executed by a reactor.
*/
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R =
self.in.receiveWithin(msec)(f)
@@ -303,14 +277,14 @@ trait Actor extends OutputChannel[Any] {
private[actors] def pushSender(sender: Actor): unit
private[actors] def popSender(): unit
- private[actors] var suspendActor: () => unit = _
- private[actors] var suspendActorFor: long => unit = _
- private[actors] var detachActor: PartialFunction[Any, unit] => unit = _
+ private[actors] var suspendActor: () => Unit = _
+ private[actors] var suspendActorFor: long => Unit = _
+ private[actors] var resumeActor: () => Unit = _
+ private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
private[actors] var kill: () => Unit = _
private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)
private[actors] def tick(): Unit
- private[actors] def isThreaded: boolean
private[actors] def resetActor(): unit
resetActor()
@@ -330,7 +304,7 @@ trait Actor extends OutputChannel[Any] {
Links <code>self</code> to actor defined by <code>body</code>.
*/
def link(body: => Unit): Actor = {
- val actor = new ActorThread {
+ val actor = new Reactor {
def act() = body
}
link(actor)
@@ -420,60 +394,12 @@ case class Exit(from: Actor, reason: String)
/**
- * This class provides an implementation for actors based on
- * threads. To be able to create instances of this class, the
- * inherited abstract method <code>act()</code> has to be
- * implemented. Note that the preferred way of creating
- * thread-based actors is through the <code>actor</code> method
- * defined in object <code>Actor</code>.
- *
- * @author Philipp Haller
- */
-abstract class ActorThread extends Thread with ThreadedActor {
- override def run(): Unit = {
- try {
- act()
- if (isInterrupted())
- throw new InterruptedException
- kill()
- if (isInterrupted())
- throw new InterruptedException
- exit("normal")
- }
- catch {
- case ie: InterruptedException =>
- exitLinked()
- case t: Throwable =>
- exitLinked(t.toString())
- }
- }
-
- /**
- Terminates execution of <code>self</code> with the following
- effect on linked actors:
-
- For each linked actor <code>a</code> with
- <code>trapExit</code> set to <code>true</code>, send message
- <code>Exit(self, reason)</code> to <code>a</code>.
-
- For each linked actor <code>a</code> with
- <code>trapExit</code> set to <code>false</code> (default),
- call <code>a.exit(reason)</code> if
- <code>!reason.equals("normal")</code>.
- */
- def exit(reason: String): Unit = {
- exitReason = reason
- interrupt()
- }
-}
-
-/**
* This class provides a dynamic actor proxy for normal Java
* threads.
*
* @author Philipp Haller
*/
-private[actors] class ActorProxy(t: Thread) extends ThreadedActor {
+private[actors] class ActorProxy(t: Thread) extends Reactor {
def act(): Unit = {}
/**
Terminates execution of <code>self</code> with the following
@@ -488,7 +414,7 @@ private[actors] class ActorProxy(t: Thread) extends ThreadedActor {
call <code>a.exit(reason)</code> if
<code>!reason.equals("normal")</code>.
*/
- def exit(reason: String): Unit = {
+ override def exit(reason: String): Unit = {
exitReason = reason
t.interrupt()
}
@@ -496,105 +422,6 @@ private[actors] class ActorProxy(t: Thread) extends ThreadedActor {
/**
- This object provides methods for creating, registering, and
- selecting remotely accessible actors.
-
- A remote actor is typically created like this:
- <pre>
- actor {
- alive(9010)
- register('myName, self)
-
- // behavior
- }
- </pre>
-
- It can be accessed by an actor running on a (possibly)
- different node by selecting it in the following way:
- <pre>
- actor {
- // ...
- <b>val</b> c = select(TcpNode("127.0.0.1", 9010), 'myName)
- c ! msg
- // ...
- }
- </pre>
-
- @author Philipp Haller
- */
-object RemoteActor {
- import remote.NetKernel
- import remote.TcpService
-
- private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel]
-
- /**
- * Makes <code>self</code> remotely accessible on TCP port
- * <code>port</code>.
- */
- def alive(port: int): Unit = {
- val serv = new TcpService(port)
- serv.start()
- kernels += Actor.self -> serv.kernel
- }
-
- /**
- * Registers <code>a</code> under <code>name</code> on this
- * node.
- */
- def register(name: Symbol, a: Actor): Unit = {
- val kernel = kernels.get(Actor.self) match {
- case None => {
- val serv = new TcpService(TcpService.generatePort)
- serv.start()
- kernels += Actor.self -> serv.kernel
- serv.kernel
- }
- case Some(k) => k
- }
- kernel.register(name, a)
- }
-
- /**
- * Returns (a proxy for) the actor registered under
- * <code>name</code> on <code>node</code>.
- */
- def select(node: Node, name: Symbol): Actor =
- new Reactor {
- def act(): Unit = {}
- override def !(msg: Any): Unit = msg match {
- case a: AnyRef => {
- // establish remotely accessible
- // return path (sender)
- val kernel = kernels.get(Actor.self) match {
- case None => {
- val serv = new TcpService(TcpService.generatePort)
- serv.start()
- kernels += Actor.self -> serv.kernel
- serv.kernel
- }
- case Some(k) => k
- }
- kernel.send(node, name, a)
- }
- case other =>
- error("Cannot send non-AnyRef value remotely.")
- }
- override def !?(msg: Any): Any =
- error("!? not implemented for remote actors.")
- }
-}
-
-
-/**
- * This class represents a machine node on a TCP network.
- *
- * @author Philipp Haller
- */
-case class Node(address: String, port: Int)
-
-
-/**
* <p>
* This class is used by our efficient message queue
* implementation.
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 4bb4205614..e7c800a8f2 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -34,12 +34,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private[actors] var receiver: Actor = synchronized {
// basically Actor.self, but can be null
val t = Thread.currentThread()
- if (t.isInstanceOf[ActorThread])
- t.asInstanceOf[ActorThread]
- else {
- val a = Actor.selfs.get(t).asInstanceOf[Actor]
- a
- }
+ val a = Actor.selfs.get(t).asInstanceOf[Actor]
+ a
}
private var received: Msg = _
@@ -48,7 +44,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private var waitingFor: Msg => boolean = waitingForNone
private var waitingForSender: Actor = null
- private val messageQueue = new MessageQueue[Msg]
+ //private val messageQueue = new MessageQueue[Msg]
+ private val mailbox = new scala.collection.mutable.Queue[Pair[Msg, Actor]]
private def send(msg: Msg, sender: Actor) = receiver.synchronized {
receiver.tick()
@@ -67,9 +64,13 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
}
- receiver.scheduleActor(null, msg)
+ if (isSuspended)
+ receiver.resumeActor()
+ else
+ receiver.scheduleActor(null, msg)
} else {
- messageQueue.append(msg, sender)
+ //messageQueue.append(msg, sender)
+ mailbox += Pair(msg, sender)
}
}
@@ -96,15 +97,18 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
*/
def forward(msg: Msg): unit = send(msg, receiver.sender)
+ private var isSuspended = false
+
/**
* Receives a message from this <code>Channel</code>.
*/
def receive[R](f: PartialFunction[Msg, R]): R = {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
- assert(receiver.isThreaded, "receive invoked from reactor")
+ //assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
+/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
received = q.msg
@@ -112,8 +116,28 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
}
// acquire lock because we might call wait()
else synchronized {
+ isSuspended = true
receiver.suspendActor()
}
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1)
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = msg
+ receiver.pushSender(sender)
+ }
+ case None => {
+ // acquire lock because we might call wait()
+ this.synchronized {
+ isSuspended = true
+ receiver.suspendActor()
+ }
+ }
+ }
+
+ isSuspended = false
waitingFor = waitingForNone
}
receiver.resetActor()
@@ -124,11 +148,12 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
- assert(receiver.isThreaded, "receive invoked from reactor")
+ //assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
waitingForSender = r
+/*
var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => {
waitingFor(item.msg) && item.sender == r
})
@@ -137,8 +162,28 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver.pushSender(q.sender)
}
else synchronized {
+ isSuspended = true
receiver.suspendActor()
}
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1) && p._2 == r
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = msg
+ receiver.pushSender(sender)
+ }
+ case None => {
+ // acquire lock because we might call wait()
+ this.synchronized {
+ isSuspended = true
+ receiver.suspendActor()
+ }
+ }
+ }
+
+ isSuspended = false
waitingFor = waitingForNone
waitingForSender = null
}
@@ -156,19 +201,22 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
*/
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
assert(Actor.self == receiver, "receive from channel belonging to other actor")
- assert(receiver.isThreaded, "receive invoked from reactor")
+ //assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
+/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
received = q.msg
receiver.pushSender(q.sender)
}
else synchronized {
+ isSuspended = true
receiver.suspendActorFor(msec)
if (received == null)
if (f.isDefinedAt(TIMEOUT)) {
+ isSuspended = false
receiver.resetActor()
val result = f(TIMEOUT)
return result
@@ -176,6 +224,34 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
else
error("unhandled timeout")
}
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1)
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = msg
+ receiver.pushSender(sender)
+ }
+ case None => {
+ // acquire lock because we might call wait()
+ this.synchronized {
+ isSuspended = true
+ receiver.suspendActorFor(msec)
+ if (received == null)
+ if (f.isDefinedAt(TIMEOUT)) {
+ isSuspended = false
+ receiver.resetActor()
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+ }
+ }
+
+ isSuspended = false
waitingFor = waitingForNone
}
receiver.resetActor()
@@ -192,6 +268,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
+/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
received = q.msg
@@ -202,6 +279,24 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
else synchronized {
receiver.detachActor(f)
}
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1)
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = msg
+ receiver.pushSender(sender)
+ waitingFor = waitingForNone
+ receiver.scheduleActor(f, received)
+ }
+ case None => {
+ this.synchronized {
+ receiver.detachActor(f)
+ }
+ }
+ }
+
throw new SuspendActorException
}
}
@@ -214,6 +309,7 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver.synchronized {
receiver.tick()
waitingFor = f.isDefinedAt
+/*
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
received = q.msg
@@ -226,6 +322,26 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
receiver.asInstanceOf[Reactor].timeoutPending = true
receiver.detachActor(f)
}
+*/
+
+ mailbox.dequeueFirst((p: Pair[Msg, Actor]) => {
+ waitingFor(p._1)
+ }) match {
+ case Some(Pair(msg, sender)) => {
+ received = msg
+ receiver.pushSender(sender)
+ waitingFor = waitingForNone
+ receiver.scheduleActor(f, received)
+ }
+ case None => {
+ this.synchronized {
+ TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec)
+ receiver.asInstanceOf[Reactor].timeoutPending = true
+ receiver.detachActor(f)
+ }
+ }
+ }
+
throw new SuspendActorException
}
}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 015dbd82f2..aa16eea7ac 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -20,12 +20,16 @@ package scala.actors
* @author Philipp Haller
*/
trait Reactor extends Actor {
- private var lastSender: Actor = null
- private[actors] def sender: Actor = lastSender
- private[actors] def pushSender(sender: Actor): Unit = lastSender = sender
- private[actors] def popSender(): Unit = lastSender = null
- private[actors] def isThreaded = false
+ private val lastSenders = new scala.collection.mutable.Stack[Actor]
+
+ private[actors] def sender: Actor = {
+ if (lastSenders.isEmpty) null
+ else lastSenders.top
+ }
+
+ private[actors] def pushSender(s: Actor) = { lastSenders.push(s) }
+ private[actors] def popSender(): Unit = { lastSenders.pop }
private[actors] var continuation: PartialFunction[Any, Unit] = null
private[actors] var timeoutPending = false
@@ -51,9 +55,10 @@ trait Reactor extends Actor {
}
private[actors] def resetActor(): Unit = {
+ suspendActor = () => wait()
+ suspendActorFor = (msec: long) => wait(msec)
+ resumeActor = () => notify()
detachActor = defaultDetachActor
- suspendActor = () => error("suspendActor called on reactor.")
- suspendActorFor = (msec: long) => error("suspendActorFor called on reactor.")
kill = () => {}
}
diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala
index 7059b70f3e..8f6f53d025 100644
--- a/src/actors/scala/actors/ThreadedActor.scala
+++ b/src/actors/scala/actors/ThreadedActor.scala
@@ -36,6 +36,7 @@ trait ThreadedActor extends Actor {
private[actors] def resetActor() = {
suspendActor = () => wait()
suspendActorFor = (msec: long) => wait(msec)
+ resumeActor = () => notify()
detachActor = (f: PartialFunction[Any, Unit]) => wait()
kill = () => {}
}
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
index 1a621005b7..d6705fccd7 100644
--- a/src/actors/scala/actors/remote/NetKernel.scala
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -45,7 +45,7 @@ class NetKernel(service: Service) {
actors.get(receiver) match {
case Some(a) => {
val msg = service.serializer.deserialize(data)
- val senderProxy = new ActorThread {
+ val senderProxy = new Reactor {
def act() = { a ! msg }
override def !(msg: Any): Unit = {
msg match {
diff --git a/src/actors/scala/actors/remote/RemoteActor.scala b/src/actors/scala/actors/remote/RemoteActor.scala
new file mode 100644
index 0000000000..0984834623
--- /dev/null
+++ b/src/actors/scala/actors/remote/RemoteActor.scala
@@ -0,0 +1,97 @@
+package scala.actors.remote
+
+/**
+ This object provides methods for creating, registering, and
+ selecting remotely accessible actors.
+
+ A remote actor is typically created like this:
+ <pre>
+ actor {
+ alive(9010)
+ register('myName, self)
+
+ // behavior
+ }
+ </pre>
+
+ It can be accessed by an actor running on a (possibly)
+ different node by selecting it in the following way:
+ <pre>
+ actor {
+ // ...
+ <b>val</b> c = select(TcpNode("127.0.0.1", 9010), 'myName)
+ c ! msg
+ // ...
+ }
+ </pre>
+
+ @author Philipp Haller
+ */
+object RemoteActor {
+
+ private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel]
+
+ /**
+ * Makes <code>self</code> remotely accessible on TCP port
+ * <code>port</code>.
+ */
+ def alive(port: int): Unit = {
+ val serv = new TcpService(port)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ }
+
+ /**
+ * Registers <code>a</code> under <code>name</code> on this
+ * node.
+ */
+ def register(name: Symbol, a: Actor): Unit = {
+ val kernel = kernels.get(Actor.self) match {
+ case None => {
+ val serv = new TcpService(TcpService.generatePort)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ serv.kernel
+ }
+ case Some(k) => k
+ }
+ kernel.register(name, a)
+ }
+
+ /**
+ * Returns (a proxy for) the actor registered under
+ * <code>name</code> on <code>node</code>.
+ */
+ def select(node: Node, name: Symbol): Actor =
+ new Reactor {
+ def act(): Unit = {}
+ override def !(msg: Any): Unit = msg match {
+ case a: AnyRef => {
+ // establish remotely accessible
+ // return path (sender)
+ val kernel = kernels.get(Actor.self) match {
+ case None => {
+ val serv = new TcpService(TcpService.generatePort)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ serv.kernel
+ }
+ case Some(k) => k
+ }
+ kernel.send(node, name, a)
+ }
+ case other =>
+ error("Cannot send non-AnyRef value remotely.")
+ }
+ override def !?(msg: Any): Any =
+ error("!? not implemented for remote actors.")
+ }
+}
+
+
+/**
+ * This class represents a machine node on a TCP network.
+ *
+ * @author Philipp Haller
+ */
+case class Node(address: String, port: Int)