From 0586e1b756b90f628a2629f89b2e6a353de0e60e Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Fri, 29 Sep 2006 11:28:12 +0000 Subject: Removed old actors lib. --- src/actors/scala/actors/Actor.scala | 18 - src/actors/scala/actors/Debug.scala | 48 -- src/actors/scala/actors/Done.scala | 19 - src/actors/scala/actors/Process.scala | 21 - src/actors/scala/actors/Reactions.scala | 65 --- src/actors/scala/actors/TIMEOUT.scala | 11 - .../scala/actors/distributed/JXTAServiceBase.scala | 22 - .../scala/actors/distributed/JavaSerializer.scala | 39 -- .../scala/actors/distributed/MessagesComb.scala | 48 -- src/actors/scala/actors/distributed/Name.scala | 20 - .../scala/actors/distributed/NetKernel.scala | 597 --------------------- src/actors/scala/actors/distributed/Node.scala | 15 - src/actors/scala/actors/distributed/NodeComb.scala | 25 - .../scala/actors/distributed/RemoteActor.scala | 173 ------ .../scala/actors/distributed/RemotePid.scala | 195 ------- .../scala/actors/distributed/Serializer.scala | 58 -- src/actors/scala/actors/distributed/Service.scala | 83 --- .../scala/actors/distributed/SystemMessage.scala | 33 -- .../actors/distributed/TcpSerializerComb.scala | 135 ----- .../scala/actors/distributed/TcpService.scala | 226 -------- .../actors/distributed/TcpServiceWorker.scala | 99 ---- src/actors/scala/actors/distributed/Util.scala | 70 --- src/actors/scala/actors/gui/Button.scala | 20 - src/actors/scala/actors/gui/Caret.scala | 8 - src/actors/scala/actors/gui/Component.scala | 9 - src/actors/scala/actors/gui/Container.scala | 18 - src/actors/scala/actors/gui/EmptyBorder.scala | 8 - .../scala/actors/gui/FormattedTextField.scala | 9 - src/actors/scala/actors/gui/Frame.scala | 33 -- src/actors/scala/actors/gui/GUIApplication.scala | 20 - src/actors/scala/actors/gui/Label.scala | 14 - src/actors/scala/actors/gui/MainFrame.scala | 14 - src/actors/scala/actors/gui/Orientation.scala | 11 - src/actors/scala/actors/gui/Panel.scala | 15 - src/actors/scala/actors/gui/Publisher.scala | 123 ----- .../scala/actors/gui/SimpleGUIApplication.scala | 14 - src/actors/scala/actors/gui/SwingComponent.scala | 11 - src/actors/scala/actors/gui/TextComponent.scala | 22 - src/actors/scala/actors/gui/TextField.scala | 22 - .../scala/actors/gui/event/ButtonPressed.scala | 3 - .../scala/actors/gui/event/CaretUpdate.scala | 3 - src/actors/scala/actors/gui/event/Event.scala | 3 - .../scala/actors/gui/event/MouseDragged.scala | 3 - src/actors/scala/actors/gui/event/MouseEvent.scala | 3 - src/actors/scala/actors/gui/event/MouseMoved.scala | 3 - .../scala/actors/gui/event/TextModified.scala | 3 - .../scala/actors/gui/event/WindowActivated.scala | 3 - .../scala/actors/gui/event/WindowClosed.scala | 3 - .../scala/actors/gui/event/WindowClosing.scala | 3 - .../scala/actors/gui/event/WindowDeactivated.scala | 3 - .../scala/actors/gui/event/WindowDeiconified.scala | 3 - .../scala/actors/gui/event/WindowEvent.scala | 5 - .../scala/actors/gui/event/WindowIconified.scala | 3 - .../scala/actors/gui/event/WindowOpened.scala | 3 - src/actors/scala/actors/gui/layout.scala | 12 - src/actors/scala/actors/multi/Actor.scala | 25 - src/actors/scala/actors/multi/MailBox.scala | 182 ------- src/actors/scala/actors/multi/Process.scala | 265 --------- src/actors/scala/actors/multi/ReceiverTask.scala | 27 - src/actors/scala/actors/multi/Scheduler.scala | 262 --------- src/actors/scala/actors/multi/TimerThread.scala | 162 ------ src/actors/scala/actors/multi/WorkerThread.scala | 44 -- src/actors/scala/actors/single/Actor.scala | 25 - src/actors/scala/actors/single/MailBox.scala | 157 ------ src/actors/scala/actors/single/Process.scala | 77 --- src/actors/scala/actors/threads/Actor.scala | 35 -- src/actors/scala/actors/threads/MailBox.scala | 176 ------ src/actors/scala/actors/threads/NameServer.scala | 37 -- src/actors/scala/actors/threads/Process.scala | 99 ---- 69 files changed, 4023 deletions(-) delete mode 100644 src/actors/scala/actors/Actor.scala delete mode 100644 src/actors/scala/actors/Debug.scala delete mode 100644 src/actors/scala/actors/Done.scala delete mode 100644 src/actors/scala/actors/Process.scala delete mode 100644 src/actors/scala/actors/Reactions.scala delete mode 100644 src/actors/scala/actors/TIMEOUT.scala delete mode 100644 src/actors/scala/actors/distributed/JXTAServiceBase.scala delete mode 100644 src/actors/scala/actors/distributed/JavaSerializer.scala delete mode 100644 src/actors/scala/actors/distributed/MessagesComb.scala delete mode 100644 src/actors/scala/actors/distributed/Name.scala delete mode 100644 src/actors/scala/actors/distributed/NetKernel.scala delete mode 100644 src/actors/scala/actors/distributed/Node.scala delete mode 100644 src/actors/scala/actors/distributed/NodeComb.scala delete mode 100644 src/actors/scala/actors/distributed/RemoteActor.scala delete mode 100644 src/actors/scala/actors/distributed/RemotePid.scala delete mode 100644 src/actors/scala/actors/distributed/Serializer.scala delete mode 100644 src/actors/scala/actors/distributed/Service.scala delete mode 100644 src/actors/scala/actors/distributed/SystemMessage.scala delete mode 100644 src/actors/scala/actors/distributed/TcpSerializerComb.scala delete mode 100644 src/actors/scala/actors/distributed/TcpService.scala delete mode 100644 src/actors/scala/actors/distributed/TcpServiceWorker.scala delete mode 100644 src/actors/scala/actors/distributed/Util.scala delete mode 100644 src/actors/scala/actors/gui/Button.scala delete mode 100644 src/actors/scala/actors/gui/Caret.scala delete mode 100644 src/actors/scala/actors/gui/Component.scala delete mode 100644 src/actors/scala/actors/gui/Container.scala delete mode 100644 src/actors/scala/actors/gui/EmptyBorder.scala delete mode 100644 src/actors/scala/actors/gui/FormattedTextField.scala delete mode 100644 src/actors/scala/actors/gui/Frame.scala delete mode 100644 src/actors/scala/actors/gui/GUIApplication.scala delete mode 100644 src/actors/scala/actors/gui/Label.scala delete mode 100644 src/actors/scala/actors/gui/MainFrame.scala delete mode 100644 src/actors/scala/actors/gui/Orientation.scala delete mode 100644 src/actors/scala/actors/gui/Panel.scala delete mode 100644 src/actors/scala/actors/gui/Publisher.scala delete mode 100644 src/actors/scala/actors/gui/SimpleGUIApplication.scala delete mode 100644 src/actors/scala/actors/gui/SwingComponent.scala delete mode 100644 src/actors/scala/actors/gui/TextComponent.scala delete mode 100644 src/actors/scala/actors/gui/TextField.scala delete mode 100644 src/actors/scala/actors/gui/event/ButtonPressed.scala delete mode 100644 src/actors/scala/actors/gui/event/CaretUpdate.scala delete mode 100644 src/actors/scala/actors/gui/event/Event.scala delete mode 100644 src/actors/scala/actors/gui/event/MouseDragged.scala delete mode 100644 src/actors/scala/actors/gui/event/MouseEvent.scala delete mode 100644 src/actors/scala/actors/gui/event/MouseMoved.scala delete mode 100644 src/actors/scala/actors/gui/event/TextModified.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowActivated.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowClosed.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowClosing.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowDeactivated.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowDeiconified.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowEvent.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowIconified.scala delete mode 100644 src/actors/scala/actors/gui/event/WindowOpened.scala delete mode 100644 src/actors/scala/actors/gui/layout.scala delete mode 100644 src/actors/scala/actors/multi/Actor.scala delete mode 100644 src/actors/scala/actors/multi/MailBox.scala delete mode 100644 src/actors/scala/actors/multi/Process.scala delete mode 100644 src/actors/scala/actors/multi/ReceiverTask.scala delete mode 100644 src/actors/scala/actors/multi/Scheduler.scala delete mode 100644 src/actors/scala/actors/multi/TimerThread.scala delete mode 100644 src/actors/scala/actors/multi/WorkerThread.scala delete mode 100644 src/actors/scala/actors/single/Actor.scala delete mode 100644 src/actors/scala/actors/single/MailBox.scala delete mode 100644 src/actors/scala/actors/single/Process.scala delete mode 100644 src/actors/scala/actors/threads/Actor.scala delete mode 100644 src/actors/scala/actors/threads/MailBox.scala delete mode 100644 src/actors/scala/actors/threads/NameServer.scala delete mode 100644 src/actors/scala/actors/threads/Process.scala (limited to 'src/actors') diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala deleted file mode 100644 index b99f281f3e..0000000000 --- a/src/actors/scala/actors/Actor.scala +++ /dev/null @@ -1,18 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors - -/** - * @author Philipp Haller - */ -trait Actor[T] { - def run(): Unit - def start(): Unit - def !(msg: T): Unit -} diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala deleted file mode 100644 index 81e1152597..0000000000 --- a/src/actors/scala/actors/Debug.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -/** - * @author Philipp Haller - */ -object Debug { - var lev = 2 - - def level = lev - def level_= (lev: int) = { - //Console.println("Setting debug level to " + lev) - this.lev = lev - } - - def info(s: String) = - if (lev > 2) System.out.println("Info: " + s) - def warning(s: String) = - if (lev > 1) System.err.println("Warning: " + s) - def error(s: String) = - if (lev > 0) System.err.println("Error: " + s) -} - -class Debug(tag: String) { - var lev = 2 - - def level = lev - def level_= (lev: int) = { - //Console.println("Setting debug level (" + tag + ") to " + lev) - this.lev = lev - } - - def info(s: String) = - if (lev > 2) System.out.println(tag + " (info): " + s) - def warning(s: String) = - if (lev > 1) System.err.println(tag + " (warn): " + s) - def error(s: String) = - if (lev > 0) System.err.println(tag + " (erro): " + s) -} diff --git a/src/actors/scala/actors/Done.scala b/src/actors/scala/actors/Done.scala deleted file mode 100644 index 2291df6399..0000000000 --- a/src/actors/scala/actors/Done.scala +++ /dev/null @@ -1,19 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -/** - * @author Philipp Haller - */ -class Done extends Throwable { - override def fillInStackTrace(): Throwable = - this; -} diff --git a/src/actors/scala/actors/Process.scala b/src/actors/scala/actors/Process.scala deleted file mode 100644 index 1f80da6218..0000000000 --- a/src/actors/scala/actors/Process.scala +++ /dev/null @@ -1,21 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors - -/** - * @author Philipp Haller - */ -trait Process extends Actor[Any] { - def link(to: Process): Unit - def linkTo(to: Process): Unit - def unlink(from: Process): Unit - def unlinkFrom(from: Process): Unit - def exit(reason: Symbol): Unit - def exit(from: Process, reason: Symbol): Unit -} diff --git a/src/actors/scala/actors/Reactions.scala b/src/actors/scala/actors/Reactions.scala deleted file mode 100644 index 2262ea2727..0000000000 --- a/src/actors/scala/actors/Reactions.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors - -/** An enclosing trait for reactions. - * Examples of reactions are: actor.receive's, event.handle's, etc - * - * @author Martin Odersky - * @version 1.0 - * - * @param m The input type of a Reaction; typically the type of messages or events. - */ -trait Reactions[m] { - - /** The partial function underlying a reaction. Note that this is formulated - * in CPS style. - */ - type Re[r] = PartialFunction[m, (r => unit) => unit] - - /** Activate the given partial function `f', for instance by reading - * a message or waiting for an event, and applying `f' to the result. - */ - def activate[r](f: Re[r]): (r => unit) => unit - - /** The class of reactions - * @param r The type of values returned by the reaction. - * (More precisely, the type of values passed to its continuation) - * @param fun The partial function underlying a reaction - */ - class Reaction[+r](private val fun: Re[r]) extends Responder[r] { - - def respond(k: r => unit): unit = activate(fun)(k) - - override def map[s](f: r => s) = new Reaction[s] ( - fun andThen { - result: ((r => unit) => unit) => - k: (s => unit) => result((x: r) => k(f(x))) - } - ) - - def flatMap[s](f: r => Reaction[s]) = new Reaction[s] ( - fun andThen { - result: ((r => unit) => unit) => - k: (s => unit) => result((x: r) => f(x).respond(k)) - } - ) - - override def filter(p: r => boolean) = new Reaction[r] ( - fun andThen { - result: ((r => unit) => unit) => - k: (r => unit) => result((x: r) => if (p(x)) k(x) else ()) - } - ) - - def orElse[r1 >: r](that: Reaction[r1]) = new Reaction[r1] ( - this.fun orElse that.fun - ) - } -} diff --git a/src/actors/scala/actors/TIMEOUT.scala b/src/actors/scala/actors/TIMEOUT.scala deleted file mode 100644 index ea2889be0e..0000000000 --- a/src/actors/scala/actors/TIMEOUT.scala +++ /dev/null @@ -1,11 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -package scala.actors - -case object TIMEOUT diff --git a/src/actors/scala/actors/distributed/JXTAServiceBase.scala b/src/actors/scala/actors/distributed/JXTAServiceBase.scala deleted file mode 100644 index 46c3ae7743..0000000000 --- a/src/actors/scala/actors/distributed/JXTAServiceBase.scala +++ /dev/null @@ -1,22 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -/** - * @author Philipp Haller - */ -abstract class JXTAServiceBase(nodename: String) extends Thread with Service { - val serializer = new JavaSerializer(this) - private val internalNode = new JXTANode(nodename) - def node: Node = internalNode - def createPid(actor: RemoteActor): RemotePid = - new JXTAPid(internalNode, makeUid, kernel, actor) -} diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala deleted file mode 100644 index f5d114e762..0000000000 --- a/src/actors/scala/actors/distributed/JavaSerializer.scala +++ /dev/null @@ -1,39 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io._ -import scala.io.BytePickle.SPU - -[serializable] -class JavaSerializer(serv: Service) extends Serializer(serv) { - val debug = true - - def log (s: String) = - if (debug) scala.Console.println("JAVASerializer: " + s) - - def serialize(o: AnyRef): Array[Byte] = { - val bos = new ByteArrayOutputStream() - val out = new ObjectOutputStream(bos) - out.writeObject(o) - out.flush() - bos.toByteArray() - } - - def deserialize(bytes: Array[Byte]): AnyRef = { - val bis = new ByteArrayInputStream(bytes) - val in = new ObjectInputStream(bis) - in.readObject() - } - - def pid: SPU[RemotePid] = null - def addRep(name: String, repCons: Serializer => AnyRef): Unit = {} -} diff --git a/src/actors/scala/actors/distributed/MessagesComb.scala b/src/actors/scala/actors/distributed/MessagesComb.scala deleted file mode 100644 index ea30e7e5b4..0000000000 --- a/src/actors/scala/actors/distributed/MessagesComb.scala +++ /dev/null @@ -1,48 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.io.BytePickle._ - -/** - * @author Philipp Haller - */ -object MessagesComb { - def sendPU(ser: Serializer): SPU[Send] = - wrap((p: Pair[RemotePid,Array[byte]]) => Send(p._1, p._2), - (s: Send) => Pair(s.rec, s.data), - pair(ser.pid, bytearray)); - - def namedSendPU: SPU[NamedSend] = - wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2), - (ns: NamedSend) => Pair(ns.sym, ns.data), - pair(symbolPU, bytearray)); - - def spawnPU(ser: Serializer): SPU[Spawn] = - wrap((p: Pair[RemotePid,String]) => Spawn(p._1, p._2), - (s: Spawn) => Pair(s.replyto, s.p), - pair(ser.pid, string)); - - def spawnObjectPU(ser: Serializer): SPU[SpawnObject] = - wrap((p: Pair[RemotePid,Array[byte]]) => SpawnObject(p._1, p._2), - (s: SpawnObject) => Pair(s.replyto, s.data), - pair(ser.pid, bytearray)); - - def exitPU(ser: Serializer): SPU[Exit1] = - wrap((p: Triple[RemotePid,RemotePid,Symbol]) => Exit1(p._1, p._2, p._3), - (e: Exit1) => Triple(e.from, e.to, e.reason), - triple(ser.pid, ser.pid, symbolPU)); - - def symbolPU: SPU[Symbol] = - wrap((s: String) => Symbol(s), - (sym: Symbol) => sym.name, - string); -} diff --git a/src/actors/scala/actors/distributed/Name.scala b/src/actors/scala/actors/distributed/Name.scala deleted file mode 100644 index 04c1845890..0000000000 --- a/src/actors/scala/actors/distributed/Name.scala +++ /dev/null @@ -1,20 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -/** - * @author Philipp Haller - */ -case class Name(node: Node, sym: Symbol, kernel: NetKernel) { - def !(msg: AnyRef): unit = { - kernel.namedSend(this, msg) - } -} diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala deleted file mode 100644 index 481ce94833..0000000000 --- a/src/actors/scala/actors/distributed/NetKernel.scala +++ /dev/null @@ -1,597 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io.{IOException,StringReader,StringWriter} -import java.lang.SecurityException -import java.net.UnknownHostException -import java.util.logging.{ConsoleHandler,Level,Logger} - -import scala.collection.mutable.{HashMap,HashSet} -import scala.actors.multi.{Actor,ExcHandlerDesc} - -case class RA(a: RemoteActor) - -object NetKernel { - var kernel: NetKernel = null -} - -/** - * @author Philipp Haller - */ -class NetKernel(service: Service) { - NetKernel.kernel = this - - // contains constructors - private val ptable = - new HashMap[String, () => RemoteActor] - - // maps local ids to scala.actors - private val rtable = - new HashMap[int, RemoteActor] - - // maps scala.actors to their RemotePid - private val pidTable = - new HashMap[RemoteActor, RemotePid] - - private var running = true; - - val logLevel = Level.FINE - val l = Logger.getLogger("NetKernel") - l.setLevel(logLevel) - val consHand = new ConsoleHandler - consHand.setLevel(logLevel) - l.addHandler(consHand) - - //start // start NetKernel - - /** only called if destDesc is local. */ - def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = - destDesc.p match { - case rpid: RemotePid => - (rtable get rpid.localId) match { - case Some(actor) => - actor.handleExc(destDesc, e) - case None => - error("exc desc refers to non-registered actor") - } - } - - def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = - // locality check (handler local to this node?) - destDesc.p match { - case rpid: RemotePid => - if (rpid.node == this.node) - handleExc(destDesc, e) - else - sendToNode(rpid.node, ForwardExc(destDesc, e)) - } - - def sendToNode(node: Node, msg: AnyRef) = { - //val sw = new StringWriter - val bytes = service.serializer.serialize(msg /*, sw*/) - service.send(node, bytes) - //service.send(node, sw.toString()) - } - - def addConstructor(key: String, value: () => RemoteActor) = - ptable.update(key, value); - - def node: Node = service.node - - def nodes: List[Node] = service.nodes - - def pidOf(actor: RemoteActor): RemotePid = synchronized { - pidTable.get(actor) match { - case None => error("malformed pid table in " + this) - case Some(pid) => pid - } - } - - def disconnectNode(n: Node) = synchronized { - service.disconnectNode(n) - } - - def getLocalRef(locId: Int): RemoteActor = - rtable.get(locId) match { - case None => - error("" + locId + " is not registered at " + this) - case Some(remoteActor: RemoteActor) => - remoteActor - } - - def localSend(localId: Int, msg: Any): Unit = synchronized { - rtable.get(localId) match { - case None => - error("" + localId + " is not registered at " + this) - case Some(remoteActor: RemoteActor) => - //Console.println("local send to " + remoteActor) - remoteActor send msg - } - } - - def localSend(pid: RemotePid, msg: Any): Unit = - localSend(pid.localId, msg) - - def remoteSend(pid: RemotePid, msg: Any) = synchronized { - //Console.println("NetKernel: Remote msg delivery to " + pid) - msg match { - case m: AnyRef => - service.remoteSend(pid, m) - } - } - - def namedSend(name: Name, msg: AnyRef): Unit = - if (name.node == this.node) { - // look-up name - nameTable.get(name.sym) match { - case None => - // message is lost - //Console.println("lost message " + msg + " because " + name + " not registered.") - case Some(localId) => localSend(localId, msg) - } - } - else { - // remote send - - // serialize msg - ///val sw = new StringWriter - val bytes = service.serializer.serialize(msg/*, sw*/) - - sendToNode(name.node, NamedSend(name.sym, bytes)) - //sendToNode(name.node, NamedSend(name.sym, sw.toString())) - } - - val nameTable = new HashMap[Symbol, Int] - - def registerName(name: Symbol, pid: RemotePid): Unit = synchronized { - nameTable += name -> pid.localId - } - - def registerName(name: Symbol, a: RemoteActor): Unit = synchronized { - val pid = register(a) - registerName(name, pid) - a.start - } - - /*override def run: unit = receive { - case ForwardExc(destDesc, e) => - // TODO - case Spawn(reply: RemotePid, pname) => - val newPid = spawn(pname) - // need to send back the Pid - remoteSend(reply, newPid) - run - - case SpawnObject(reply: RemotePid, data: Array[byte]) => - //val sr = new StringReader(data) - //service.serializer.deserialize(sr) match { - - service.serializer.deserialize(data) match { - case RA(actor) => - val newPid = register(actor) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - actor.start - // need to send back the Pid - remoteSend(reply, newPid) - } - run - - case NamedSend(sym: Symbol, data) => - // look-up name - nameTable.get(sym) match { - case None => - // message is lost - Console.println("lost message " + data + " because " + sym + " not registered.") - case Some(localId) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - - val msg = service.serializer.deserialize(data) - localSend(localId, msg) - } - run - - case Send(pid: RemotePid, data) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - val msg = service.serializer.deserialize(data) - - Console.println("locally send " + msg + " to " + pid) - localSend(pid, msg) - run - - case Link(from:RemotePid, to:RemotePid) => - // assume from is local - linkFromLocal(from, to) - run - - case UnLink(from:RemotePid, to:RemotePid) => - // assume from is local - unlinkFromLocal(from, to) - run - - case Exit1(from:RemotePid, to:RemotePid, reason) => - // check if "to" traps exit signals - // if so send a message - if (trapExits.contains(to.localId)) - // convert signal into message - localSend(to, Exit1(from, to, reason)) - else - if (reason.name.equals("normal")) { - // ignore signal - } - else - exit(from, to, reason) - run - }*/ - - // TODO - /*def isReachable(remoteNode: Node): boolean = { - val pingMsg = new Ping(node) - val sw = new StringWriter - service.serializer.serialize(pingMsg, sw) - service.send(remoteNode, sw.toString()) - }*/ - - def processMsg(msg: AnyRef): Unit = synchronized { - msg match { - case Spawn(reply: RemotePid, pname) => - val newPid = spawn(pname) - // need to send back the Pid - remoteSend(reply, newPid) - - case SpawnObject(reply: RemotePid, data) => - //val sr = new StringReader(data) - //service.serializer.deserialize(sr) match { - - service.serializer.deserialize(data) match { - case RA(actor) => - val newPid = register(actor) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - actor.start - // need to send back the Pid - remoteSend(reply, newPid) - } - - case NamedSend(sym: Symbol, data) => - // look-up name - nameTable.get(sym) match { - case None => - // message is lost - //Console.println("lost message " + msg + " because " + sym + " not registered.") - case Some(localId) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - val msg = service.serializer.deserialize(data) - localSend(localId, msg) - } - - case Send(pid: RemotePid, data) => - // deserialize data - //val sr = new StringReader(data) - //val msg = service.serializer.deserialize(sr) - val msg = service.serializer.deserialize(data) - //Console.println("locally send " + msg + " to " + pid) - localSend(pid, msg) - - case Link(from:RemotePid, to:RemotePid) => - // assume from is local - linkFromLocal(from, to) - - case UnLink(from:RemotePid, to:RemotePid) => - // assume from is local - unlinkFromLocal(from, to) - - case Exit1(from:RemotePid, to:RemotePid, reason) => - // check if "to" traps exit signals - // if so send a message - if (trapExits.contains(to.localId)) { - // convert signal into message - // TODO: simpler message (w/o to) for actor! - localSend(to, Exit1(from, to, reason)) - } - else { - if (reason.name.equals("normal")) { - // ignore signal - } - else - exit(from, to, reason) - } - } - } - - /* Registers an instance of a remote actor inside this NetKernel. - */ - def register(newProc: RemoteActor): RemotePid = synchronized { - newProc.kernel = this - val newPid = service.createPid(newProc) - rtable += newPid.localId -> newProc - pidTable += newProc -> newPid - newPid - } - - // local spawn - def spawn(pname: String): RemotePid = synchronized { - // get constructor out of table - (ptable.get(pname)) match { - case None => - //error("No constructor found. Cannot start process.") - //null - - val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor] - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - - case Some(cons) => - val newProc = cons() - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - } - } - - // local spawn - def spawn(name: String, arg: RemotePid): RemotePid = synchronized { - val newPid = spawn(name) - localSend(newPid, arg) - newPid - } - - // assume this.node != node - def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): Unit = { - val ra = RA(a) - //val rsw = new StringWriter - //service.serializer.serialize(ra, rsw) - val bytes = service.serializer.serialize(ra) - //sendToNode(node, SpawnObject(replyTo, rsw.toString())) - sendToNode(node, SpawnObject(replyTo, bytes)) - } - - - def registerSerializer(index: String, rep: Serializer => AnyRef) = { - service.serializer.addRep(index, rep) - - // send registering requests to remote nodes - } - - // remote spawn - def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized { - // check if actor is to be spawned locally - if (node == this.node) { - val newPid = spawn(name) - newPid - } - else { - sendToNode(node, Spawn(replyTo, name)) - null // pid needs to be sent back - } - } - - /* Spawns a new actor (locally), executing "fun". - */ - def spawn(fun: RemoteActor => Unit): RemotePid = synchronized { - val newProc = new RemoteActor { - override def run: unit = - fun(this); - } - - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - newProc.start - newPid - } - - /* Spawns a new actor (locally), executing "fun". - */ - def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized { - val newProc = new RemoteActor { - override def run: unit = - fun(this); - } - - // create Pid for remote communication and register process - val newPid = register(newProc) - //Console.println("Spawned a new " + newProc + " (" + newPid + ")") - - // link new process to pid (assume pid is local) - link(pid, newPid) - - newProc.start - newPid - } - - // maps local ids to their linked pids - private val links = new HashMap[int,HashSet[RemotePid]]; - - // which of the local processes traps exit signals? - private val trapExits = new HashSet[int]; - - def processFlag(pid: RemotePid, flag: Symbol, set: Boolean) = synchronized { - if (flag.name.equals("trapExit")) { - if (trapExits.contains(pid.localId) && !set) - trapExits -= pid.localId - else if (!trapExits.contains(pid.localId) && set) - trapExits += pid.localId - } - } - - // assume from.node == this.node - private def unlinkFromLocal(from: RemotePid, to: RemotePid): Unit = - links.get(from.localId) match { - case None => - // has no links -> ignore - case Some(set) => - set -= to - if (set.size == 0) links -= from.localId - }; - - /* - unlinks bi-directional link - assume from.node == this.node - */ - def unlink(from: RemotePid, to: RemotePid): Unit = synchronized { - unlinkFromLocal(from, to) - if (to.node == this.node) - unlinkFromLocal(to, from) - else - // (2) send message to NetKernel of "to" to unlink a - // uni-directional link from "to" to "from" - sendToNode(to.node, UnLink(to, from)) - } - - // assume from.node == this.node - private def linkFromLocal(from: RemotePid, to: RemotePid): Unit = - // TODO: send Exit to from if to is invalid - links.get(from.localId) match { - case None => - // from has no links, yet - val linksTo = new HashSet[RemotePid] - linksTo += to - links += from.localId -> linksTo - case Some(set) => - set += to - }; - - /* - creates bi-directional link - assume from.node == this.node - */ - def link(from: RemotePid, to: RemotePid): unit = synchronized { - // (1) create locally a uni-directional link - linkFromLocal(from, to) - if (to.node == this.node) - linkFromLocal(to, from) - else - // (2) send message to NetKernel of "to" to create a - // uni-directional link from "to" to "from" - sendToNode(to.node, Link(to, from)) - } - - // Assume "to" is local. - def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = { - // remove link - unlinkFromLocal(to, from) - exit(to, reason) - } - - val exitMarks = new HashSet[RemotePid] - - /* - If reason is unequal to 'normal then - this will cause all linked processes to - (transitively) terminate abnormally. - - Assume pid is local. - */ - def exit(pid: RemotePid, reason: Symbol): Unit = synchronized { - if (!(exitMarks contains pid)) { - exitMarks += pid // mark pid as exiting - //Console.println("" + pid + " is exiting (" + reason + ").") - - // first look-up remote actor in rtable - val actor = rtable(pid.localId) - // remove from table of running processes - rtable -= pid.localId - // remove from pid table - pidTable -= actor - - // send exit signals to linked processes - links.get(pid.localId) match { - case None => - //Console.println("no linked processes") - - case Some(set) => // set of remote pids that we want to terminate - //Console.println("sending exit signals to linked processes") - - val iter = set.elements - while (iter.hasNext) { - val linkedPid = iter.next - - unlinkFromLocal(pid, linkedPid) - - if (linkedPid.node == this.node) { - unlinkFromLocal(linkedPid, pid) - - if (trapExits.contains(linkedPid.localId)) - localSend(linkedPid, Exit1(pid, linkedPid, reason)) - else if (!reason.name.equals("normal")) - exit(linkedPid, reason) - } - else - sendToNode(linkedPid.node, - Exit1(pid, linkedPid, reason)) - } - exitMarks -= pid - } - } - } - - private val monNodes = - new HashMap[Node,HashMap[RemotePid,Int]] - - def monitorNode(client: RemotePid, mnode: Node, cond: Boolean) = synchronized { - monNodes.get(mnode) match { - case None => - // nobody is monitoring this node - if (cond) { - val map = new HashMap[RemotePid,int] - map += client -> 1 - monNodes += mnode -> map - } - case Some(map) => - map.update(client, map(client) + (if (cond) 1 else -1)) - } - - // if no connection exists: - // try connecting, if it fails deliver nodedown msg - if (cond && !service.isConnected(mnode)) { - try { - service.connect(mnode) - } - catch { - case uhe: UnknownHostException => - nodeDown(mnode) - case ioe: IOException => - nodeDown(mnode) - case se: SecurityException => - nodeDown(mnode) - } - } - } - - def nodeDown(mnode: Node) = - // send NodeDown msg to registered RemotePids - monNodes.get(mnode) match { - case None => - // nobody is monitoring this node - case Some(map) => - // iterate over keys (RemotePids of interested clients) - val iter = map.keys - while (iter.hasNext) { - val client = iter.next - for (val i <- List.range(0, map(client))) { - // send nodedown msg - client ! Pair(NodeDown(), mnode) - } - } - } - -} diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala deleted file mode 100644 index 1f28e65767..0000000000 --- a/src/actors/scala/actors/distributed/Node.scala +++ /dev/null @@ -1,15 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -[serializable] abstract class Node; -[serializable] case class TcpNode(address: String, port: Int) extends Node -[serializable] case class JXTANode(name: String) extends Node diff --git a/src/actors/scala/actors/distributed/NodeComb.scala b/src/actors/scala/actors/distributed/NodeComb.scala deleted file mode 100644 index 5866c87983..0000000000 --- a/src/actors/scala/actors/distributed/NodeComb.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.io.BytePickle._ - -/** - * @author Philipp Haller - */ -object NodeComb { - def tcpNodePU: SPU[TcpNode] = - wrap((p: Pair[String,int]) => TcpNode(p._1, p._2), - (n: TcpNode) => Pair(n.address, n.port), pair(string, nat)); - def jxtaNodePU: SPU[JXTANode] = - wrap((s: String) => JXTANode(s), - (n: JXTANode) => n.name, string); -} diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala deleted file mode 100644 index fc9764f63b..0000000000 --- a/src/actors/scala/actors/distributed/RemoteActor.scala +++ /dev/null @@ -1,173 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.actors.multi.{Process,ExcHandlerDesc} -import scala.collection.mutable.{HashMap,Stack} - -abstract class ServiceName -case class JXTA(groupName: String) extends ServiceName -case class TCP() extends ServiceName - -/** - * @author Philipp Haller - */ -class RemoteActor extends Process { - - override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { - // locality check (handler local to this actor?) - if (destDesc.p == this) - handleExc(destDesc, e) - else - kernel.forwardExc(destDesc, e) - } - - override def receive(f: PartialFunction[Any,Unit]): Nothing = { - if (isAlive) { - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - try { - f(msg) - } - catch { - case d: Done => - throw new Done - case t: Throwable => - if (!excHandlerDescs.isEmpty) - forwardExc(excHandlerDescs.top, t) - else - die(Symbol(t.toString())) - } - die() - case None => - continuation = f - Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - throw new Done - } - - var kernel: NetKernel = null - - def node = self.node - - def nodes = kernel.nodes - - private var selfCached: RemotePid = null - - def self: RemotePid = { - if (selfCached == null) - selfCached = kernel pidOf this - selfCached - } - - def serialize(index: String, rep: Serializer => AnyRef) = - kernel.registerSerializer(index, rep) - - def alive(s: ServiceName): Unit = { - var service: Service = null - s match { - case TCP() => - val serv = new TcpService(TcpService.generatePort) - service = serv - serv.start() - /*case JXTA(groupName) => - val serv = new ch.epfl.lamp.scala.actors.jxta.JXTAService("AliveActor" - + new java.util.Date().getTime() + "-" - + new java.util.Random().nextInt(1000), - java.util.logging.Level.FINEST) { - //millis before we give up group creation and create the group. - override def TIME_BEFORE_AUTO_GROUP_CREATE: long = 30000 - val PIPE_ID:String="1" - //val ADV_LIFETIME:long = 1 * 60 * 60 * 1000 //millis to keep advertisements ... - override def MY_GROUP_NAME:String = groupName - /*val SENDER_MESSAGE = "PalcomDemo"; //used to identify the message element in jxta messages - val PIPE_BASE_ID:String = "PIPE4Pal4"+MY_GROUP_NAME; - val MESSAGE_THRESHOLD = 5;*/ - } - service = serv - serv.start()*/ - case _ => - throw new Exception ("Unknown Service in RemoteActor") - } - // create RemotePid - selfCached = service.kernel.register(this) - } - - def node(pid: RemotePid) = pid.node - - def disconnectNode(node: Node) = - kernel.disconnectNode(node) - - //does not call start def of Actor - def register(name: Symbol, pid: RemotePid): Unit = - kernel.registerName(name, pid) - - //calls start def of Actor - def register(name: Symbol, a: RemoteActor): Unit = - kernel.registerName(name, a) - - def name(node: Node, sym: Symbol): Name = - Name(node, sym, kernel) - - def spawn(node: Node, name: String): RemotePid = - kernel.spawn(self, node, name) - - def spawn(node: Node, a: RemoteActor): Unit = - kernel.spawn(self, node, a) - - def spawn(fun: RemoteActor => Unit): RemotePid = - kernel.spawn(fun) - - def spawn(a: RemoteActor): RemotePid = { - val pid = kernel.register(a) - a.start - pid - } - - def spawnLink(fun: RemoteActor => unit): RemotePid = - kernel.spawnLink(self, fun) - - def monitorNode(node: Node, cond: Boolean) = - kernel.monitorNode(self, node, cond) - - // this should be: - // self.link(pid) - // if self is RemotePid it will invoke NetKernel - - def link(pid: RemotePid): Unit = - kernel.link(self, pid) - - def unlink(pid: RemotePid): Unit = - kernel.unlink(self, pid) - - override def exit(reason: Symbol): Unit = - kernel.exit(self, reason) - - override def processFlag(flag: Symbol, set: Boolean) = - kernel.processFlag(self, flag, set) - - override def die(reason: Symbol) = - if (isAlive) { - isAlive = false - Debug.info("" + this + " died.") - kernel.exit(self, reason) - } - - override def die() = - if (isAlive) { - isAlive = false - Debug.info("" + this + " died.") - kernel.exit(self, 'normal) - } - -} diff --git a/src/actors/scala/actors/distributed/RemotePid.scala b/src/actors/scala/actors/distributed/RemotePid.scala deleted file mode 100644 index e4ea365460..0000000000 --- a/src/actors/scala/actors/distributed/RemotePid.scala +++ /dev/null @@ -1,195 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.actors.multi.ExcHandlerDesc - -import java.io._ - -/** - * @author Philipp Haller - */ -[serializable] -abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends scala.actors.multi.Process { - def this() = this(0, null, null) // for serialization - - private var _locId = locId - - override def equals(that: Any) = that match { - case rpid: RemotePid => - (this.node == rpid.node && this.localId == rpid.localId) - case _ => false - } - - //[throws(classOf[IOException])] - private def writeObject(out: ObjectOutputStream): Unit = { - //scala.Console.println("writing locID"+locId) - out.writeInt(locId) - } - - //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] - private def readObject(in: ObjectInputStream): Unit = { - _locId = in.readInt() - //scala.Console.println("read _locID"+_locId) - } - - //[throws(classOf[ObjectStreamException])] - private def readResolve(): AnyRef = { - scala.Console.println("readResolve") - null - //build nothing. Subclasses will do... - } - - def node: Node; - - def localId: int = locId; - - def kernel = kern; - - override def !(msg: Any): unit = { - //scala.Console.println("! " + msg) - if (actor != null) - actor send msg - else - kernel.remoteSend(this, msg) - } - - override def link(other: Process): Unit = - other match { - case rpid: RemotePid => - kernel.link(this, rpid) - }; - - //TODO (dont know if this is local to kernel.node) - override def linkTo(other: Process): Unit = - other match { - case rpid: RemotePid => - // do nothing - }; - - override def unlink(other: Process): Unit = - other match { - case rpid: RemotePid => - kernel.unlink(this, rpid) - }; - - //TODO (dont know if this is local to kernel.node) - override def unlinkFrom(other: Process): Unit = - other match { - case rpid: RemotePid => - // do nothing - }; - - override def exit(reason: Symbol): Unit = kernel.exit(this, reason); - override def exit(from: Process, reason: Symbol): unit = { - from match { - case rpid: RemotePid => - kernel.exit(rpid, reason); - } - } - - override def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {} -} - -[serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) { - def node: TcpNode = n; - - private var _locId = locId - private var _node = n - - override def equals(that: Any) = - super.equals(that) - - //[throws(classOf[IOException])] - private def writeObject(out: ObjectOutputStream): Unit = { - out.writeInt(locId) - out.writeObject(n) - } - - //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] - private def readObject(in: ObjectInputStream): Unit = { - _locId = in.readInt() - _node = in.readObject().asInstanceOf[TcpNode] - } - - //[throws(classOf[ObjectStreamException])] - private def readResolve(): AnyRef = { - val kernel = NetKernel.kernel; - //TODO val actor = kernel.getLocalRef(_locId) - TcpPid(_node, _locId, kernel, actor) - } -} - -[serializable] case class JXTAPid(n: JXTANode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) { - def node: JXTANode = n; - - private var _locId = locId - private var _node = n - - override def equals(that: Any) = - super.equals(that) - - //[throws(classOf[IOException])] - private def writeObject(out: ObjectOutputStream): Unit = { - out.writeInt(locId) - out.writeObject(n) - } - - //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] - private def readObject(in: ObjectInputStream): Unit = { - _locId = in.readInt() - _node = in.readObject().asInstanceOf[JXTANode] - } - - //[throws(classOf[ObjectStreamException])] - private def readResolve(): AnyRef = { - val kernel = NetKernel.kernel; - //TODO val actor = kernel.getLocalRef(_locId) - JXTAPid(_node, _locId, kernel, actor) - } -} - -//================================================================================ - -object CaseTest { - - def getBytes(obj: AnyRef): Array[byte] = { - val bos = new ByteArrayOutputStream() - val out = new ObjectOutputStream(bos) - out.writeObject(obj) - out.flush() - bos.toByteArray() - } - - def getObject(a: Array[byte]): AnyRef = { - val bis = new ByteArrayInputStream(a) - val in = new ObjectInputStream(bis) - val obj = in.readObject() - obj - } - - def main(args: Array[String]): Unit = { - val node = JXTANode ("test node"); - val pid1 = JXTAPid (node, 4, null, null); - val pid2 = JXTAPid (node, 4, new NetKernel(null), null); - - scala.Console.println("node Before: " + node) - scala.Console.println("node After : " + getObject(getBytes(node))) - - scala.Console.println("pid1 Before: " + pid1) - scala.Console.println("pid1 After : " + getObject(getBytes(pid1))) - - scala.Console.println("pid2 Before: " + pid2) - scala.Console.println("pid2 After : " + getObject(getBytes(pid2))) - - scala.Console.println("pid2 After : " + getObject((new String (getBytes(pid2))).getBytes)) - } -} diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala deleted file mode 100644 index d934168b5b..0000000000 --- a/src/actors/scala/actors/distributed/Serializer.scala +++ /dev/null @@ -1,58 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io.{DataInputStream,DataOutputStream,EOFException} -import scala.io.BytePickle.SPU - -abstract class Serializer(s: Service) { - def serialize(o: AnyRef/*, w: Writer*/): Array[byte] - def deserialize(a: Array[byte]/*r: Reader*/): AnyRef - - // throws IOException - def readBytes(inputStream: DataInputStream): Array[byte] = { - try { - val length = inputStream.readInt() - val bytes = new Array[byte](length) - inputStream.readFully(bytes, 0, length) - return bytes - } - catch { - case npe: NullPointerException => - throw new EOFException("Connection closed.") - } - } - - // throws IOException, ClassNotFoundException - def readObject(inputStream: DataInputStream): AnyRef = { - val bytes = readBytes(inputStream) - deserialize(bytes) - } - - // throws IOException - def writeBytes(outputStream: DataOutputStream, bytes: Array[byte]): unit = { - val length = bytes.length; - // original length - outputStream.writeInt(length) - outputStream.write(bytes, 0, length) - outputStream.flush() - } - - // throws IOException - def writeObject(outputStream: DataOutputStream, obj: AnyRef) = { - val bytes = serialize(obj) - writeBytes(outputStream, bytes) - } - - def pid: SPU[RemotePid] - def service = s - def addRep(name: String, repCons: Serializer => AnyRef): unit -} diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala deleted file mode 100644 index f5164751b3..0000000000 --- a/src/actors/scala/actors/distributed/Service.scala +++ /dev/null @@ -1,83 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io.StringWriter - -/** - * @author Philipp Haller - */ -trait Service { - val serializer: Serializer - def node: Node - def createPid(actor: RemoteActor): RemotePid - def send(node: Node, data: Array[byte]): Unit - def connect(node: Node): Unit // non blocking. - def disconnectNode(node: Node): Unit - def isConnected(node: Node): Boolean - - //blocking. timeout depends on Implementation. - def isReachable(node: Node): Boolean - - def getRoundTripTimeMillis(node: Node): Long //blocking - - def nodes:List[Node] - -// implemented parts: - - private val kern = new NetKernel(this) - def kernel = kern - - def spawn(name: String): RemotePid = - kern spawn name - - def spawn(name: String, arg: RemotePid): RemotePid = - kern.spawn(name, arg) - - - //suggested addition by seb - def spawn(fun: RemoteActor => unit): RemotePid = - kernel.spawn(fun); - def spawn(a:RemoteActor):RemotePid = { - //Console.println("Service:spawn(RemoteActor)") - val pid = kernel.register(a) - //Console.println("RemoteActor("+a+") registered in kernel") - a.start - //Console.println("RemoteActor("+a+") started") - pid - } - - def send(pid: RemotePid, msg: AnyRef): unit = synchronized { - if (pid.node == this.node) - kernel.localSend(pid, msg) - else - kernel.remoteSend(pid, msg) - } - - def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized { - //Console.println("Service: Sending " + msg + " to " + pid) - // lets try to serialize the message - //val sw = new StringWriter - //serializer.serialize(msg, sw) - val bytes = serializer.serialize(msg) - //val sendMsg = Send(pid, sw.toString()) - val sendMsg = Send(pid, bytes) - //val sw2 = new StringWriter - //serializer.serialize(sendMsg, sw2) - //send(pid.node, sw2.toString()) - val bytes2 = serializer.serialize(sendMsg) - send(pid.node, bytes2) - } - - private var idCnt = 0 - def makeUid = { idCnt = idCnt + 1; idCnt } - -} diff --git a/src/actors/scala/actors/distributed/SystemMessage.scala b/src/actors/scala/actors/distributed/SystemMessage.scala deleted file mode 100644 index 344973e010..0000000000 --- a/src/actors/scala/actors/distributed/SystemMessage.scala +++ /dev/null @@ -1,33 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.actors.multi.ExcHandlerDesc - -abstract class MessageTyper { - type DataType = Array[Byte] -} - -abstract class SystemMessage -case class Send(rec: RemotePid, data: MessageTyper#DataType) extends SystemMessage -case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage -case class Spawn(replyto: RemotePid, p: String) extends SystemMessage -case class SpawnObject(replyto: RemotePid, data: MessageTyper#DataType) extends SystemMessage -case class Exit1(from: RemotePid, to: RemotePid, reason: Symbol) extends SystemMessage - -case class RemotePidReply(res: RemotePid) extends SystemMessage -case class Disconnect() extends SystemMessage -case class NodeDown() extends SystemMessage - -// CAUTION: Tells "from" to create a _uni-directional_ link! -case class Link(from: RemotePid, to: RemotePid) extends SystemMessage -case class UnLink(from: RemotePid, to: RemotePid) extends SystemMessage -case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala deleted file mode 100644 index 4fe9590221..0000000000 --- a/src/actors/scala/actors/distributed/TcpSerializerComb.scala +++ /dev/null @@ -1,135 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io.Reader -import scala.io.BytePickle._ - -import scala.actors.distributed.MessagesComb._ -import scala.actors.distributed.NodeComb._ -import scala.collection.mutable.HashMap - -/** - * @author Philipp Haller - */ -//TODO: change Service to NetKernel in Serializer interface -class TcpSerializerComb(serv: Service) extends Serializer(serv) { - - private def lookup(typename: String): PU[AnyRef] = { - val op = table.get(typename) - op match { - case None => - error("No type representation found.") - null - case Some(rep) => - val repr = rep.asInstanceOf[PU[AnyRef]] - repr - } - } - - private def lookup(r: Reader): PU[AnyRef] = { - // read length of type name - val carr = new Array[char](8) - r.read(carr) - val len = Util.decode(new String(carr)) - val content = new Array[char](len) - r.read(content) - lookup(new String(content)) - } - - def pid: SPU[RemotePid] = { - val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel, - if (p._1 == serv.node) serv.kernel.getLocalRef(p._2) - else null), - (t: TcpPid) => Pair(t.node, t.localId), - pair(tcpNodePU, nat)); - - wrap((p:RemotePid) => p, (pid:RemotePid) => pid match { - case tpid: TcpPid => - tpid - case other => - error("no instance of TcpPid!!") - }, nodeIntPU) - } - - def anyRef: SPU[AnyRef] = - wrap((typename: String) => Class.forName(typename).newInstance().asInstanceOf[AnyRef], - (obj: AnyRef) => Util.baseName(obj), - string); - - def actorPU: SPU[RA] = - wrap((typename: String) => RA(Class.forName(typename).newInstance().asInstanceOf[RemoteActor]), - (obj: RA) => Util.baseName(obj.a), - string); - - val log = new Debug("TcpSerializerComb") - log.level = 3 - - val table = new HashMap[String, AnyRef] - - initialize - def initialize = { - table += "int" -> nat - table += "Send" -> sendPU(this) - table += "Spawn" -> spawnPU(this) - table += "TcpNode" -> tcpNodePU - table += "TcpPid" -> pid - table += "Exit" -> exitPU(this) - table += "AnyRef" -> anyRef - - table += "RA" -> actorPU - table += "SpawnObject" -> spawnObjectPU(this) - - //table += "Incr" -> incrPU(this) - //table += "Value" -> valuePU(this) - //table += "Result" -> resultPU(this) - } - - def addRep(name: String, repCons: Serializer => AnyRef) = - table.update(name, repCons(this)) - - def +=(name: String) = - new InternalMapTo(name) - - class InternalMapTo(name: String) { - def ->(repCons: Serializer => AnyRef): unit = - table.update(name, repCons(TcpSerializerComb.this)) - } - - def serialize(o: AnyRef): Array[byte] = { - log.info("pickling value of type " + Util.baseName(o)) - val op = table.get(Util.baseName(o)) - op match { - case None => error("No type representation for " + Util.baseName(o) + " found. Cannot serialize."); - case Some(rep) => - // first write type name - val bytes = pickle(string, Util.baseName(o)) - val repr = rep.asInstanceOf[SPU[AnyRef]] - log.info("using type representation " + repr) - val res = repr.appP(o, new PicklerState(bytes, new PicklerEnv)).stream - res - } - - } - - def deserialize(bytes: Array[byte]): AnyRef = { - val ups = string.appU(new UnPicklerState(bytes, new UnPicklerEnv)) - val typename = ups._1 - table.get(typename) match { - case None => error("No type representation for " + typename + " found. Cannot deserialize.") - case Some(rep) => - val repr = rep.asInstanceOf[SPU[AnyRef]]; - val obj = repr.appU(ups._2)._1 - log.info("unpickling successful") - obj - } - } -} diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala deleted file mode 100644 index a1efa01d9b..0000000000 --- a/src/actors/scala/actors/distributed/TcpService.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io.IOException -import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException} - -/** - * @author Philipp Haller - */ -object TcpService { - val random = new java.util.Random(System.currentTimeMillis()) - - def generatePort: int = { - var portnum = 0 - try { - portnum = 8000 + random.nextInt(500) - val socket = new ServerSocket(portnum) - socket.close() - } - catch { - case ioe: IOException => - // this happens when trying to open a socket twice at the same port - // try again - generatePort - case se: SecurityException => - // do nothing - } - portnum - } -} - -object TestPorts { - def main(args: Array[String]): Unit = { - val random = new java.util.Random(System.currentTimeMillis()) - val socket = new ServerSocket(8000 + random.nextInt(500)) - Console.println(TcpService.generatePort) - } -} - -/** - * @author Philipp Haller - */ -class TcpService(port: Int) extends Thread with Service { - val serializer: JavaSerializer = new JavaSerializer(this) - - private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port); - def node: TcpNode = internalNode - - def createPid(actor: RemoteActor): RemotePid = - new TcpPid(internalNode, makeUid, kernel, actor) - - def send(node: Node, data: String): unit = synchronized { - // retrieve worker thread (if any) that already has connection - node match { - case tnode: TcpNode => - getConnection(tnode) match { - case None => - // we are not connected, yet - Console.println("We are not connected, yet."); - val newWorker = connect(tnode); //bad in a sync BLOCK!!! - newWorker transmit data - case Some(worker) => worker transmit data - } - case any => error("no TcpNode!"); - } - } - - def send(node: Node, data: Array[byte]): unit = synchronized { - // retrieve worker thread (if any) that already has connection - node match { - case tnode: TcpNode => - getConnection(tnode) match { - case None => - // we are not connected, yet - Console.println("We are not connected, yet."); - val newWorker = connect(tnode); //bad in a sync BLOCK!!! - newWorker transmit data - case Some(worker) => worker transmit data - } - case any => error("no TcpNode!"); - } - } - - override def run(): Unit = - try { - val socket = new ServerSocket(port); - Console.println("Tcp Service started: " + node); - - while (true) { - val nextClient = socket.accept(); - Console.println("Received request from " + nextClient.getInetAddress() + ":" + nextClient.getPort()); - - // this is bad because client will have other port than actual node - // solution: worker should read node from stream - // and call main thread to update connection table - - // spawn new worker thread - val worker = new TcpServiceWorker(this, nextClient); - worker.readNode; - // start worker thread - worker.start() - } - } - catch { - case ioe: IOException => - // do nothing - case sec: SecurityException => - // do nothing - } - - // connection management - - private val connections = - new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker] - - def nodes: List[Node] = - throw new Exception ("nodes need to be implemented in TcpService!") - - def addConnection(n: TcpNode, w: TcpServiceWorker) = synchronized { - connections += n -> w - } - - def getConnection(n: TcpNode) = synchronized { - connections.get(n) - } - - def isConnected(n: Node): Boolean = synchronized { - n match { - case tnode: TcpNode => - ! connections.get(tnode).isEmpty - case _ => - false - } - } - - def connect(n: Node): Unit = synchronized { - n match { - case tnode: TcpNode => - connect(tnode) - } - } - - def connect(n: TcpNode): TcpServiceWorker = synchronized { - Console.println("" + node + ": Connecting to node " + n + " ...") - val sock = new Socket(n.address, n.port) - Console.println("Connected.") - // spawn new worker thread - val worker = new TcpServiceWorker(this, sock) - worker.sendNode; - // start worker thread - worker.start() - // register locally (we want to reuse connections which correspond to connected sockets) - // update connection table - addConnection(n, worker) - worker - } - - def disconnectNode(n: Node) = synchronized { - n match { - case node: TcpNode => - Console.println("Disconnecting from " + node + " ...") - connections.get(node) match { - case None => Console.println("Cannot disconnect from " + node + ". Not connected.") - case Some(worker) => - //TODO: sending disconnect message - //worker.sendDisconnect; - // update table - connections -= node - Console.println("Halting worker...") - worker.halt - } - case any => error("No TcpNode!!"); - } - } - - def isReachable(node: Node): boolean = - if (isConnected(node)) true - else try { - connect(node) - return true - } - catch { - case uhe: UnknownHostException => - false - case ioe: IOException => - false - case se: SecurityException => - false - } - - def getRoundTripTimeMillis(node: Node): Long = 0 - - def nodeDown(mnode: TcpNode): Unit = synchronized { - kernel nodeDown mnode - connections -= mnode - } - - /*def closeConnection(worker: TcpServiceWorker): unit = synchronized { - connections.get(worker) match { - case None => - System.err.println("Worker " + worker + " not registered."); - case Some(socket) => { - try { - socket.close(); - connections -= worker - } - catch { - case ioe:IOException => - System.err.println("Couldn't close connection."); - connections -= worker - } - } - }; - System.out.println("OK. Connection closed.") - }*/ - -} diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala deleted file mode 100644 index 2587319fdb..0000000000 --- a/src/actors/scala/actors/distributed/TcpServiceWorker.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import java.io._ -import java.net.Socket - -/** - * @author Philipp Haller - */ -class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { - val in = so.getInputStream() - val out = so.getOutputStream() - - val datain = new DataInputStream(in) - val dataout = new DataOutputStream(out) - - val reader = new BufferedReader(new InputStreamReader(in)) - val writer = new PrintWriter(new OutputStreamWriter(out)) - - val log = new Debug("TcpServiceWorker") - log.level = 2 - - def transmit(msg: Send): Unit = synchronized { - val data = parent.serializer.serialize(msg) - transmit(data) - } - - def transmit(data: String): Unit = synchronized { - log.info("Transmitting " + data) - writer.write(data) - writer.flush() - } - - def transmit(data: Array[byte]): Unit = synchronized { - log.info("Transmitting " + data) - dataout.writeInt(data.length) - dataout.write(data) - dataout.flush() - } - - def sendNode = { - scala.Console.println("Sending our name " + parent.node) - parent.serializer.writeObject(dataout, parent.node) - } - - var connectedNode: TcpNode = _ - - def readNode = { - scala.Console.println("" + parent.node + ": Reading node name...") - //val node = parent.serializer.deserialize(reader) - val node = parent.serializer.readObject(datain) - scala.Console.println("Connection from " + node) - node match { - case n: TcpNode => { - connectedNode = n - scala.Console.println("Adding connection to " + node + " to table.") - parent.addConnection(n, this) - } - } - } - - var running = true - def halt = synchronized { - so.close() // close socket - running = false // stop - } - - override def run(): Unit = { - try { - while (running) { - if (in.available() > 0) { - log.info("deserializing..."); - //val msg = parent.serializer.deserialize(reader); - val msg = parent.serializer.readObject(datain); - log.info("Received object: " + msg); - parent.kernel.processMsg(msg) - } - } - } - catch { - case ioe: IOException => - scala.Console.println("" + ioe + " while reading from socket."); - parent nodeDown connectedNode - case e: Exception => - // catch-all - scala.Console.println("" + e + " while reading from socket."); - parent nodeDown connectedNode - } - } -} diff --git a/src/actors/scala/actors/distributed/Util.scala b/src/actors/scala/actors/distributed/Util.scala deleted file mode 100644 index 07ca4fb417..0000000000 --- a/src/actors/scala/actors/distributed/Util.scala +++ /dev/null @@ -1,70 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.distributed - -import scala.collection.mutable.{ArrayBuffer,Buffer} - -/** - * @author Philipp Haller - */ -object Util { - def pad(s: String, req: int): String = { - val buf = new StringBuffer - val add: int = req - s.length() - for (val i <- List.range(1, add+1)) - buf append "0"; - buf append s - buf.toString() - } - - def encode(i: Int) = pad(Integer.toHexString(i), 8) - def encode(l: Long) = pad(java.lang.Long.toHexString(l), 16) - def decode(s: String): Int = Integer.decode("0x" + s).intValue() - def decodeLong(s: String): Long = java.lang.Long.decode("0x" + s).longValue() - - def baseName(o: Any) = { - val s = o.toString() - - def baseName(s: String): String = { - if (s.indexOf('$') != -1) - baseName(s.substring(0,s.indexOf('$'))) - else if (s.indexOf('(') != -1) - baseName(s.substring(0,s.indexOf('('))) - else if (s.indexOf('@') != -1) - baseName(s.substring(0,s.indexOf('@'))) - else s - } - - baseName(s) - } - - def extractArgs(s: String): Buffer[String] = { - // extract strings between first-level commas - var level: int = 0; - val carr: Array[char] = s.toCharArray(); - var buf = new StringBuffer; // current string - val args = new ArrayBuffer[String]; - - for (val i <- List.range(0,carr.length)) { - if ((level == 0) && (carr(i) == ',')) { - // argument finished - args += buf.toString(); - buf = new StringBuffer - } else { - if (carr(i) == '(') level = level + 1; - if (carr(i) == ')') level = level - 1; - buf append carr(i) - } - } - args += buf.toString(); - args - } -} diff --git a/src/actors/scala/actors/gui/Button.scala b/src/actors/scala/actors/gui/Button.scala deleted file mode 100644 index 59ae64b9d8..0000000000 --- a/src/actors/scala/actors/gui/Button.scala +++ /dev/null @@ -1,20 +0,0 @@ -package scala.actors.gui - -import javax.swing._ -import event._ - -/** A class for buttons; standard constructor wraps around a swing button */ -class Button(val jbutton: JButton) extends Container(jbutton) with SwingComponent with Publisher { - def this(txt: String) = this(new JButton(txt)) - def this() = this(new JButton()) - def text: String = jbutton.getText() - def text_=(s: String) = jbutton.setText(s) - def icon: Icon = jbutton.getIcon() - def icon_=(i: Icon) = jbutton.setIcon(i) - jbutton.addActionListener { - new java.awt.event.ActionListener { - def actionPerformed(e: java.awt.event.ActionEvent): unit = - publish(ButtonPressed(Button.this)) - } - } -} diff --git a/src/actors/scala/actors/gui/Caret.scala b/src/actors/scala/actors/gui/Caret.scala deleted file mode 100644 index 05cfc2659a..0000000000 --- a/src/actors/scala/actors/gui/Caret.scala +++ /dev/null @@ -1,8 +0,0 @@ -package scala.actors.gui; - -import javax.swing - -class Caret(val jcaret: swing.text.Caret) { - def dot: int = jcaret.getDot() - def mark: int = jcaret.getMark() -} diff --git a/src/actors/scala/actors/gui/Component.scala b/src/actors/scala/actors/gui/Component.scala deleted file mode 100644 index 9610b433fa..0000000000 --- a/src/actors/scala/actors/gui/Component.scala +++ /dev/null @@ -1,9 +0,0 @@ -package scala.actors.gui; - -import javax.swing._; -import java.awt._; - -class Component(val acomponent: java.awt.Component) extends Subscriber { - def show: this.type = { acomponent.setVisible(true); this } -} - diff --git a/src/actors/scala/actors/gui/Container.scala b/src/actors/scala/actors/gui/Container.scala deleted file mode 100644 index e30ac78caa..0000000000 --- a/src/actors/scala/actors/gui/Container.scala +++ /dev/null @@ -1,18 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ -import scala.collection.mutable.ListBuffer - -class Container(val jcontainer: java.awt.Container) extends Component(jcontainer) { - def this() = this(new java.awt.Container()) - val elems = new ListBuffer[Component] - def += (c: Component) = { - elems += c - jcontainer.add(c.acomponent) - } - def -= (c: Component) = { - elems -= c - jcontainer.remove(c.acomponent) - } -} - diff --git a/src/actors/scala/actors/gui/EmptyBorder.scala b/src/actors/scala/actors/gui/EmptyBorder.scala deleted file mode 100644 index 01ff7481cb..0000000000 --- a/src/actors/scala/actors/gui/EmptyBorder.scala +++ /dev/null @@ -1,8 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ - -class EmptyBorder(_top: int, _left: int, _bottom: int, _right: int) -extends border.EmptyBorder(_top, _left, _bottom, _right) { - def this() = this(0, 0, 0, 0) -} diff --git a/src/actors/scala/actors/gui/FormattedTextField.scala b/src/actors/scala/actors/gui/FormattedTextField.scala deleted file mode 100644 index 74b0c6e359..0000000000 --- a/src/actors/scala/actors/gui/FormattedTextField.scala +++ /dev/null @@ -1,9 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ -import java.awt.event._ -import event._ - -class FormattedTextField(val jftextfield: JFormattedTextField) extends TextComponent(jftextfield) { - def this(format: java.text.Format) = this(new JFormattedTextField(format)); -} diff --git a/src/actors/scala/actors/gui/Frame.scala b/src/actors/scala/actors/gui/Frame.scala deleted file mode 100644 index f5269e3d5f..0000000000 --- a/src/actors/scala/actors/gui/Frame.scala +++ /dev/null @@ -1,33 +0,0 @@ -package scala.actors.gui; - -import javax.swing._; -import event._; - -class Frame(val jframe: JFrame) extends Container(jframe) with Publisher { - def this() = this(new JFrame("Untitled Frame")) - def title: String = jframe.getTitle() - def title_=(s: String) = jframe.setTitle(s) - val contents = new Container(jframe.getContentPane()) - private var default_button: Button = null - def defaultButton = default_button - def defaultButton_=(b: Button) = { default_button = b; jframe.getRootPane().setDefaultButton(b.jbutton) } - def pack: this.type = { jframe.pack(); this } - jframe.addWindowListener { - new java.awt.event.WindowListener { - def windowActivated(e: java.awt.event.WindowEvent) = publish(WindowActivated(Frame.this)) - def windowClosed(e: java.awt.event.WindowEvent) = publish(WindowClosed(Frame.this)) - def windowClosing(e: java.awt.event.WindowEvent) = publish(WindowClosing(Frame.this)) - def windowDeactivated(e: java.awt.event.WindowEvent) = publish(WindowDeactivated(Frame.this)) - def windowDeiconified(e: java.awt.event.WindowEvent) = publish(WindowDeiconified(Frame.this)) - def windowIconified(e: java.awt.event.WindowEvent) = publish(WindowIconified(Frame.this)) - def windowOpened(e: java.awt.event.WindowEvent) = publish(WindowOpened(Frame.this)) - } - } - - /*jframe.addMouseMotionListener ( - new java.awt.event.MouseMotionListener { - def mouseDragged(e: java.awt.event.MouseEvent) = publish(MouseDragged(e)) - def mouseMoved(e: java.awt.event.MouseEvent) = publish(MouseMoved(e)) - } - )*/ -} diff --git a/src/actors/scala/actors/gui/GUIApplication.scala b/src/actors/scala/actors/gui/GUIApplication.scala deleted file mode 100644 index 7d52e75628..0000000000 --- a/src/actors/scala/actors/gui/GUIApplication.scala +++ /dev/null @@ -1,20 +0,0 @@ -package scala.actors.gui - -import javax.swing._ -import event.Event - -class GUIApplication { - def defaultLookAndFeelDecorated: boolean = true - - def init() = { - UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName()) - JFrame.setDefaultLookAndFeelDecorated(defaultLookAndFeelDecorated) - } - - def run(prog: => unit): unit = - SwingUtilities.invokeLater { - new Runnable() { - def run() = { init(); prog } - } - } -} diff --git a/src/actors/scala/actors/gui/Label.scala b/src/actors/scala/actors/gui/Label.scala deleted file mode 100644 index 5094f9436c..0000000000 --- a/src/actors/scala/actors/gui/Label.scala +++ /dev/null @@ -1,14 +0,0 @@ -package scala.actors.gui; - -import javax.swing._; - -class Label(val jlabel: JLabel) extends Container(jlabel) with SwingComponent { - def this(txt: String) = this(new JLabel(txt)) - def this() = this("Untitled Label") - def text: String = jlabel.getText() - def text_=(s: String) = jlabel.setText(s) - def halign: Orientation.Value = Orientation(jlabel.getHorizontalAlignment()) - def halign_=(x: Orientation.Value) = jlabel.setHorizontalAlignment(x.id) - def valign: Orientation.Value = Orientation(jlabel.getVerticalAlignment()) - def valign_=(x: Orientation.Value) = jlabel.setVerticalAlignment(x.id) -} diff --git a/src/actors/scala/actors/gui/MainFrame.scala b/src/actors/scala/actors/gui/MainFrame.scala deleted file mode 100644 index b76a170e71..0000000000 --- a/src/actors/scala/actors/gui/MainFrame.scala +++ /dev/null @@ -1,14 +0,0 @@ -package scala.actors.gui; - -import javax.swing._; -import scala.actors.gui.event._; - -class MainFrame(jframe: JFrame) extends Frame(jframe) { - def this() = this(new JFrame("Untitled Frame")) - - addHandler { - case WindowClosing(_) => System.exit(1) - } - - subscribe(this) -} diff --git a/src/actors/scala/actors/gui/Orientation.scala b/src/actors/scala/actors/gui/Orientation.scala deleted file mode 100644 index a71157380a..0000000000 --- a/src/actors/scala/actors/gui/Orientation.scala +++ /dev/null @@ -1,11 +0,0 @@ -package scala.actors.gui - -import javax.swing.SwingConstants._ - -object Orientation extends Enumeration { - val left = Value(LEFT, "left") - val right = Value(RIGHT, "right") - val bottom = Value(BOTTOM, "bottom") - val top = Value(TOP, "top") - val center = Value(CENTER, "center") -} diff --git a/src/actors/scala/actors/gui/Panel.scala b/src/actors/scala/actors/gui/Panel.scala deleted file mode 100644 index 59cbff6784..0000000000 --- a/src/actors/scala/actors/gui/Panel.scala +++ /dev/null @@ -1,15 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ -import java.awt.event._ - -class Panel(val jpanel: JPanel) extends Container(jpanel) with SwingComponent { - def this(layout: java.awt.LayoutManager, elements: Component*) = { - this(new JPanel(layout)); - for (val elem <- elements) this += elem - } - def this(elements: Component*) = this(new java.awt.FlowLayout, elements: _*) - - def layout: java.awt.LayoutManager = jpanel.getLayout() - def layout_=(x: java.awt.LayoutManager) = jpanel.setLayout(x) -} diff --git a/src/actors/scala/actors/gui/Publisher.scala b/src/actors/scala/actors/gui/Publisher.scala deleted file mode 100644 index 48a5622349..0000000000 --- a/src/actors/scala/actors/gui/Publisher.scala +++ /dev/null @@ -1,123 +0,0 @@ -package scala.actors.gui - -import scala.collection.mutable.ListBuffer -import scala.actors.single.Actor -import scala.actors.gui.event.Event - -class EventHandlers { - type Handler = PartialFunction[Any,unit] - - private val handlers = new ListBuffer[Handler] - - def += (h: Handler) = { handlers += h } - def -= (h: Handler) = { handlers -= h } - - def compoundHandler = new Handler { - def isDefinedAt(e: Any): boolean = handlers.exists(.isDefinedAt(e)) - - def apply(e: Any): unit = - handlers.find(.isDefinedAt(e)) match { - case Some(h) => h.apply(e) - case None => // do nothing - } - } -} - -trait Responder extends Actor[Any] { - protected val handlers = new EventHandlers - - final def eventloop(f: PartialFunction[Any,unit]): Nothing = - receive(new RecursiveProxyHandler(this, f)) - - def eventblock(f: PartialFunction[Any,unit]): unit = { - try { - receive(new RecursiveProxyHandler(this, f)) - } - catch { - case d: Done => - // do nothing - } - } - - private class RecursiveProxyHandler(a: Actor[Any], f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] { - def isDefinedAt(m: Any): boolean = - true // events should be removed from the mailbox immediately! - - def apply(m: Any): unit = { - if (f.isDefinedAt(m)) f(m) // overrides any installed handler - else - if (handlers.compoundHandler.isDefinedAt(m)) - handlers.compoundHandler(m) - else { - // do nothing - } - a receive this - } - } -} - -case class Subscribe(s: Subscriber) -case class Publish(e: Event) - -trait Subscriber extends Responder { - type Handler = PartialFunction[Any,unit] - def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this) -} - -trait Publisher extends Responder { - case class HandlerAdded() - - private val subscribers = new ListBuffer[Subscriber] - - handlers += { // installs _permanent_ handler! - case Subscribe(s) => - //Console.println("" + this + ": rec subscription from " + s) - subscribers += s - case Publish(e) => for (val s <- subscribers) s send e - } - - //Console.println("" + this + ": exec toplevel eventloop (Publisher)") - - eventblock { - case HandlerAdded() => - //Console.println("" + this + " received HandlerAdded()") - } - - def addHandler(h: EventHandlers#Handler) = { - //Console.println("" + this + ": installing new handler") - handlers += h - this send HandlerAdded() // causes currently active eventloop to recursively call itself - } - - def publish(e: Event) = { - //Console.println("Publishing event: " + e) - for (val s <- subscribers) s send e - } - - // TODO: super.receive might already be overridden! - //final override def receive(f: PartialFunction[Any,unit]): Nothing = - //super.receive(new ProxyPubSubHandler(f)) - - private class ProxyPubSubHandler(f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] { - def isDefinedAt(m: Any): boolean = - if (f.isDefinedAt(m)) true - else m match { - case Subscribe(s) => true - case Publish(e) => true - case other => false - } - - def apply(m: Any): unit = { - m match { - case Subscribe(s) => - //Console.println("Rec subscription: " + s) - subscribers += s - case Publish(e) => - for (val s <- subscribers) s send e - case other => - // do nothing - } - if (f.isDefinedAt(m)) f(m) - } - } -} diff --git a/src/actors/scala/actors/gui/SimpleGUIApplication.scala b/src/actors/scala/actors/gui/SimpleGUIApplication.scala deleted file mode 100644 index 30154932cc..0000000000 --- a/src/actors/scala/actors/gui/SimpleGUIApplication.scala +++ /dev/null @@ -1,14 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ - -abstract class SimpleGUIApplication extends GUIApplication { - - def top: Frame; - - def main(args: Array[String]) = { - run { top.pack.show } - } - - implicit def string2label(s: String): Label = new Label(s) -} diff --git a/src/actors/scala/actors/gui/SwingComponent.scala b/src/actors/scala/actors/gui/SwingComponent.scala deleted file mode 100644 index cbe6478c32..0000000000 --- a/src/actors/scala/actors/gui/SwingComponent.scala +++ /dev/null @@ -1,11 +0,0 @@ -package scala.actors.gui - -import javax.swing._ -import java.awt._ - -trait SwingComponent extends Component { - val jcomponent = acomponent.asInstanceOf[JComponent]; - def border: javax.swing.border.Border = jcomponent.getBorder() - def border_=(x: javax.swing.border.Border): unit = jcomponent.setBorder(x) -} - diff --git a/src/actors/scala/actors/gui/TextComponent.scala b/src/actors/scala/actors/gui/TextComponent.scala deleted file mode 100644 index 955745045f..0000000000 --- a/src/actors/scala/actors/gui/TextComponent.scala +++ /dev/null @@ -1,22 +0,0 @@ -package scala.actors.gui - -import javax.swing._ -import javax.swing.text.JTextComponent -import javax.swing.event.{CaretEvent,CaretListener} -import event.CaretUpdate - -class TextComponent(val jtextcomponent: JTextComponent) -extends Container(jtextcomponent) with SwingComponent with Publisher { - - def text: String = jtextcomponent.getText() - def text_=(x: String) = jtextcomponent.setText(x) - - val caret = new Caret(jtextcomponent.getCaret()) - - jtextcomponent.addCaretListener { - new CaretListener { - def caretUpdate(e: CaretEvent) = - publish(CaretUpdate(TextComponent.this)) - } - } -} diff --git a/src/actors/scala/actors/gui/TextField.scala b/src/actors/scala/actors/gui/TextField.scala deleted file mode 100644 index 0e32fc29f5..0000000000 --- a/src/actors/scala/actors/gui/TextField.scala +++ /dev/null @@ -1,22 +0,0 @@ -package scala.actors.gui; - -import javax.swing._ -import java.awt.event._ -import event._ - -class TextField(val jtextfield: JTextField) extends TextComponent(jtextfield) { - def this(text: String, columns: int) = this(new JTextField(text, columns)); - def this(text: String) = this(new JTextField(text)); - def this(columns: int) = this(new JTextField(columns)); - def this() = this(new JTextField()); - - def columns: int = jtextfield.getColumns() - def columns_=(x: int) = jtextfield.setColumns(x) - - jtextfield.addActionListener { - new ActionListener { - def actionPerformed(e: ActionEvent) = - publish(TextModified(TextField.this)) - } - } -} diff --git a/src/actors/scala/actors/gui/event/ButtonPressed.scala b/src/actors/scala/actors/gui/event/ButtonPressed.scala deleted file mode 100644 index 9d120314fb..0000000000 --- a/src/actors/scala/actors/gui/event/ButtonPressed.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class ButtonPressed(b: Button) extends Event diff --git a/src/actors/scala/actors/gui/event/CaretUpdate.scala b/src/actors/scala/actors/gui/event/CaretUpdate.scala deleted file mode 100644 index 15c48385d9..0000000000 --- a/src/actors/scala/actors/gui/event/CaretUpdate.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class CaretUpdate(text: TextComponent) extends Event diff --git a/src/actors/scala/actors/gui/event/Event.scala b/src/actors/scala/actors/gui/event/Event.scala deleted file mode 100644 index 949cf2cb4d..0000000000 --- a/src/actors/scala/actors/gui/event/Event.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -abstract class Event diff --git a/src/actors/scala/actors/gui/event/MouseDragged.scala b/src/actors/scala/actors/gui/event/MouseDragged.scala deleted file mode 100644 index 37b768b922..0000000000 --- a/src/actors/scala/actors/gui/event/MouseDragged.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class MouseDragged(override val event: java.awt.event.MouseEvent) extends MouseEvent(event); diff --git a/src/actors/scala/actors/gui/event/MouseEvent.scala b/src/actors/scala/actors/gui/event/MouseEvent.scala deleted file mode 100644 index da43fbd8b8..0000000000 --- a/src/actors/scala/actors/gui/event/MouseEvent.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -abstract class MouseEvent(val event: java.awt.event.MouseEvent) extends Event; diff --git a/src/actors/scala/actors/gui/event/MouseMoved.scala b/src/actors/scala/actors/gui/event/MouseMoved.scala deleted file mode 100644 index bf2028f159..0000000000 --- a/src/actors/scala/actors/gui/event/MouseMoved.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class MouseMoved(override val event: java.awt.event.MouseEvent) extends MouseEvent(event); diff --git a/src/actors/scala/actors/gui/event/TextModified.scala b/src/actors/scala/actors/gui/event/TextModified.scala deleted file mode 100644 index 0af1b49838..0000000000 --- a/src/actors/scala/actors/gui/event/TextModified.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class TextModified(text: TextComponent) extends Event diff --git a/src/actors/scala/actors/gui/event/WindowActivated.scala b/src/actors/scala/actors/gui/event/WindowActivated.scala deleted file mode 100644 index 797a2d0fc5..0000000000 --- a/src/actors/scala/actors/gui/event/WindowActivated.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowActivated(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowClosed.scala b/src/actors/scala/actors/gui/event/WindowClosed.scala deleted file mode 100644 index 7676809299..0000000000 --- a/src/actors/scala/actors/gui/event/WindowClosed.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowClosed(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowClosing.scala b/src/actors/scala/actors/gui/event/WindowClosing.scala deleted file mode 100644 index bd63987c4d..0000000000 --- a/src/actors/scala/actors/gui/event/WindowClosing.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowClosing(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowDeactivated.scala b/src/actors/scala/actors/gui/event/WindowDeactivated.scala deleted file mode 100644 index 4452f792d6..0000000000 --- a/src/actors/scala/actors/gui/event/WindowDeactivated.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowDeactivated(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowDeiconified.scala b/src/actors/scala/actors/gui/event/WindowDeiconified.scala deleted file mode 100644 index 1c7cbca57c..0000000000 --- a/src/actors/scala/actors/gui/event/WindowDeiconified.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowDeiconified(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowEvent.scala b/src/actors/scala/actors/gui/event/WindowEvent.scala deleted file mode 100644 index 25177d8e2a..0000000000 --- a/src/actors/scala/actors/gui/event/WindowEvent.scala +++ /dev/null @@ -1,5 +0,0 @@ -package scala.actors.gui.event - -abstract class WindowEvent extends Event { - val window: Frame -} diff --git a/src/actors/scala/actors/gui/event/WindowIconified.scala b/src/actors/scala/actors/gui/event/WindowIconified.scala deleted file mode 100644 index abb6362188..0000000000 --- a/src/actors/scala/actors/gui/event/WindowIconified.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowIconified(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowOpened.scala b/src/actors/scala/actors/gui/event/WindowOpened.scala deleted file mode 100644 index f2656497a3..0000000000 --- a/src/actors/scala/actors/gui/event/WindowOpened.scala +++ /dev/null @@ -1,3 +0,0 @@ -package scala.actors.gui.event - -case class WindowOpened(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/layout.scala b/src/actors/scala/actors/gui/layout.scala deleted file mode 100644 index 6d53d267ae..0000000000 --- a/src/actors/scala/actors/gui/layout.scala +++ /dev/null @@ -1,12 +0,0 @@ -package scala.actors.gui - -import java.awt._ - -object layout { - - val flex = 0 - - def grid(rows: int, columns: int) = new GridLayout(rows, columns) - def row = new FlowLayout() - def column = grid(flex, 1) -} diff --git a/src/actors/scala/actors/multi/Actor.scala b/src/actors/scala/actors/multi/Actor.scala deleted file mode 100644 index 626995792f..0000000000 --- a/src/actors/scala/actors/multi/Actor.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -/** - * @author Philipp Haller - */ -trait Actor[T] extends scala.actors.Actor[T] with MailBox { - def run(): Unit = {} - - def start(): Unit = try { run() } - catch { - case d: Done => // do nothing - } - - def !(msg: T): Unit = send(msg) -} diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala deleted file mode 100644 index 529f03a747..0000000000 --- a/src/actors/scala/actors/multi/MailBox.scala +++ /dev/null @@ -1,182 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -import scala.collection.mutable.Queue - -/** - * @author Philipp Haller - */ -trait MailBox { - /** Unconsumed messages. */ - var sent = new Queue[Any] - - var continuation: PartialFunction[Any, Unit] = null - // more complex continuation - var contCases: PartialFunction[Any, Any] = null - var contThen: Any => Unit = null - - def hasCont = - if ((continuation == null) && (contCases == null)) false - else true - - def contDefinedAt(msg: Any) = - if (((continuation != null) && continuation.isDefinedAt(msg)) || - ((contCases != null) && contCases.isDefinedAt(msg))) - true - else - false - - var isAlive = true - var scheduled = false - - private var pendingSignal = false - - def scheduleContinuation(msg: Any): Unit = { - val task = new ReceiverTask(this, msg) - //Debug.info("ready to receive. dispatch new task " + task) - scheduled = true - Scheduler.execute(task) - } - - def send(msg: Any): Unit = synchronized { - if (isAlive) { - if (!hasCont || scheduled) { - //Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") - msg match { - case Signal() => - // do not add to mailbox - case _ => - sent += msg - } - } - else - msg match { - case Signal() => - if (!contDefinedAt(TIMEOUT)) die() - else - scheduleContinuation(TIMEOUT) - case _ => - if (!contDefinedAt(msg)) - sent += msg - else { - if (pendingSignal) { - pendingSignal = false - TimerThread.trashRequest(this) - } - scheduleContinuation(msg) - } - } - } - } - - def receiveMsg(msg: Any) = { - //Debug.info("" + Thread.currentThread() + ": Resuming " + this) - if (continuation != null) { - val f = continuation - this.synchronized { - continuation = null - scheduled = false - } - f(msg) - die() - } - else { - // use more complex receive-and-return continuation - val cases = contCases - val then = contThen - contCases = null - contThen = null - scheduled = false - val result = cases(msg) - then(result) - die() - } - } - - def receive(f: PartialFunction[Any, Unit]): Nothing = synchronized { - if (isAlive) { - Scheduler.tick(this) - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - continuation = f - scheduleContinuation(msg) - case None => - continuation = f - //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - throw new Done - } - - def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = synchronized { - if (isAlive) { - Scheduler.tick(this) - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => { - continuation = f - scheduleContinuation(msg) - } - case None => - // if timeout == 0 then execute timeout action if specified (see Erlang book) - if (msec == 0) { - if (f.isDefinedAt(TIMEOUT)) { - continuation = f - scheduleContinuation(TIMEOUT) - } - die() - } else { - if (msec > 0) { - TimerThread.requestTimeout(this, msec) - pendingSignal = true - } - continuation = f - //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - } - throw new Done - } - - def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Unit = { - contCases = null - contThen = null - sent.dequeueFirst(cases.isDefinedAt) match { - case Some(msg) => { - val result = cases(msg) - then(result) - die() - } - case None => { - contCases = cases - contThen = then - //Debug.info("No msg found. Saved complex continuation.") - } - } - throw new Done - } - - // receiv {...} then (msg => {...msg...}) - - class ReceiveAndReturn(cases: PartialFunction[Any, Any]) { - def then(body: Any => Unit): Unit = receiveAndReturn(cases, body) - } - - def receiv(cases: PartialFunction[Any, Any]): ReceiveAndReturn = - new ReceiveAndReturn(cases) - - def die() = if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - } -} diff --git a/src/actors/scala/actors/multi/Process.scala b/src/actors/scala/actors/multi/Process.scala deleted file mode 100644 index 6eba3a995a..0000000000 --- a/src/actors/scala/actors/multi/Process.scala +++ /dev/null @@ -1,265 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -import scala.collection.mutable.{HashMap,HashSet,Stack} - -/** - * @author Philipp Haller - */ -object Process { - - def spawn(body: => Unit): Process = { - val p = new Process { - override def run(): Unit = body - } - val task = new ReceiverTask(p, null) { - override def run(): Unit = try { p.start() } - catch { - case d: Done => - // do nothing (continuation is already saved) - } - } - Scheduler.execute(task) - p - } - - def spawnLink(body: => Unit): Process = { - val p = new Process { - override def run(): Unit = body - } - val task = new ReceiverTask(p, null) { - override def run(): Unit = try { p.start() } - catch { - case d: Done => - // do nothing (continuation is already saved) - } - } - self.link(p) - Scheduler.execute(task) - p - } - - def send(p: Process, msg: Any) = - p ! msg - - def receive(f: PartialFunction[Any,Unit]): Nothing = - self.receive(f) - - def receiveWithin(msec: long)(f: PartialFunction[Any,Unit]): Nothing = - self.receiveWithin(msec)(f) - - def self: Process = { - val p = Scheduler.getProcess(Thread.currentThread()) - if (p == null) - error("Self called outside a process") - else - p - } - - def exit(p: Process, reason: Symbol) = - p.exit(reason) -} - -/** - * @author Philipp Haller - */ -class Process extends scala.actors.Process with Actor[Any] { - private val links = new HashSet[scala.actors.Process] - - override def start(): Unit = try { run() } - catch { - case d: Done => // do nothing - case t: Throwable => { - if (!excHandlerDescs.isEmpty) - forwardExc(excHandlerDescs.top, t) - else - exit(new Symbol(t.toString())) - } - } - - case class Exit(from: scala.actors.Process, reason: Symbol) - - def link(to: scala.actors.Process): Unit = { - links += to - to.linkTo(this) - } - - def linkTo(to: scala.actors.Process): Unit = - links += to - - def unlink(from: scala.actors.Process): Unit = { - links -= from - from.unlinkFrom(this) - } - - def unlinkFrom(from: scala.actors.Process): Unit = - links -= from - - private var trapExit = false - - def processFlag(flag: Symbol, set: boolean) = { - if (flag.name.equals("trapExit")) trapExit = set - } - - def exit(reason: Symbol): Unit = { - exitLinked(reason, new HashSet[Process]) - if (isAlive) isAlive = false - } - - def exit(from: scala.actors.Process, reason: Symbol): Unit = { - if (from == this) { - exit(reason) - } - else { - if (trapExit) - this ! Exit(from, reason) - else if (!reason.name.equals("normal")) - exit(reason) - } - } - - def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = { - if (exitMarks contains this) { - // we are marked, do nothing - } - else { - exitMarks += this // mark this as exiting - - // exit linked processes - val iter = links.elements - while (iter.hasNext) { - val linked = iter.next - unlink(linked) - linked.exit(this, reason) - } - exitMarks -= this - } - } - - val excHandlerDescs = new Stack[ExcHandlerDesc] - - // exception handler table - val excHandlers = new HashMap[Int, ExcHandler] - - def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { - // locality check (handler local to this actor?) - if (destDesc.p == this) { - handleExc(destDesc, e) - } - else { - // forward to process of destination descriptor - destDesc.p.handleExc(destDesc, e) - } - } - - def pushExcHandlerDesc(desc: ExcHandlerDesc) = - excHandlerDescs += desc - - /** is only called for local handlers - (i.e. destDesc.p == this) */ - def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = - (excHandlers get destDesc.eid) match { - case Some(handler) => - handler.handle(e) - case None => - error("exc desc refers to non-registered handler") - } - - private var excCnt = 0 - private def freshExcId = { excCnt = excCnt + 1; excCnt } - - def tryAsync(block: => Unit, - handlerFun: PartialFunction[Throwable, Unit]) = { - val excHandler = - new ExcHandler(handlerFun, - this, - if (excHandlerDescs.isEmpty) null - else excHandlerDescs.top) - - val desc = ExcHandlerDesc(this, freshExcId) - // associate desc with handler - excHandlers += desc.eid -> excHandler - // push desc onto stack - excHandlerDescs += desc - //Console.println("desc stack height: " + excHandlerDescs.length) - // execute code block - block - } - - def process(f: PartialFunction[Any,unit], msg: Any): unit = { - try { - f(msg) - } - catch { - case d: Done => - throw new Done - case t: Throwable => - throw t - if (!excHandlerDescs.isEmpty) - forwardExc(excHandlerDescs.top, t) - else - die(Symbol(t.toString())) - } - } - - override def receiveMsg(msg: Any) = { - //Debug.info("" + Thread.currentThread() + ": Resuming " + this) - if (continuation != null) { - val f = continuation - continuation = null - scheduled = false - process(f, msg) - die() - } - else { - // use more complex receive-and-return continuation - val cases = contCases - val then = contThen - contCases = null - contThen = null - scheduled = false - val result = cases(msg) - then(result) - die() - } - } - - def die(reason: Symbol) = { - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - exit(reason) - } - } - - override def die() = { - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - exit('normal) - } - } -} - -case class ExcHandlerDesc(p: Process, eid: Int) - -class ExcHandler(actions: PartialFunction[Throwable, Unit], - p: Process, - parent: ExcHandlerDesc) { - def handle(e: Throwable): Unit = { - if (!actions.isDefinedAt(e)) { - if (parent != null) p.forwardExc(parent, e) - } - else - actions(e) - } -} diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala deleted file mode 100644 index 348c2247d3..0000000000 --- a/src/actors/scala/actors/multi/ReceiverTask.scala +++ /dev/null @@ -1,27 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -/** - * @author Philipp Haller - */ -class ReceiverTask(val actor: MailBox, msg: Any) extends Runnable { - def run(): Unit = { - Scheduler.setProcess(Thread.currentThread(), actor) - try { - actor receiveMsg msg - } - catch { - case d: Done => - // do nothing (continuation is already saved) - } - } -} diff --git a/src/actors/scala/actors/multi/Scheduler.scala b/src/actors/scala/actors/multi/Scheduler.scala deleted file mode 100644 index 20d68c648a..0000000000 --- a/src/actors/scala/actors/multi/Scheduler.scala +++ /dev/null @@ -1,262 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -import scala.collection.mutable._ - -/** - * @author Philipp Haller - */ -object Scheduler /*extends java.util.concurrent.Executor*/ { - private var sched: /*java.util.concurrent.Executor*/ IScheduler = - //java.util.concurrent.Executors.newFixedThreadPool(4); - //new FixedWorkersScheduler(2); - new SpareWorkerScheduler2 - //new SpareWorkerScheduler - - def impl = sched - def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = { - Debug.info("Using scheduler " + scheduler) - sched = scheduler - } - - def execute(item: ReceiverTask) = synchronized { - sched.execute(item) - } - - def tick(a: MailBox) = { - sched.tick(a) - } - - private val process = new HashMap[Thread, MailBox] - - def getProcess(t: Thread): Process = synchronized { - process.get(t) match { - case None => null - case Some(p: Process) => p - } - } - - def setProcess(t: Thread, m: MailBox) = synchronized { - process.update(t, m) - } -} - -/** - * @author Philipp Haller - */ -abstract class IScheduler /*extends java.util.concurrent.Executor*/ { - def execute(item: ReceiverTask): Unit - def getTask(worker: WorkerThread): Runnable - def tick(a: MailBox): Unit - - val QUIT_TASK = new Runnable() { - def run(): Unit = {} - override def toString() = "QUIT_TASK" - } -} - -/** - * @author Philipp Haller - */ -class SpareWorkerScheduler2 extends IScheduler { - private val tasks = new Queue[ReceiverTask]; - private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; - - val idle = new Queue[WorkerThread]; - val ticks = new HashMap[WorkerThread, long] - val executing = new HashMap[MailBox, WorkerThread] - - var TICKFREQ = 50 - - def init = { - for (val i <- List.range(0, 2)) { - val worker = new WorkerThread(this) - workers += worker - worker.start() - } - } - init; - - var maxWorkers = 0; - var ticksCnt = 0; - - def tick(a: MailBox): unit = synchronized { - ticksCnt = ticksCnt + 1 - executing.get(a) match { - case None => // thread outside of scheduler; error("No worker thread associated with actor " + a) - case Some(wt) => - ticks.update(wt, System.currentTimeMillis) - } - } - - def execute(item: ReceiverTask): unit = synchronized { - if (idle.length > 0) { - val wt = idle.dequeue - executing.update(item.actor, wt) - wt.execute(item) - } - else { - /* only create new worker thread - when all are blocked according to heuristic - - we check time stamps of latest send/receive ops - of ALL workers - - we stop if there is one which is not blocked */ - - val iter = workers.elements - var foundBusy = false - while (iter.hasNext && !foundBusy) { - val wt = iter.next - ticks.get(wt) match { - case None => foundBusy = true // assume not blocked - case Some(ts) => { - val currTime = System.currentTimeMillis - if (currTime - ts < TICKFREQ) - foundBusy = true - } - } - } - - if (!foundBusy) { - val newWorker = new WorkerThread(this) - workers += newWorker - maxWorkers = workers.length // statistics - - executing.update(item.actor, newWorker) - - newWorker.execute(item) - newWorker.start() - } - else { - // wait assuming busy thread will be finished soon - // and ask for more work - tasks += item - } - } - } - - def getTask(worker: WorkerThread) = synchronized { - if (tasks.length > 0) { - val item = tasks.dequeue - executing.update(item.actor, worker) - item - } - else { - idle += worker - null - } - } -} - -/** - * @author Philipp Haller - */ -class SpareWorkerScheduler extends IScheduler { - private var canQuit = false; - private val tasks = new Queue[ReceiverTask]; - private val idle = new Queue[WorkerThread]; - - private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; - - def init = { - for (val i <- List.range(0, 2)) { - val worker = new WorkerThread(this) - workers += worker - worker.start() - } - } - init; - - var maxWorkers = 0; - - def execute(item: ReceiverTask): unit = synchronized { - if (idle.length == 0) { - tasks += item - // create new worker - val newWorker = new WorkerThread(this) - workers += newWorker - maxWorkers = workers.length - newWorker.start() - //newWorker.execute(item) - } - else { - canQuit = true - idle.dequeue.execute(item) - } - } - - def getTask(worker: WorkerThread) = synchronized { - if (tasks.length > 0) tasks.dequeue - else { - idle += worker - null - //if ((idle.length == workers.length) && canQuit) haltExcept(worker) - //else null - } - } - - def tick(a: MailBox): unit = {} - - def haltExcept(w: WorkerThread) = { - for (val i <- List.range(0, workers.length)) - if (workers(i) != w) workers(i).halt - QUIT_TASK - } -} - -/** - * @author Philipp Haller - */ -abstract class FixedWorkersScheduler(workercnt: int) extends IScheduler { - private var canQuit = false; - private val tasks = new Queue[ReceiverTask]; - private val idle = new Queue[WorkerThread]; - - //Console.println("Running with " + workercnt + " workers") - private var workers = new Array[WorkerThread](workercnt); - - def init = { - for (val i <- List.range(0, workers.length)) { - workers(i) = new WorkerThread(this) - workers(i).start() - } - } - init; - - def execute(item: ReceiverTask): unit = synchronized { - if (workers.length == 0) item.run - else { - canQuit = true - if (idle.length > 0) idle.dequeue.execute(item) - else tasks += item - } - } - - def getTask(worker: WorkerThread) = synchronized { - if (tasks.length > 0) tasks.dequeue - else { - idle += worker - null - //if ((idle.length == workers.length) && canQuit) haltExcept(worker) - //else null - } - } - - def tick(a: MailBox): unit = {} - - def haltExcept(w: WorkerThread) = { - for (val i <- List.range(0, workers.length)) - if (workers(i) != w) workers(i).halt - QUIT_TASK - } -} diff --git a/src/actors/scala/actors/multi/TimerThread.scala b/src/actors/scala/actors/multi/TimerThread.scala deleted file mode 100644 index d344e7acef..0000000000 --- a/src/actors/scala/actors/multi/TimerThread.scala +++ /dev/null @@ -1,162 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -import scala.collection.mutable.PriorityQueue - -/** - * This class allows the (locl) sending of a message to an actor after - * a timeout. Used by the library to build receiveWithin(time:long). - * Note that the library deletes non received TIMEOUT() message if a - * messsage is received before the time-out occurs. - * - * @author Sebastien Noir - */ -case class Signal() - -object TimerThread extends AnyRef with Runnable { - case class WakedActor(actor: MailBox, time: long, reason: String) extends Ordered[WakedActor] { - var valid = true - def compare (that: WakedActor): int = -(this.time compare that.time) - } - - var queue = new PriorityQueue[WakedActor] - val t = new Thread(this); t.start - - var lateList: List[WakedActor] = Nil - - def trashRequest(a: MailBox) = synchronized { - // keep in mind: killing dead people is a bad idea! - queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { - case Some(b) => - b.valid = false - case None => - lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match { - case Some(b2) => - b2.valid = false - case None => - } - } - } - - override def run = while (true) { - this.synchronized { - try { - val sleepTime = dequeueLateAndGetSleepTime - if (lateList.isEmpty) { - wait(sleepTime) - } - } catch { - case t: Throwable => { t.printStackTrace(); throw t } - } - } - - // process guys waiting for signal and empty list - for (val wa <- lateList) { - if (wa.valid) { - wa.actor send Signal() - } - } - lateList = Nil - } - - def requestSignal(a: MailBox, waitMillis: long, reason: String): unit = this.synchronized { - Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason) - val wakeTime = now + waitMillis - if (waitMillis < 0) { - a send Signal() - return - } - - if (queue.isEmpty) { // add to queue and restart sleeping - queue += WakedActor(a, wakeTime, reason) - notify() - } else { //queue full - if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping - queue += WakedActor (a, wakeTime, reason) - notify() - } else { // simply add to queue - - queue += WakedActor (a, wakeTime, reason) - } - } - } - - def requestTimeout(a: MailBox, waitMillis: long): unit = synchronized { - val wakeTime = now + waitMillis - if (waitMillis < 0) { - a send Signal() - return - } - - if (queue.isEmpty) { // add to queue and restart sleeping - queue += WakedActor(a, wakeTime, "") - notify() - } else - if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping - queue += WakedActor (a, wakeTime, "") - notify() - } - else // simply add to queue - queue += WakedActor (a, wakeTime, "") - } - - private def dequeueLateAndGetSleepTime: long = { - val FOREVER: long = 0 - var waitingList: List[WakedActor] = Nil - - while (!queue.isEmpty) { - val next = queue.max.time - val amount = next - now - if (amount > 0) { // guy in queue is not late - lateList = waitingList // give back the list of waiting guys for signaling - return amount - } - else // we're late: dequeue and examine next guy - waitingList = queue.dequeue :: waitingList - } - - // empty queue => sleep forever - lateList = waitingList - return FOREVER - } - - def now = new java.util.Date().getTime() -} - -//================================================================================ - -object TimerThreadTest { - def main (args:Array[String]) = { - new Tester (1000, "ONE").start - new Tester (500, "TWO").start - } - - class Tester (duration : int, name:String) extends multi.Actor[Any] { - var i = 0 - - def loop:unit = { - receive { - case Signal() => - Console.println(name + i) - i = i+1; - loop - } - } - - override def run = { - for (val i <-List.range(1,10)) { - TimerThread.requestSignal(this, (duration * i).asInstanceOf[long], ""+duration*i) - } - loop - } - } -} diff --git a/src/actors/scala/actors/multi/WorkerThread.scala b/src/actors/scala/actors/multi/WorkerThread.scala deleted file mode 100644 index 8c5a4ecb1e..0000000000 --- a/src/actors/scala/actors/multi/WorkerThread.scala +++ /dev/null @@ -1,44 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -/** - * @author Philipp Haller - */ -class WorkerThread(sched: IScheduler) extends Thread { - private var task: Runnable = null - private var running = true - - def halt = synchronized { - running = false - notify() - } - - def execute(r: Runnable) = synchronized { - //Debug.info("WORK: " + this + ": Executing task " + r) - task = r - notify() - } - - override def run(): Unit = synchronized { - while (running) { - if (task != null) { - task.run() - //Debug.info("WORK: " + this + " has finished.") - } - //Debug.info("WORK: " + this + ": Getting new task...") - task = sched.getTask(this) - //Debug.info("WORK (" + this + "): got task " + task) - if (task == sched.QUIT_TASK) running = false - else if (task == null) wait() - } - } -} diff --git a/src/actors/scala/actors/single/Actor.scala b/src/actors/scala/actors/single/Actor.scala deleted file mode 100644 index b6fe13a7de..0000000000 --- a/src/actors/scala/actors/single/Actor.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.single - -/** - * @author Philipp Haller - */ -trait Actor[T] extends scala.actors.Actor[T] with MailBox { - def run(): Unit = {} - - def start(): Unit = try { run() } - catch { - case d: Done => // do nothing - } - - def !(msg: T): Unit = send(msg) -} diff --git a/src/actors/scala/actors/single/MailBox.scala b/src/actors/scala/actors/single/MailBox.scala deleted file mode 100644 index e46503c472..0000000000 --- a/src/actors/scala/actors/single/MailBox.scala +++ /dev/null @@ -1,157 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.single - -import scala.collection.mutable.Queue - -/** - * @author Philipp Haller - */ -trait MailBox { - /** Unconsumed messages. */ - var sent = new Queue[Any] - - var continuation: PartialFunction[Any,Unit] = null - // more complex continuation - var contCases: PartialFunction[Any,Any] = null - var contThen: Any => unit = null - - def hasCont = - if ((continuation == null) && (contCases == null)) false - else true - - def contDefinedAt(msg: Any) = - if (((continuation != null) && continuation.isDefinedAt(msg)) || - ((contCases != null) && contCases.isDefinedAt(msg))) - true - else - false - - var isAlive = true - - private var duration: Long = 0 - private var timeInitial: Long = 0 - private var timeoutEnabled: Boolean = false - - def send(msg: Any): Unit = synchronized { - if (isAlive) - if (!hasCont) { - Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") - sent += msg - } - else { - var message = msg - var timeoutOccurred = false - - if (timeoutEnabled && (System.currentTimeMillis() - timeInitial > duration)) - timeoutOccurred = true - - if (timeoutOccurred && !contDefinedAt(TIMEOUT)) - die() - else { - if (timeoutOccurred) message = TIMEOUT - - if (contDefinedAt(message)) { - // we exit receive, so reset timeoutEnabled - timeoutEnabled = false - - try { - if (continuation != null) { - val f = continuation - continuation = null - f(msg) - die() - } - else { - // use more complex receive-and-return continuation - val cases = contCases - val then = contThen - contCases = null - contThen = null - val result = cases(msg) - then(result) - die() - } - } - catch { - case d: Done => - // do nothing (continuation is already saved) - } - } - else { - Debug.info("cont not defined at msg. appending to mailbox.") - if (!timeoutOccurred) sent += message - } - } - } - } - - def receive(f: PartialFunction[Any, Unit]): Nothing = { - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - f(msg) - die() - case None => - continuation = f - Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - throw new Done - } - - def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { - timeInitial = System.currentTimeMillis() - duration = msec - - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - f(msg) - die() - case None => - // if timeout == 0 then execute timeout action if specified (see Erlang book) - if (duration == 0) { - if (f.isDefinedAt(TIMEOUT)) - f(TIMEOUT) - die() - } - else { - timeoutEnabled = true - continuation = f - Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - throw new Done - } - - def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Nothing = { - contCases = null - contThen = null - sent.dequeueFirst(cases.isDefinedAt) match { - case Some(msg) => { - val result = cases(msg) - then(result) - die() - } - case None => { - contCases = cases - contThen = then - Debug.info("No msg found. Saved complex continuation.") - } - } - throw new Done - } - - def die() = { - isAlive = false - Debug.info("" + this + " died.") - } -} diff --git a/src/actors/scala/actors/single/Process.scala b/src/actors/scala/actors/single/Process.scala deleted file mode 100644 index 84efa1e198..0000000000 --- a/src/actors/scala/actors/single/Process.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.single - -import scala.collection.mutable.HashSet - -/** - * @author Philipp Haller - */ -class Process extends scala.actors.Process with Actor[Any] { - case class Exit(from: scala.actors.Process, reason: Symbol) - - private val links = new HashSet[scala.actors.Process] - - def link(to: scala.actors.Process): Unit = { - links += to - to.linkTo(this) - } - - def linkTo(to: scala.actors.Process): Unit = links += to - - def unlink(from: scala.actors.Process): Unit = { - links -= from - from.unlinkFrom(this) - } - - def unlinkFrom(from: scala.actors.Process): Unit = links -= from - - private var trapExit = false - - def processFlag(flag: Symbol, set: boolean) = { - if (flag.name.equals("trapExit")) trapExit = set - } - - def exit(reason: Symbol): Unit = { - exitLinked(reason, new HashSet[Process]) - if (isAlive) isAlive = false - } - - def exit(from: scala.actors.Process, reason: Symbol): Unit = { - if (from == this) { - exit(reason) - } - else { - if (trapExit) - this ! Exit(from, reason) - else if (!reason.name.equals("normal")) - exit(reason) - } - } - - def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = { - if (exitMarks contains this) { - // we are marked, do nothing - } - else { - exitMarks += this // mark this as exiting - - // exit linked processes - val iter = links.elements - while (iter.hasNext) { - val linked = iter.next - unlink(linked) - linked.exit(this, reason) - } - exitMarks -= this - } - } -} diff --git a/src/actors/scala/actors/threads/Actor.scala b/src/actors/scala/actors/threads/Actor.scala deleted file mode 100644 index cf0f7988e4..0000000000 --- a/src/actors/scala/actors/threads/Actor.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.threads - -/** - * The class Actor ... - * - * @author Martin Odersky - * @version 1.0 - */ -trait Actor[T] extends Thread with scala.actors.Actor[T] { - private val in = new MailBox - - def !(msg: T): Unit = - in.send(msg) - - def send(msg: T): Unit = - in.send(msg) - - def receive[a](f: PartialFunction[Any, a]): a = - if (Thread.currentThread() == this) in.receive(f) - else error("receive called not on own process") - - def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a = - if (Thread.currentThread() == this) in.receiveWithin(msec)(f) - else error("receiveWithin called not on own process") -} diff --git a/src/actors/scala/actors/threads/MailBox.scala b/src/actors/scala/actors/threads/MailBox.scala deleted file mode 100644 index 61231a3bee..0000000000 --- a/src/actors/scala/actors/threads/MailBox.scala +++ /dev/null @@ -1,176 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - - -package scala.actors.threads; - -/** - * The class MailBox ... - * - * @author Martin Odersky - * @version 1.0 - */ -class MailBox extends AnyRef with ListQueueCreator { - - type Message = Any; - - private abstract class PreReceiver { - var msg: Message = null; - def isDefinedAt(msg: Message): boolean; - } - - private class Receiver[a](receiver: PartialFunction[Message, a]) extends PreReceiver { - - def isDefinedAt(msg: Message) = receiver.isDefinedAt(msg); - - def receive(): a = synchronized { - while (msg == null) wait(); - receiver(msg) - } - - def receiveWithin(msec: long): a = synchronized { - if (msg == null) wait(msec); - receiver(if (msg != null) msg else TIMEOUT) - } - } - - private val messageQueue = queueCreate[Message]; - private val receiverQueue = queueCreate[PreReceiver]; - - /** Unconsumed messages. */ - private var sent = messageQueue.make; - - /** Pending receivers. */ - private var receivers = receiverQueue.make; - - /** - * Check whether the receiver can be applied to an unconsumed message. - * If yes, the message is extracted and associated with the receiver. - * Otherwise the receiver is appended to the list of pending receivers. - */ - private def scanSentMsgs[a](receiver: Receiver[a]): unit = synchronized { - messageQueue.extractFirst(sent, msg => receiver.isDefinedAt(msg)) match { - case None => receivers = receiverQueue.append(receivers, receiver) - case Some(Pair(msg, withoutMsg)) => { - sent = withoutMsg; - receiver.msg = msg - } - } - } - - /** - * First check whether a pending receiver is applicable to the sent - * message. If yes, the receiver is notified. Otherwise the message - * is appended to the linked list of sent messages. - */ - def send(msg: Message): unit = synchronized { - receiverQueue.extractFirst(receivers, r => r.isDefinedAt(msg)) match { - case None => sent = messageQueue.append(sent, msg) - case Some(Pair(receiver, withoutReceiver)) => { - receivers = withoutReceiver; - receiver.msg = msg; - receiver synchronized { receiver.notify() }; - } - } - } - - /** - * Block until there is a message in the mailbox for which the processor - * f is defined. - */ - def receive[a](f: PartialFunction[Message, a]): a = { - val r = new Receiver(f); - scanSentMsgs(r); - r.receive() - } - - /** - * Block until there is a message in the mailbox for which the processor - * f is defined or the timeout is over. - */ - def receiveWithin[a](msec: long)(f: PartialFunction[Message, a]): a = { - val r = new Receiver(f); - scanSentMsgs(r); - r.receiveWithin(msec) - } - -} - -///////////////////////////////////////////////////////////////// - -/** -* Module for dealing with queues. -*/ -trait QueueModule[a] { - /** Type of queues. */ - type t; - /** Create an empty queue. */ - def make: t; - /** Append an element to a queue. */ - def append(l: t, x: a): t; - /** Extract an element satisfying a predicate from a queue. */ - def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]]; -} - -/** Inefficient but simple queue module creator. */ -trait ListQueueCreator { - def queueCreate[a]: QueueModule[a] = new QueueModule[a] { - type t = List[a]; - def make: t = Nil; - def append(l: t, x: a): t = l ::: x :: Nil; - def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = - l match { - case Nil => None - case head :: tail => - if (p(head)) - Some(Pair(head, tail)) - else - extractFirst(tail, p) match { - case None => None - case Some(Pair(x, without_x)) => Some(Pair(x, head :: without_x)) - } - } - } -} - -/** Efficient queue module creator based on linked lists. */ -trait LinkedListQueueCreator { - import scala.collection.mutable.LinkedList; - def queueCreate[a >: Null <: AnyRef]: QueueModule[a] = new QueueModule[a] { - type t = Pair[LinkedList[a], LinkedList[a]]; // fst = the list, snd = last elem - def make: t = { - val l = new LinkedList[a](null, null); - Pair(l, l) - } - def append(l: t, x: a): t = { - val atTail = new LinkedList(x, null); - l._2 append atTail; - Pair(l._1, atTail) - } - def extractFirst(l: t, p: a => boolean): Option[Pair[a, t]] = { - var xs = l._1; - var xs1 = xs.next; - while (xs1 != null && !p(xs1.elem)) { - xs = xs1; - xs1 = xs1.next; - } - if (xs1 != null) { - xs.next = xs1.next; - if (xs.next == null) - Some(Pair(xs1.elem, Pair(l._1, xs))) - else - Some(Pair(xs1.elem, l)) - } - else - None - } - } -} - diff --git a/src/actors/scala/actors/threads/NameServer.scala b/src/actors/scala/actors/threads/NameServer.scala deleted file mode 100644 index a29159202e..0000000000 --- a/src/actors/scala/actors/threads/NameServer.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - - -package scala.actors.threads; - - -object NameServer { - - val names = new scala.collection.mutable.HashMap[Symbol, Process]; - - def register(name: Symbol, proc: Process) = { - if (names.contains(name)) error("Name:" + name + " already registred"); - names += name -> proc; - } - - def unregister(name: Symbol) = { - if (names.contains(name)) - names -= name; - else - error("Name:" + name + " not registred"); - } - - def whereis(name: Symbol): Option[Process] = - names.get(name); - - def send(name: Symbol, msg: Any) = - names(name).send(msg); - -} diff --git a/src/actors/scala/actors/threads/Process.scala b/src/actors/scala/actors/threads/Process.scala deleted file mode 100644 index 22997a686b..0000000000 --- a/src/actors/scala/actors/threads/Process.scala +++ /dev/null @@ -1,99 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2003-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - - -package scala.actors.threads; - -/** - * The object Process ... - * - * @author Martin Odersky - * @version 1.0 - */ -object Process { - - def spawn(body: => Unit): Process = { - val p = new Process { - override def run(): Unit = body - } - p.start(); - p - } - - def spawnLink(body: => Unit): Process = { - val p = new Process { - override def run(): Unit = body - } - self.link(p) - p.start() - p - } - - def send(p: Process, msg: Any) = - p ! msg - - def receive[a](f: PartialFunction[Any, a]): a = - self.receive(f); - - def receiveWithin[a](msec: long)(f: PartialFunction[Any, a]): a = - self.receiveWithin(msec)(f); - - def self: Process = - if (Thread.currentThread().isInstanceOf[Process]) - Thread.currentThread().asInstanceOf[Process] - else - error("Self called outside a process"); - - def exit(p: Process, reason: Symbol) = - p.exit(reason); - -} - -/** - * The class Process ... - * - * @author Martin Odersky - * @version 1.0 - */ -class Process extends Actor[Any] with scala.actors.Process { - private var exitReason: AnyRef = null; - private var links: List[scala.actors.Process] = Nil; - - override def start(): Unit = try { run() } - catch { - case _: java.lang.InterruptedException => - signal(exitReason) - case (exitSignal) => - signal(exitSignal) - } - - private def signal(s: Any) = - links.foreach { p: scala.actors.Process => p ! Triple('EXIT, this, s) } - - def link(p: scala.actors.Process): Unit = - links = p::links; - - def linkTo(to: scala.actors.Process): Unit = - links = to::links; - - // TODO - def unlink(from: scala.actors.Process): Unit = {} - - // TODO - def unlinkFrom(from: scala.actors.Process): Unit = {} - - def exit(reason: Symbol): Unit = { - exitReason = reason; - interrupt() - } - - def exit(from: scala.actors.Process, reason: Symbol): Unit = - from.exit(reason) -} -- cgit v1.2.3