path: root/src
diff options
authorPhilipp Haller <>2006-07-25 18:06:22 +0000
committerPhilipp Haller <>2006-07-25 18:06:22 +0000
commitb41132eeb39f91b428ddee6fa85e9f354d0fceb1 (patch)
tree560bd9a3cb4e3e1f66ec7eb1042d5a0ddd641506 /src
parente753bc53ac989970c0c73f1bffcb71049eafb0b3 (diff)
Added thread-based actors to scala.actors.
Diffstat (limited to 'src')
4 files changed, 347 insertions, 0 deletions
diff --git a/src/actors/scala/actors/threads/Actor.scala b/src/actors/scala/actors/threads/Actor.scala
new file mode 100644
index 0000000000..eb31782733
--- /dev/null
+++ b/src/actors/scala/actors/threads/Actor.scala
@@ -0,0 +1,35 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+// $Id: Actor.scala 7955 2006-06-22 13:55:30Z michelou $
+package scala.actors.threads
+ * The class <code>Actor</code> ...
+ *
+ * @author Martin Odersky
+ * @version 1.0
+ */
+trait Actor[T] extends Thread with scala.actors.Actor[T] {
+ private val in = new MailBox
+ def !(msg: T): Unit =
+ in.send(msg)
+ def send(msg: T): Unit =
+ in.send(msg)
+ def receive[a](f: PartialFunction[Any, a]): a =
+ if (Thread.currentThread() == this) in.receive(f)
+ else error("receive called not on own process")
+ def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a =
+ if (Thread.currentThread() == this) in.receiveWithin(msec)(f)
+ else error("receiveWithin called not on own process")
diff --git a/src/actors/scala/actors/threads/MailBox.scala b/src/actors/scala/actors/threads/MailBox.scala
new file mode 100644
index 0000000000..859254d826
--- /dev/null
+++ b/src/actors/scala/actors/threads/MailBox.scala
@@ -0,0 +1,176 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+// $Id: MailBox.scala 6679 2006-03-10 16:09:08Z odersky $
+package scala.actors.threads;
+ * The class <code>MailBox</code> ...
+ *
+ * @author Martin Odersky
+ * @version 1.0
+ */
+class MailBox extends AnyRef with ListQueueCreator {
+ type Message = Any;
+ private abstract class PreReceiver {
+ var msg: Message = null;
+ def isDefinedAt(msg: Message): boolean;
+ }
+ private class Receiver[a](receiver: PartialFunction[Message, a]) extends PreReceiver {
+ def isDefinedAt(msg: Message) = receiver.isDefinedAt(msg);
+ def receive(): a = synchronized {
+ while (msg == null) wait();
+ receiver(msg)
+ }
+ def receiveWithin(msec: long): a = synchronized {
+ if (msg == null) wait(msec);
+ receiver(if (msg != null) msg else TIMEOUT)
+ }
+ }
+ private val messageQueue = queueCreate[Message];
+ private val receiverQueue = queueCreate[PreReceiver];
+ /** Unconsumed messages. */
+ private var sent = messageQueue.make;
+ /** Pending receivers. */
+ private var receivers = receiverQueue.make;
+ /**
+ * Check whether the receiver can be applied to an unconsumed message.
+ * If yes, the message is extracted and associated with the receiver.
+ * Otherwise the receiver is appended to the list of pending receivers.
+ */
+ private def scanSentMsgs[a](receiver: Receiver[a]): unit = synchronized {
+ messageQueue.extractFirst(sent, msg => receiver.isDefinedAt(msg)) match {
+ case None => receivers = receiverQueue.append(receivers, receiver)
+ case Some(Pair(msg, withoutMsg)) => {
+ sent = withoutMsg;
+ receiver.msg = msg
+ }
+ }
+ }
+ /**
+ * First check whether a pending receiver is applicable to the sent
+ * message. If yes, the receiver is notified. Otherwise the message
+ * is appended to the linked list of sent messages.
+ */
+ def send(msg: Message): unit = synchronized {
+ receiverQueue.extractFirst(receivers, r => r.isDefinedAt(msg)) match {
+ case None => sent = messageQueue.append(sent, msg)
+ case Some(Pair(receiver, withoutReceiver)) => {
+ receivers = withoutReceiver;
+ receiver.msg = msg;
+ receiver synchronized { receiver.notify() };
+ }
+ }
+ }
+ /**
+ * Block until there is a message in the mailbox for which the processor
+ * <code>f</code> is defined.
+ */
+ def receive[a](f: PartialFunction[Message, a]): a = {
+ val r = new Receiver(f);
+ scanSentMsgs(r);
+ r.receive()
+ }
+ /**
+ * Block until there is a message in the mailbox for which the processor
+ * <code>f</code> is defined or the timeout is over.
+ */
+ def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a = {
+ val r = new Receiver(f);
+ scanSentMsgs(r);
+ r.receiveWithin(msec)
+ }
+* Module for dealing with queues.
+trait QueueModule[a] {
+ /** Type of queues. */
+ type t;
+ /** Create an empty queue. */
+ def make: t;
+ /** Append an element to a queue. */
+ def append(l: t, x: a): t;
+ /** Extract an element satisfying a predicate from a queue. */
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]];
+/** Inefficient but simple queue module creator. */
+trait ListQueueCreator {
+ def queueCreate[a]: QueueModule[a] = new QueueModule[a] {
+ type t = List[a];
+ def make: t = Nil;
+ def append(l: t, x: a): t = l ::: x :: Nil;
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] =
+ l match {
+ case Nil => None
+ case head :: tail =>
+ if (p(head))
+ Some(Pair(head, tail))
+ else
+ extractFirst(tail, p) match {
+ case None => None
+ case Some(Pair(x, without_x)) => Some(Pair(x, head :: without_x))
+ }
+ }
+ }
+/** Efficient queue module creator based on linked lists. */
+trait LinkedListQueueCreator {
+ import scala.collection.mutable.LinkedList;
+ def queueCreate[a >: Null <: AnyRef]: QueueModule[a] = new QueueModule[a] {
+ type t = Pair[LinkedList[a], LinkedList[a]]; // fst = the list, snd = last elem
+ def make: t = {
+ val l = new LinkedList[a](null, null);
+ Pair(l, l)
+ }
+ def append(l: t, x: a): t = {
+ val atTail = new LinkedList(x, null);
+ l._2 append atTail;
+ Pair(l._1, atTail)
+ }
+ def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = {
+ var xs = l._1;
+ var xs1 =;
+ while (xs1 != null && !p(xs1.elem)) {
+ xs = xs1;
+ xs1 =;
+ }
+ if (xs1 != null) {
+ =;
+ if ( == null)
+ Some(Pair(xs1.elem, Pair(l._1, xs)))
+ else
+ Some(Pair(xs1.elem, l))
+ }
+ else
+ None
+ }
+ }
diff --git a/src/actors/scala/actors/threads/NameServer.scala b/src/actors/scala/actors/threads/NameServer.scala
new file mode 100644
index 0000000000..a889411ad2
--- /dev/null
+++ b/src/actors/scala/actors/threads/NameServer.scala
@@ -0,0 +1,37 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+// $Id: NameServer.scala 5889 2006-03-05 00:33:02Z mihaylov $
+package scala.actors.threads;
+object NameServer {
+ val names = new scala.collection.mutable.HashMap[Symbol, Process];
+ def register(name: Symbol, proc: Process) = {
+ if (names.contains(name)) error("Name:" + name + " already registred");
+ names += name -> proc;
+ }
+ def unregister(name: Symbol) = {
+ if (names.contains(name))
+ names -= name;
+ else
+ error("Name:" + name + " not registred");
+ }
+ def whereis(name: Symbol): Option[Process] =
+ names.get(name);
+ def send(name: Symbol, msg: Any) =
+ names(name).send(msg);
diff --git a/src/actors/scala/actors/threads/Process.scala b/src/actors/scala/actors/threads/Process.scala
new file mode 100644
index 0000000000..e5d9b8f53e
--- /dev/null
+++ b/src/actors/scala/actors/threads/Process.scala
@@ -0,0 +1,99 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+// $Id: Process.scala 7931 2006-06-20 16:34:51Z odersky $
+package scala.actors.threads;
+ * The object <code>Process</code> ...
+ *
+ * @author Martin Odersky
+ * @version 1.0
+ */
+object Process {
+ def spawn(body: => Unit): Process = {
+ val p = new Process {
+ override def run(): Unit = body
+ }
+ p.start();
+ p
+ }
+ def spawnLink(body: => Unit): Process = {
+ val p = new Process {
+ override def run(): Unit = body
+ }
+ p.start()
+ p
+ }
+ def send(p: Process, msg: Any) =
+ p ! msg
+ def receive[a](f: PartialFunction[Any, a]): a =
+ self.receive(f);
+ def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a =
+ self.receiveWithin(msec)(f);
+ def self: Process =
+ if (Thread.currentThread().isInstanceOf[Process])
+ Thread.currentThread().asInstanceOf[Process]
+ else
+ error("Self called outside a process");
+ def exit(p: Process, reason: Symbol) =
+ p.exit(reason);
+ * The class <code>Process</code> ...
+ *
+ * @author Martin Odersky
+ * @version 1.0
+ */
+class Process extends Actor[Any] with scala.actors.Process {
+ private var exitReason: AnyRef = null;
+ private var links: List[scala.actors.Process] = Nil;
+ override def start(): Unit = try { run() }
+ catch {
+ case _: java.lang.InterruptedException =>
+ signal(exitReason)
+ case (exitSignal) =>
+ signal(exitSignal)
+ }
+ private def signal(s: Any) =
+ links.foreach { p: scala.actors.Process => p ! Triple('EXIT, this, s) }
+ def link(p: scala.actors.Process): Unit =
+ links = p::links;
+ def linkTo(to: scala.actors.Process): Unit =
+ links = to::links;
+ // TODO
+ def unlink(from: scala.actors.Process): Unit = {}
+ // TODO
+ def unlinkFrom(from: scala.actors.Process): Unit = {}
+ def exit(reason: Symbol): Unit = {
+ exitReason = reason;
+ interrupt()
+ }
+ def exit(from: scala.actors.Process, reason: Symbol): Unit =
+ from.exit(reason)