diff options
author | Philipp Haller <hallerp@gmail.com> | 2006-07-14 12:45:15 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2006-07-14 12:45:15 +0000 |
commit | 7796d36f0bf884d8022590b2b66c706547fd8d39 (patch) | |
tree | cf36d16c337dbc8be8777f1d8c0ecae08c54938b | |
parent | 9d95c090f440956f856468dc59bdfd0ff560314b (diff) | |
download | scala-7796d36f0bf884d8022590b2b66c706547fd8d39.tar.gz scala-7796d36f0bf884d8022590b2b66c706547fd8d39.tar.bz2 scala-7796d36f0bf884d8022590b2b66c706547fd8d39.zip |
Refactored actors lib.
32 files changed, 506 insertions, 1938 deletions
diff --git a/src/actors/scala/actors/multi/AbstractPid.scala b/src/actors/scala/actors/Actor.scala index 45133c3918..593681158c 100644 --- a/src/actors/scala/actors/multi/AbstractPid.scala +++ b/src/actors/scala/actors/Actor.scala @@ -6,15 +6,13 @@ ** |/ ** \* */ -// $Id$ - -package scala.actors.multi +package scala.actors /** * @author Philipp Haller */ -trait AbstractPid { - def !(msg: MailBox#Message): Unit - def become(clos: Actor => Unit): Unit - def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit +trait Actor { + def run(): Unit + def start(): Unit + def !(msg: Any): Unit } diff --git a/src/actors/scala/actors/Done.scala b/src/actors/scala/actors/Done.scala index e0ef9ba057..2291df6399 100644 --- a/src/actors/scala/actors/Done.scala +++ b/src/actors/scala/actors/Done.scala @@ -17,13 +17,3 @@ class Done extends Throwable { override def fillInStackTrace(): Throwable = this; } - -class ContinueException extends Throwable { - override def fillInStackTrace(): Throwable = - this; -} - -class AbortException extends Throwable { - override def fillInStackTrace(): Throwable = - this; -} diff --git a/src/actors/scala/actors/single/Pid.scala b/src/actors/scala/actors/Process.scala index 8c92568b6d..e79e2e2730 100644 --- a/src/actors/scala/actors/single/Pid.scala +++ b/src/actors/scala/actors/Process.scala @@ -6,15 +6,16 @@ ** |/ ** \* */ -// $Id$ - -package scala.actors.single +package scala.actors /** * @author Philipp Haller */ -abstract class Pid { - def !(msg: MailBox#Message): Unit - //def become(clos: Actor => Unit): Unit - //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit +trait Process extends Actor { + def link(to: Process): Unit + def linkTo(to: Process): Unit + def unlink(from: Process): Unit + def unlinkFrom(from: Process): Unit + def exit(reason: Symbol): Unit + def exit(from: Process, reason: Symbol): Unit } diff --git a/src/actors/scala/actors/SPanel.scala b/src/actors/scala/actors/SPanel.scala deleted file mode 100644 index b785907722..0000000000 --- a/src/actors/scala/actors/SPanel.scala +++ /dev/null @@ -1,80 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors - -import java.awt._ -import java.awt.event._ -import javax.swing.event._ -import javax.swing._ -import java.io._ - -/* - * SPanel.scala - * GUI for simple texts - * - * @author Sebastien Noir - */ -class SPanel(WIDTH: Int, HEIGHT: Int, title: String) extends JFrame { - - private var textArea:JTextArea = null - private var resetButton:JButton = null - - private var scrollPane:JScrollPane = null - private var panel:JPanel = null - private var contentPane:Container = null - private var levelChoice:JComboBox = null - private var formatChoice:JComboBox = null - - //init - setTitle(title); - setSize(WIDTH, HEIGHT); - this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); - - contentPane = getContentPane(); - panel = new JPanel(); - contentPane.add(panel, BorderLayout.SOUTH); - - // Add a text area with scroll bars - textArea = new JTextArea(8, 40); - scrollPane = new JScrollPane(textArea); - contentPane.add(scrollPane, BorderLayout.CENTER); - - // Add a "reset textarea" button - resetButton = new JButton("Clear"); - panel.add(resetButton); - resetButton.addActionListener( - new ActionListener() { - def actionPerformed(evt:ActionEvent):Unit= { - textArea.setText(""); - } - } - ); - - this.repaint() - this.show() - - def addText(text: String): Unit = { - textArea.append(text+'\n'); - - if ( textArea.getHeight() > scrollPane.getHeight() ) { - scrollPane.getVerticalScrollBar().setValue(scrollPane.getVerticalScrollBar().getMaximum()); - } - repaint(); - } -/* - def actionPerformed(ActionEvent e):Unit { - } -*/ - - override def paint(g: Graphics): Unit = - super.paint(g) - -} diff --git a/src/actors/scala/actors/single/AbstractPid.scala b/src/actors/scala/actors/TIMEOUT.scala index 74aa004567..63a4ad322f 100644 --- a/src/actors/scala/actors/single/AbstractPid.scala +++ b/src/actors/scala/actors/TIMEOUT.scala @@ -6,15 +6,6 @@ ** |/ ** \* */ -// $Id$ +package scala.actors -package scala.actors.single - -/** - * @author Philipp Haller - */ -trait AbstractPid { - def !(msg: MailBox#Message): Unit - def become(clos: Actor => Unit): Unit - def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit -} +case class TIMEOUT() diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala index a4774503cf..5b07e5e9c8 100644 --- a/src/actors/scala/actors/distributed/JavaSerializer.scala +++ b/src/actors/scala/actors/distributed/JavaSerializer.scala @@ -12,7 +12,7 @@ package scala.actors.distributed import java.io._ -import scala.actors.distributed.picklers.BytePickle.SPU +import scala.io.BytePickle.SPU import scala.actors.multi.Pid [serializable] @@ -36,6 +36,6 @@ class JavaSerializer(serv: Service) extends Serializer(serv) { in.readObject() } - def pid: SPU[Pid] = null + def pid: SPU[RemotePid] = null def addRep(name: String, repCons: Serializer => AnyRef): Unit = {} } diff --git a/src/actors/scala/actors/distributed/MessagesComb.scala b/src/actors/scala/actors/distributed/MessagesComb.scala index 31fe01ebc7..ea30e7e5b4 100644 --- a/src/actors/scala/actors/distributed/MessagesComb.scala +++ b/src/actors/scala/actors/distributed/MessagesComb.scala @@ -10,40 +10,39 @@ package scala.actors.distributed -import scala.actors.distributed.picklers.BytePickle._ -import scala.actors.multi.Pid +import scala.io.BytePickle._ /** * @author Philipp Haller */ object MessagesComb { def sendPU(ser: Serializer): SPU[Send] = - wrap((p: Pair[Pid,Array[byte]]) => Send(p._1, p._2), + wrap((p: Pair[RemotePid,Array[byte]]) => Send(p._1, p._2), (s: Send) => Pair(s.rec, s.data), pair(ser.pid, bytearray)); + def namedSendPU: SPU[NamedSend] = + wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2), + (ns: NamedSend) => Pair(ns.sym, ns.data), + pair(symbolPU, bytearray)); + def spawnPU(ser: Serializer): SPU[Spawn] = - wrap((p: Pair[Pid,String]) => Spawn(p._1, p._2), + wrap((p: Pair[RemotePid,String]) => Spawn(p._1, p._2), (s: Spawn) => Pair(s.replyto, s.p), pair(ser.pid, string)); - def symbolPU: SPU[Symbol] = - wrap((s: String) => Symbol(s), - (sym: Symbol) => sym.name, - string); + def spawnObjectPU(ser: Serializer): SPU[SpawnObject] = + wrap((p: Pair[RemotePid,Array[byte]]) => SpawnObject(p._1, p._2), + (s: SpawnObject) => Pair(s.replyto, s.data), + pair(ser.pid, bytearray)); def exitPU(ser: Serializer): SPU[Exit1] = - wrap((p: Triple[Pid,Pid,Symbol]) => Exit1(p._1, p._2, p._3), + wrap((p: Triple[RemotePid,RemotePid,Symbol]) => Exit1(p._1, p._2, p._3), (e: Exit1) => Triple(e.from, e.to, e.reason), triple(ser.pid, ser.pid, symbolPU)); - def spawnObjectPU(ser: Serializer): SPU[SpawnObject] = - wrap((p: Pair[Pid,Array[byte]]) => SpawnObject(p._1, p._2), - (s: SpawnObject) => Pair(s.replyto, s.data), - pair(ser.pid, bytearray)); - - def namedSendPU: SPU[NamedSend] = - wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2), - (ns: NamedSend) => Pair(ns.sym, ns.data), - pair(symbolPU, bytearray)); + def symbolPU: SPU[Symbol] = + wrap((s: String) => Symbol(s), + (sym: Symbol) => sym.name, + string); } diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala index 8d479df377..8cdc2ffdca 100644 --- a/src/actors/scala/actors/distributed/NetKernel.scala +++ b/src/actors/scala/actors/distributed/NetKernel.scala @@ -55,7 +55,7 @@ class NetKernel(service: Service) { /** only called if destDesc is local. */ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = - destDesc.pid match { + destDesc.p match { case rpid: RemotePid => (rtable get rpid.localId) match { case Some(actor) => @@ -67,7 +67,7 @@ class NetKernel(service: Service) { def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = // locality check (handler local to this node?) - destDesc.pid match { + destDesc.p match { case rpid: RemotePid => if (rpid.node == this.node) handleExc(destDesc, e) @@ -121,9 +121,12 @@ class NetKernel(service: Service) { def localSend(pid: RemotePid, msg: AnyRef): Unit = localSend(pid.localId, msg) - def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized { + def remoteSend(pid: RemotePid, msg: Any) = synchronized { //Console.println("NetKernel: Remote msg delivery to " + pid) - service.remoteSend(pid, msg) + msg match { + case m: AnyRef => + service.remoteSend(pid, m) + } } def namedSend(name: Name, msg: AnyRef): Unit = diff --git a/src/actors/scala/actors/distributed/NodeComb.scala b/src/actors/scala/actors/distributed/NodeComb.scala index b3a0c44bc7..5866c87983 100644 --- a/src/actors/scala/actors/distributed/NodeComb.scala +++ b/src/actors/scala/actors/distributed/NodeComb.scala @@ -10,7 +10,7 @@ package scala.actors.distributed -import scala.actors.distributed.picklers.BytePickle._ +import scala.io.BytePickle._ /** * @author Philipp Haller diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala index 51e6c6690a..7920817e95 100644 --- a/src/actors/scala/actors/distributed/RemoteActor.scala +++ b/src/actors/scala/actors/distributed/RemoteActor.scala @@ -10,7 +10,7 @@ package scala.actors.distributed -import scala.actors.multi.{MailBox,Actor,Pid,LocalPid,ExcHandlerDesc} +import scala.actors.multi.{MailBox,Process,ExcHandlerDesc} import scala.collection.mutable.{HashMap,Stack} abstract class ServiceName @@ -20,17 +20,17 @@ case class TCP() extends ServiceName /** * @author Philipp Haller */ -class RemoteActor extends Actor { +class RemoteActor extends Process { override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { // locality check (handler local to this actor?) - if (destDesc.pid == self) + if (destDesc.p == this) handleExc(destDesc, e) else kernel.forwardExc(destDesc, e) } - override def receive(f: PartialFunction[Message,Unit]): scala.All = { + override def receive(f: PartialFunction[Any,Unit]): scala.All = { if (isAlive) { continuation = null sent.dequeueFirst(f.isDefinedAt) match { @@ -64,7 +64,7 @@ class RemoteActor extends Actor { private var selfCached: RemotePid = null - override def self: RemotePid = { + def self: RemotePid = { if (selfCached == null) selfCached = kernel pidOf this selfCached @@ -103,10 +103,7 @@ class RemoteActor extends Actor { selfCached = service.kernel.register(this) } - def node(pid: Pid): Node = pid match { - case rpid: RemotePid => rpid.node - case lpid: LocalPid => null - } + def node(pid: RemotePid) = pid.node def disconnectNode(node: Node) = kernel.disconnectNode(node) diff --git a/src/actors/scala/actors/distributed/RemotePid.scala b/src/actors/scala/actors/distributed/RemotePid.scala index ec3df043ba..a597411dde 100644 --- a/src/actors/scala/actors/distributed/RemotePid.scala +++ b/src/actors/scala/actors/distributed/RemotePid.scala @@ -10,7 +10,7 @@ package scala.actors.distributed -import scala.actors.multi.{Pid,MailBox,ExcHandlerDesc} +import scala.actors.multi.{MailBox,ExcHandlerDesc} import java.io._ @@ -18,7 +18,7 @@ import java.io._ * @author Philipp Haller */ [serializable] -abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends Pid { +abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends scala.actors.multi.Process { def this() = this(0, null, null) // for serialization private var _locId = locId @@ -54,7 +54,7 @@ abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extend def kernel = kern; - def !(msg: MailBox#Message): unit = { + override def !(msg: Any): unit = { //Console.println("! " + msg) if (actor != null) actor send msg @@ -62,41 +62,41 @@ abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extend kernel.remoteSend(this, msg) } - def link(other: Pid): unit = + override def link(other: Process): Unit = other match { case rpid: RemotePid => kernel.link(this, rpid) }; //TODO (dont know if this is local to kernel.node) - def linkTo(other: Pid): unit = + override def linkTo(other: Process): Unit = other match { case rpid: RemotePid => // do nothing }; - def unlink(other: Pid): unit = + override def unlink(other: Process): Unit = other match { case rpid: RemotePid => kernel.unlink(this, rpid) }; //TODO (dont know if this is local to kernel.node) - def unlinkFrom(other: Pid): unit = + override def unlinkFrom(other: Process): Unit = other match { case rpid: RemotePid => // do nothing }; - def exit(reason: Symbol): unit = kernel.exit(this, reason); - def exit(from: Pid, reason: Symbol): unit = { + override def exit(reason: Symbol): Unit = kernel.exit(this, reason); + override def exit(from: Process, reason: Symbol): unit = { from match { case rpid: RemotePid => kernel.exit(rpid, reason); } } - def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {} + override def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {} } [serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) { diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala index 096d2711d2..553ac99312 100644 --- a/src/actors/scala/actors/distributed/Serializer.scala +++ b/src/actors/scala/actors/distributed/Serializer.scala @@ -11,8 +11,8 @@ package scala.actors.distributed import java.io.{DataInputStream,DataOutputStream,EOFException} +import scala.io.BytePickle.SPU -import scala.actors.distributed.picklers.BytePickle.SPU import scala.actors.multi.Pid /** @@ -57,7 +57,7 @@ abstract class Serializer(s: Service) { writeBytes(outputStream, bytes) } - def pid: SPU[Pid] + def pid: SPU[RemotePid] def service = s def addRep(name: String, repCons: Serializer => AnyRef): unit } diff --git a/src/actors/scala/actors/distributed/SystemMessage.scala b/src/actors/scala/actors/distributed/SystemMessage.scala index d4441a1b66..274100f98f 100644 --- a/src/actors/scala/actors/distributed/SystemMessage.scala +++ b/src/actors/scala/actors/distributed/SystemMessage.scala @@ -10,30 +10,26 @@ package scala.actors.distributed -import scala.actors.multi.{Pid,ExcHandlerDesc} +import scala.actors.multi.ExcHandlerDesc abstract class MessageTyper { type DataType = Array[Byte] } -abstract class SystemMessage -case class Send(rec: Pid, data: MessageTyper#DataType) extends SystemMessage -case class Spawn(replyto: Pid, p: String) extends SystemMessage -case class PidReply(res: Pid) extends SystemMessage -case class Disconnect() extends SystemMessage +case class Send(rec: RemotePid, data: MessageTyper#DataType) +case class NamedSend(sym: Symbol, data: MessageTyper#DataType) +case class Spawn(replyto: RemotePid, p: String) +case class SpawnObject(replyto: RemotePid, data: MessageTyper#DataType) +case class Exit1(from: RemotePid, to: RemotePid, reason: Symbol) -case class NodeDown() extends SystemMessage +case class RemotePidReply(res: RemotePid) +case class Disconnect() +case class NodeDown() // CAUTION: Tells "from" to create a _uni-directional_ link! -case class Link(from: Pid, to: Pid) extends SystemMessage -case class UnLink(from: Pid, to: Pid) extends SystemMessage -case class Exit1(from: Pid, to: Pid, reason: Symbol) extends SystemMessage - -case class SpawnObject(replyto: Pid, data: MessageTyper#DataType) extends SystemMessage - -case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage - -case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage +case class Link(from: RemotePid, to: RemotePid) +case class UnLink(from: RemotePid, to: RemotePid) +case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) /* case class NamedSendRep (ser:Serializer) extends TypeRep[NamedSend](ser) { diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala index 7c7ab87eee..cc7fb9477a 100644 --- a/src/actors/scala/actors/distributed/TcpSerializerComb.scala +++ b/src/actors/scala/actors/distributed/TcpSerializerComb.scala @@ -11,8 +11,8 @@ package scala.actors.distributed import java.io.Reader +import scala.io.BytePickle._ -import scala.actors.distributed.picklers.BytePickle._ import scala.actors.distributed.MessagesComb._ import scala.actors.distributed.NodeComb._ import scala.actors.multi.Pid @@ -46,14 +46,14 @@ class TcpSerializerComb(serv: Service) extends Serializer(serv) { lookup(new String(content)) } - def pid: SPU[Pid] = { + def pid: SPU[RemotePid] = { val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel, if (p._1 == serv.node) serv.kernel.getLocalRef(p._2) else null), (t: TcpPid) => Pair(t.node, t.localId), pair(tcpNodePU, nat)); - wrap((p:Pid) => p, (pid:Pid) => pid match { + wrap((p:RemotePid) => p, (pid:RemotePid) => pid match { case tpid: TcpPid => tpid case other => diff --git a/src/actors/scala/actors/distributed/picklers/BytePickle.scala b/src/actors/scala/actors/distributed/picklers/BytePickle.scala deleted file mode 100644 index 796fc9a353..0000000000 --- a/src/actors/scala/actors/distributed/picklers/BytePickle.scala +++ /dev/null @@ -1,436 +0,0 @@ -package scala.actors.distributed.picklers - -import scala.collection.mutable.HashMap -import scala.collection.mutable.ArrayBuffer - -/** - Pickler combinators. - - Author: Philipp Haller <philipp.haller@epfl.ch> - */ - -object BytePickle { - class PicklerState(val stream: Array[byte], val dict: PicklerEnv) {} - class UnPicklerState(val stream: Array[byte], val dict: UnPicklerEnv) {} - - abstract class PU[t] { - def appP(a: t, state: Array[byte]): Array[byte]; - def appU(state: Array[byte]): Pair[t, Array[byte]]; - } - - abstract class SPU[t] { - def appP(a: t, state: PicklerState): PicklerState; - def appU(state: UnPicklerState): Pair[t, UnPicklerState]; - } - - class PicklerEnv extends HashMap[Any, int] { - private var cnt: int = 64; - def nextLoc() = { cnt = cnt + 1; cnt }; - } - - class UnPicklerEnv extends HashMap[int, Any] { - private var cnt: int = 64; - def nextLoc() = { cnt = cnt + 1; cnt }; - } - - abstract class RefDef; - case class Ref() extends RefDef; - case class Def() extends RefDef; - - def refDef: PU[RefDef] = new PU[RefDef] { - def appP(b: RefDef, s: Array[byte]): Array[byte] = - b match { - case Ref() => Array.concat(s, (List[byte](0)).toArray) - case Def() => Array.concat(s, (List[byte](1)).toArray) - }; - def appU(s: Array[byte]): Pair[RefDef, Array[byte]] = - if (s(0) == 0) Pair(Ref(), s.subArray(1, s.length)) - else Pair(Def(), s.subArray(1, s.length)); - } - - val REF = 0 - val DEF = 1 - - def unat: PU[int] = new PU[int] { - def appP(n: int, s: Array[byte]): Array[byte] = - Array.concat(s, nat2Bytes(n)); - def appU(s: Array[byte]): Pair[int, Array[byte]] = { - var num = 0 - def readNat: int = { - var b = 0; - var x = 0; - do { - b = s(num) - num = num + 1 - x = (x << 7) + (b & 0x7f); - } while ((b & 0x80) != 0); - x - } - Pair(readNat, s.subArray(num, s.length)) - } - } - - def share[a](pa: SPU[a]): SPU[a] = new SPU[a] { - def appP(v: a, state: PicklerState): PicklerState = { - /* - - is there some value equal to v associated with a location l in the pickle environment? - - yes: write REF-tag to outstream together with l - - no: - write DEF-tag to outstream - record current location l of outstream - --> serialize value - add entry to pickle environment, mapping v onto l - */ - val pe = state.dict - pe.get(v) match { - case None => - //Console.println("" + v + " is new") - //Console.println("writing DEF...") - val sPrime = refDef.appP(Def(), state.stream) - val l = pe.nextLoc() - - //Console.println("applying pickler to state " + sPrime) - val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe)) - - //Console.println("updating dict (" + l + ") for " + v) - pe.update(v, l) - - return sPrimePrime - case Some(l) => - //Console.println("writing REF...") - val sPrime = refDef.appP(Ref(), state.stream) - - //Console.println("writing location to state " + sPrime) - return new PicklerState(unat.appP(l, sPrime), pe) - } - } - def appU(state: UnPicklerState): Pair[a, UnPicklerState] = { - /* - - first, read tag (i.e. DEF or REF) - - if REF: - read location l - look up resulting value in unpickler environment - - if DEF: - record location l of input stream - --> deserialize value v with argument deserializer - add entry to unpickler environment, mapping l onto v - */ - val upe = state.dict - val res = refDef.appU(state.stream) - res._1 match { - case Def() => - val l = upe.nextLoc - val res2 = pa.appU(new UnPicklerState(res._2, upe)) - upe.update(l, res2._1) - return res2 - case Ref() => - val res2 = unat.appU(res._2) // read location - upe.get(res2._1) match { // lookup value in unpickler env - case None => error("invalid unpickler environment"); return null - case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe)) - } - } - } - } - - def upickle[t](p: PU[t], a: t): Array[byte] = - p.appP(a, new Array[byte](0)); - - def uunpickle[t](p: PU[t], stream: Array[byte]): t = - p.appU(stream)._1; - - def pickle[t](p: SPU[t], a: t): Array[byte] = - p.appP(a, new PicklerState(new Array[byte](0), new PicklerEnv)).stream; - - def unpickle[t](p: SPU[t], stream: Array[byte]): t = - p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1; - - def ulift[t](x: t): PU[t] = new PU[t] { - def appP(a: t, state: Array[byte]): Array[byte] = - if (x != a) { error("value to be pickled (" + a + ") != " + x); state } - else state; - def appU(state: Array[byte]) = Pair(x, state); - } - - def lift[t](x: t): SPU[t] = new SPU[t] { - def appP(a: t, state: PicklerState): PicklerState = - if (x != a) { /*error("value to be pickled (" + a + ") != " + x);*/ state } - else state; - def appU(state: UnPicklerState) = Pair(x, state); - } - - def usequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] { - def appP(b: u, s: Array[byte]): Array[byte] = { - val a = f(b) - val sPrime = pa.appP(a, s) - val pb = k(a) - val sPrimePrime = pb.appP(b, sPrime) - sPrimePrime - } - def appU(s: Array[byte]): Pair[u, Array[byte]] = { - val resPa = pa.appU(s) - val a = resPa._1 - val sPrime = resPa._2 - val pb = k(a) - pb.appU(sPrime) - } - } - - def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] { - def appP(b: u, s: PicklerState): PicklerState = { - val a = f(b) - //Console.println("pickling " + a + ", s: " + s.stream) - val sPrime = pa.appP(a, s) - val pb = k(a) - //Console.println("pickling " + b + ", s: " + s.stream) - pb.appP(b, sPrime) - } - def appU(s: UnPicklerState): Pair[u, UnPicklerState] = { - val resPa = pa.appU(s) - val a = resPa._1 - val sPrime = resPa._2 - val pb = k(a) - pb.appU(sPrime) - } - } - - def upair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = { - def fst(p: Pair[a,b]): a = p._1; - def snd(p: Pair[a,b]): b = p._2; - usequ(fst, pa, (x: a) => usequ(snd, pb, (y: b) => ulift(Pair(x, y)))) - } - - def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = { - def fst(p: Pair[a,b]): a = p._1; - def snd(p: Pair[a,b]): b = p._2; - sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) - } - - def triple[a,b,c](pa: SPU[a], pb: SPU[b], pc: SPU[c]): SPU[Triple[a,b,c]] = { - def fst(p: Triple[a,b,c]): a = p._1; - def snd(p: Triple[a,b,c]): b = p._2; - def trd(p: Triple[a,b,c]): c = p._3; - - sequ(fst, pa, - (x: a) => sequ(snd, pb, - (y: b) => sequ(trd, pc, - (z: c) => lift(Triple(x, y, z))))) - } - - def uwrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] = - usequ(j, pa, (x: a) => ulift(i(x))); - - def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] = - sequ(j, pa, (x: a) => lift(i(x))); - - def appendByte(a: Array[byte], b: int): Array[byte] = { - Array.concat(a, (List[byte](b.asInstanceOf[byte])).toArray) - } - - def nat2Bytes(x: int): Array[byte] = { - val buf = new ArrayBuffer[byte] - def writeNatPrefix(x: int): unit = { - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf += ((x & 0x7f) | 0x80).asInstanceOf[byte]; - } - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf += (x & 0x7f).asInstanceOf[byte]; - buf.toArray - } - - def nat: SPU[int] = new SPU[int] { - def appP(n: int, s: PicklerState): PicklerState = { - new PicklerState(Array.concat(s.stream, nat2Bytes(n)), s.dict); - } - def appU(s: UnPicklerState): Pair[int,UnPicklerState] = { - var num = 0 - def readNat: int = { - var b = 0; - var x = 0; - do { - b = s.stream(num) - num = num + 1 - x = (x << 7) + (b & 0x7f); - } while ((b & 0x80) != 0); - x - } - Pair(readNat, new UnPicklerState(s.stream.subArray(num, s.stream.length), s.dict)) - } - } - - def byte: SPU[byte] = new SPU[byte] { - def appP(b: byte, s: PicklerState): PicklerState = - new PicklerState(Array.concat(s.stream, (List[byte](b)).toArray), s.dict); - def appU(s: UnPicklerState): Pair[byte, UnPicklerState] = - Pair(s.stream(0), new UnPicklerState(s.stream.subArray(1, s.stream.length), s.dict)); - } - - def string: SPU[String] = - share(wrap((a:Array[byte]) => UTF8Codec.decode(a, 0, a.length), (s:String) => UTF8Codec.encode(s), bytearray)); - - def bytearray: SPU[Array[byte]] = { - wrap((l:List[byte]) => l.toArray, .toList, list(byte)) - } - - def bool: SPU[boolean] = { - def toEnum(b: boolean) = if (b) 1 else 0; - def fromEnum(n: int) = if (n == 0) false else true; - wrap(fromEnum, toEnum, nat) - } - - def ufixedList[a](pa: PU[a])(n: int): PU[List[a]] = { - def pairToList(p: Pair[a,List[a]]): List[a] = - p._1 :: p._2; - def listToPair(l: List[a]): Pair[a,List[a]] = - l match { case x :: xs => Pair(x, xs) } - - if (n == 0) ulift(Nil) - else - uwrap(pairToList, listToPair, upair(pa, ufixedList(pa)(n-1))) - } - - def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = { - def pairToList(p: Pair[a,List[a]]): List[a] = - p._1 :: p._2; - def listToPair(l: List[a]): Pair[a,List[a]] = - l match { case x :: xs => Pair(x, xs) } - - if (n == 0) lift(Nil) - else - wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) - } - - def list[a](pa: SPU[a]): SPU[List[a]] = - sequ((l: List[a])=>l.length, nat, fixedList(pa)); - - def ulist[a](pa: PU[a]): PU[List[a]] = - usequ((l:List[a]) => l.length, unat, ufixedList(pa)); - - def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] = - sequ(tag, nat, (x: int)=> ps.apply(x)()); - - def printByteArray(a: Array[byte]) = { - val iter = a.elements - while (iter.hasNext) { - val el = iter.next - Console.print("" + el + ", ") - } - } - - def main(args: Array[String]) = { - // test nat2Bytes - - Console.println(printByteArray(nat2Bytes(1))) - Console.println(printByteArray(nat2Bytes(10))) - Console.println(printByteArray(nat2Bytes(16))) - Console.println(printByteArray(nat2Bytes(256))) - - Console.println(100000) - var res = pickle(nat, 100000) - Console.println(printByteArray(res)) - var up = unpickle(nat, res) - Console.println(up) - - // -- int list - val intList = List(1, 7, 13) - Console.println(intList) - val res9 = pickle(list(nat), intList) - Console.println(printByteArray(res9)) - val up9 = unpickle(list(nat), res9) - Console.println(up9) - - // --------------- - // -- boolean list - val bList = List(false, true, true) - Console.println(bList) - val res2 = pickle(list(bool), bList) - Console.println(printByteArray(res2)) - val up2 = unpickle(list(bool), res2) - Console.println(up2) - - // -- string - val s = "Hello" - Console.println(s) - val res3 = pickle(string, s) - Console.println(printByteArray(res3)) - val up3 = unpickle(string, res3) - Console.println(up3) - - val personPU = wrap((p:Pair[String,int]) => Person(p._1, p._2), (p:Person) => Pair(p.name, p.age), pair(string, nat)); - val p = Person("Philipp", 25) - Console.println(p) - val res4 = pickle(personPU, p) - Console.println(printByteArray(res4)) - val up4 = unpickle(personPU, res4) - Console.println(up4) - - val x = Var("x"); - val i = Lam("x", x); - val k = Lam("x", Lam("y", x)); - val kki = App(k, App(k, i)); - - /*def varPU: PU[Term] = wrap(Var, - (t: Term)=> t match {case Var(x)=>x}, - string); - def lamPU: PU[Term] = wrap(p: Pair[String,Term]=>Lam(p._1, p._2), - (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, - pair(string, termPU)); - def appPU: PU[Term] = wrap(p: Pair[Term,Term]=>App(p._1, p._2), - (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, - pair(termPU, termPU)); - def termPU: PU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, - List(()=>varPU, ()=>lamPU, ()=>appPU)); - - Console.println("\n" + k); - val res5 = pickle(termPU, k); - Console.println(res5); - val up5 = unpickle(termPU, res5); - Console.println(up5); - - Console.println("\n" + kki); - val res6 = pickle(termPU, kki); - Console.println(res6); - Console.println("len: " + res6.length) - val up6 = unpickle(termPU, res6); - Console.println(up6);*/ - - def varSPU: SPU[Term] = wrap(Var, - (t: Term)=> t match {case Var(x)=>x}, - string); - - def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2), - (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, - pair(string, termSPU)); - - def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2), - (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, - pair(termSPU, termSPU)); - - def termSPU: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, - List(()=>varSPU, ()=>lamSPU, ()=>appSPU))); - - Console.println("\n" + k); - val res8 = pickle(termSPU, k); - Console.println(printByteArray(res8)); - Console.println("len: " + res8.length) - val up8 = unpickle(termSPU, res8); - Console.println(up8); - - Console.println("\n" + kki); - val res7 = pickle(termSPU, kki); - Console.println(printByteArray(res7)); - Console.println("len: " + res7.length) - - val up7 = unpickle(termSPU, res7); - Console.println(up7); - } - - case class Person(name: String, age: int); - - abstract class Term; - case class Var(s: String) extends Term; - case class Lam(s: String, t: Term) extends Term; - case class App(t1: Term, t2: Term) extends Term; -} diff --git a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala deleted file mode 100644 index c678e5acf8..0000000000 --- a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala +++ /dev/null @@ -1,519 +0,0 @@ -package scala.actors.distributed.picklers; - -import scala.collection.mutable.HashMap; - -import java.io.StringReader; -import java.io.StringWriter; - -/** - Pickler combinators. - - Author: Philipp Haller <philipp.haller@epfl.ch> - */ - -object SStreamPickle { - abstract class PU[t] { - def appP(a: t, state: OutStream): OutStream; - def appU(state: InStream): Pair[t,InStream]; - } - - //def pickle[t](p: PU[t], a: t): OutStream = - // p.appP(a, ""); - - def unpickle[t](p: PU[t], stream: InStream): t = - p.appU(stream)._1; - - def lift[t](x: t): PU[t] = new PU[t] { - def appP(a: t, state: OutStream): OutStream = state; - def appU(state: InStream) = Pair(x, state); - } - - def sequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] { - def appP(b: u, s: OutStream): OutStream = { - val a = f(b) - val sPrime = pa.appP(a, s) - val pb = k(a) - val sPrimePrime = pb.appP(b, sPrime) - sPrimePrime - } - def appU(s: InStream): Pair[u,InStream] = { - val resPa = pa.appU(s) - val a = resPa._1 - val sPrime = resPa._2 - val pb = k(a) - pb.appU(sPrime) - } - } - - def pair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = { - def fst(p: Pair[a,b]): a = p._1; - def snd(p: Pair[a,b]): b = p._2; - sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) - } - - def triple[a,b,c](pa: PU[a], pb: PU[b], pc: PU[c]): PU[Triple[a,b,c]] = { - def fst(p: Triple[a,b,c]): a = p._1; - def snd(p: Triple[a,b,c]): b = p._2; - def trd(p: Triple[a,b,c]): c = p._3; - - sequ(fst, pa, - (x: a) => sequ(snd, pb, - (y: b) => sequ(trd, pc, - (z: c) => lift(Triple(x, y, z))))) - } - - def wrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] = - sequ(j, pa, (x: a) => lift(i(x))); - - def unit: PU[unit] = - lift(unit); - - def pad(s: String, req: int): String = { - val buf = new StringBuffer - for (val i <- List.range(1, req-s.length+1)) - buf append "0" - (buf append s).toString - } - def encode(i: int): String = pad(Integer.toHexString(i), 8); - def decode(s: String): int = Integer.decode("0x" + s).intValue(); - - def int: PU[int] = new PU[int] { - def appP(n: int, s: OutStream): OutStream = { - s.write(encode(n)) - s - } - def appU(s: InStream): Pair[int,InStream] = { - val substr = s.read(8) - //Console.println("unpickling " + substr) - Pair(decode(substr), s) - } - } - - def char: PU[char] = new PU[char] { - def appP(b: char, s: OutStream): OutStream = { - s.write(b) - s - } - def appU(s: InStream): Pair[char,InStream] = { - val carr = new Array[char](1) - s.read(carr) - //Console.println("unpickling " + carr(0)) - Pair(carr(0), s) - } - } - - def bool: PU[boolean] = { - def toEnum(b: boolean) = if (b) 1 else 0; - def fromEnum(n: int) = if (n == 0) false else true; - wrap(fromEnum, toEnum, nat) - } - - def fixedList[a](pa: PU[a])(n: int): PU[List[a]] = { - def pairToList(p: Pair[a,List[a]]): List[a] = - p._1 :: p._2; - def listToPair(l: List[a]): Pair[a,List[a]] = - l match { case x :: xs => Pair(x, xs) } - - if (n == 0) lift(Nil) - else - wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) - } - - def list[a](pa: PU[a]): PU[List[a]] = - sequ((l: List[a])=>l.length, nat, fixedList(pa)); - - def string: PU[String] = - wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char)); - - def alt[a](tag: a => int, ps: List[PU[a]]): PU[a] = - sequ(tag, int, ps.apply); - - def data[a](tag: a => int, ps: List[()=>PU[a]]): PU[a] = - sequ(tag, nat, (x: int)=> ps.apply(x)()); - - def option[a](pa: PU[a]): PU[Option[a]] = { - def tag(x: Option[a]) = x match { - case None => 0 - case Some(y) => 1 - } - def fromSome(x: Option[a]) = x match { - case Some(y) => y - case None => null - } - def toSome(x: a): Option[a] = Some(x); - val pnone: PU[Option[a]] = lift(None) - alt(tag, List(pnone, wrap(toSome, fromSome, pa))) - } - - def byteString(b: int) = - pad(Integer.toHexString(b), 2); - - def natString(x: int): String = { - val buf = new StringBuffer - - def writeNatPrefix(x: int): unit = { - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf.append(byteString((x & 0x7f) | 0x80)); - } - - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf.append(byteString(x & 0x7f)); - buf.toString() - } - - def nat: PU[int] = new PU[int] { - def appP(n: int, s: OutStream): OutStream = { - s.write(natString(n)) - s - } - def appU(s: InStream): Pair[int,InStream] = { - def readNat: int = { - var b = 0; - var x = 0; - do { - b = decode(s.read(2)); - x = (x << 7) + (b & 0x7f); - } while ((b & 0x80) != 0); - x - } - Pair(readNat, s) - } - } - - def main(args: Array[String]) = { - def testBase128(x: int) = { - Console.println(x) - - val sw = new StringWriter - val os = new OutStream(sw) - val res = nat.appP(x, os) - os.flush() - Console.println(sw.toString()) - - val up = nat.appU(new InStream(new StringReader(sw.toString()))) - Console.println(up._1) - } - - testBase128(0) - testBase128(1) - testBase128(64) - testBase128(127) - testBase128(128) - testBase128(8192) - - def pickleTest[a](x: a, pa: PU[a]) = { - Console.println(x) - - val sw = new StringWriter - val os = new OutStream(sw) - val res = pa.appP(x, os) - os.flush() - Console.println(sw.toString()) - - val up = pa.appU(new InStream(new StringReader(sw.toString()))) - Console.println(up._1) - } - - pickleTest(List(1, 7, 13), list(nat)) - - pickleTest(List(false, true, true), list(bool)) - - pickleTest("Hello", string) - - - val personPU = wrap((p: Pair[String,int]) => Person(p._1, p._2), (p: Person) => Pair(p.name, p.age), pair(string, nat)); - val p = Person("Philipp", 25) - pickleTest(p, personPU) - - - val x = Var("x"); - val i = Lam("x", x); - val k = Lam("x", Lam("y", x)); - val kki = App(k, App(k, i)); - - def varPU: PU[Term] = - wrap(Var, - (t: Term)=> t match {case Var(x)=>x}, - string); - def lamPU: PU[Term] = - wrap((p: Pair[String,Term])=>Lam(p._1, p._2), - (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, - pair(string, termPU)); - def appPU: PU[Term] = - wrap((p: Pair[Term,Term])=>App(p._1, p._2), - (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, - pair(termPU, termPU)); - def termPU: PU[Term] = - data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, - List(()=>varPU, ()=>lamPU, ()=>appPU)); - - pickleTest(k, termPU) - pickleTest(kki, termPU) - } - - case class Person(name: String, age: int); -} - -abstract class Term; -case class Var(s: String) extends Term; -case class Lam(s: String, t: Term) extends Term; -case class App(t1: Term, t2: Term) extends Term; - - -object ShareStreamPickle { - abstract class SPU[t] { - def appP(a: t, state: PicklerState): PicklerState; - def appU(state: UnPicklerState): Pair[t, UnPicklerState]; - } - - //def pickle[t](p: SPU[t], a: t): String = - // p.appP(a, new PicklerState("", new PicklerEnv)).stream; - - //def unpickle[t](p: SPU[t], stream: String): t = - // p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1; - - class PicklerEnv extends HashMap[Any, int] { - private var cnt: int = 64; - def nextLoc() = { cnt = cnt + 1; cnt }; - } - - class UnPicklerEnv extends HashMap[int, Any] { - private var cnt: int = 64; - def nextLoc() = { cnt = cnt + 1; cnt }; - } - - class PicklerState(val stream: OutStream, val dict: PicklerEnv) {} - class UnPicklerState(val stream: InStream, val dict: UnPicklerEnv) {} - - abstract class RefDef; - case class Ref() extends RefDef; - case class Def() extends RefDef; - - def refDef: SStreamPickle.PU[RefDef] = new SStreamPickle.PU[RefDef] { - def appP(b: RefDef, s: OutStream): OutStream = - b match { - case Ref() => s.write("0"); s - case Def() => s.write("1"); s - }; - def appU(s: InStream): Pair[RefDef, InStream] = - if (s.readChar == '0') Pair(Ref(), s) - else Pair(Def(), s); - } - - def share[a](pa: SPU[a]): SPU[a] = new SPU[a] { - def appP(v: a, state: PicklerState): PicklerState = { - /* - - is there some value equal to v associated with a location l in the pickle environment? - - yes: write REF-tag to outstream together with l - - no: - write DEF-tag to outstream - record current location l of outstream - --> serialize value - add entry to pickle environment, mapping v onto l - */ - val pe = state.dict - pe.get(v) match { - case None => - //Console.println("" + v + " is new") - //Console.println("writing DEF...") - val sPrime = refDef.appP(Def(), state.stream) - val l = pe.nextLoc() - - //Console.println("applying pickler to state " + sPrime) - val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe)) - - //Console.println("updating dict (" + l + ") for " + v) - pe.update(v, l) - - return sPrimePrime - case Some(l) => - //Console.println("writing REF...") - val sPrime = refDef.appP(Ref(), state.stream) - //Console.println("writing location to state " + sPrime) - return new PicklerState(SStreamPickle.nat.appP(l, sPrime), pe) - } - } - def appU(state: UnPicklerState): Pair[a, UnPicklerState] = { - /* - - first, read tag (i.e. DEF or REF) - - if REF: - read location l - look up resulting value in unpickler environment - - if DEF: - record location l of input stream - --> deserialize value v with argument deserializer - add entry to unpickler environment, mapping l onto v - */ - val upe = state.dict - val res = refDef.appU(state.stream) - res._1 match { - case Def() => - val l = upe.nextLoc - val res2 = pa.appU(new UnPicklerState(res._2, upe)) - upe.update(l, res2._1) - return res2 - case Ref() => - val res2 = SStreamPickle.nat.appU(res._2) // read location - upe.get(res2._1) match { // lookup value in unpickler env - case None => error("invalid unpickler environment"); return null - case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe)) - } - } - } - } - - def lift[t](x: t): SPU[t] = new SPU[t] { - def appP(a: t, state: PicklerState): PicklerState = state; - def appU(state: UnPicklerState) = Pair(x, state); - } - - def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] { - def appP(b: u, s: PicklerState): PicklerState = { - val a = f(b) - //Console.println("pickling " + a + ", s: " + s.stream) - val sPrime = pa.appP(a, s) - val pb = k(a) - //Console.println("pickling " + b + ", s: " + s.stream) - pb.appP(b, sPrime) - } - def appU(s: UnPicklerState): Pair[u, UnPicklerState] = { - val resPa = pa.appU(s) - val a = resPa._1 - val sPrime = resPa._2 - val pb = k(a) - pb.appU(sPrime) - } - } - - def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = { - def fst(p: Pair[a,b]): a = p._1; - def snd(p: Pair[a,b]): b = p._2; - sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) - } - - def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] = - sequ(j, pa, (x: a) => lift(i(x))); - - def char: SPU[char] = new SPU[char] { - def appP(b: char, s: PicklerState): PicklerState = { - s.stream.write(b) - new PicklerState(s.stream, s.dict) - } - def appU(s: UnPicklerState): Pair[char, UnPicklerState] = - Pair(s.stream.readChar, new UnPicklerState(s.stream, s.dict)); - } - - def pad(s: String, req: int): String = { - val buf = new StringBuffer - for (val i <- List.range(1, req-s.length+1)) - buf append "0" - (buf append s).toString - } - def encode(i: int): String = pad(Integer.toHexString(i), 8); - def decode(s: String): int = Integer.decode("0x" + s).intValue(); - - def byteString(b: int) = - pad(Integer.toHexString(b), 2); - - def natString(x: int): String = { - val buf = new StringBuffer - - def writeNatPrefix(x: int): unit = { - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf.append(byteString((x & 0x7f) | 0x80)); - } - - val y = x >>> 7; - if (y != 0) writeNatPrefix(y); - buf.append(byteString(x & 0x7f)); - buf.toString() - } - - def nat: SPU[int] = new SPU[int] { - def appP(n: int, s: PicklerState): PicklerState = { - s.stream.write(natString(n)) - new PicklerState(s.stream, s.dict) - } - def appU(s: UnPicklerState): Pair[int,UnPicklerState] = { - def readNat: int = { - var b = 0; - var x = 0; - do { - b = decode(s.stream.read(2)); - x = (x << 7) + (b & 0x7f); - } while ((b & 0x80) != 0); - x - } - Pair(readNat, new UnPicklerState(s.stream, s.dict)) - } - } - - def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = { - def pairToList(p: Pair[a,List[a]]): List[a] = - p._1 :: p._2; - def listToPair(l: List[a]): Pair[a,List[a]] = - l match { case x :: xs => Pair(x, xs) } - - if (n == 0) lift(Nil) - else - wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) - } - - def list[a](pa: SPU[a]): SPU[List[a]] = - sequ((l: List[a])=>l.length, nat, fixedList(pa)); - - def string: SPU[String] = - wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char)); - - def alt[a](tag: a => int, ps: List[SPU[a]]): SPU[a] = - sequ(tag, nat, ps.apply); - - def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] = - sequ(tag, nat, (x: int)=> ps.apply(x)()); - - def main(args: Array[String]) = { - def pickleTest[a](x: a, pa: SPU[a]) = { - Console.println(x) - - val sw = new StringWriter - val os = new OutStream(sw) - val res = pa.appP(x, new PicklerState(os, new PicklerEnv)) - os.flush() - Console.println(sw.toString()) - - val up = pa.appU(new UnPicklerState(new InStream(new StringReader(sw.toString())), new UnPicklerEnv)) - Console.println(up._1) - } - - val x = Var("x"); - val i = Lam("x", x); - val k = Lam("x", Lam("y", x)); - val kki = App(k, App(k, i)); - - def varSPU: SPU[Term] = wrap(Var, - (t: Term)=> t match {case Var(x)=>x}, - string); - - def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2), - (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, - pair(string, termSPU)); - - def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2), - (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, - pair(termSPU, termSPU)); - - def termSPU: SPU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, - List(()=>varSPU, ()=>lamSPU, ()=>appSPU)); - - def termSPUshared: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, - List(()=>varSPU, ()=>lamSPU, ()=>appSPU))); - - pickleTest(k, termSPU) - pickleTest(k, termSPUshared) - pickleTest(kki, termSPU) - pickleTest(kki, termSPUshared) - } -} diff --git a/src/actors/scala/actors/distributed/picklers/Stream.scala b/src/actors/scala/actors/distributed/picklers/Stream.scala deleted file mode 100644 index 65e59b975d..0000000000 --- a/src/actors/scala/actors/distributed/picklers/Stream.scala +++ /dev/null @@ -1,81 +0,0 @@ -package scala.actors.distributed.picklers; - -import java.io.Reader; -import java.io.Writer; - -import scala.collection.mutable.HashMap; - -abstract class Stream { - protected var loc: int = 0; - - def getLocation = loc; -} - -class OutStream(writer: Writer) extends Stream { - val picklerEnv = new PicklerEnv; - - def write(s: String): unit = { - loc = loc + s.length() - writer.write(s) - } - - def write(c: char): unit = { - loc = loc + 1 - writer.write(c) - } - - def flush(): unit = - writer.flush(); -} - -class InStream(reader: Reader) extends Stream { - val unpicklerEnv = new UnpicklerEnv; - - def read(num: int): String = { - val carr = new Array[char](num) - val cnt = reader.read(carr) - loc = loc + num - //Console.println("new loc: " + loc) - new String(carr) - - /*val buf = new StringBuffer - var ch = r.read() - loc = loc + 1 - if (num > 1) { - var cnt = 1 - while (cnt < num && ch != -1) { - buf.append(ch) - ch = r.read() - loc = loc + 1 - cnt = cnt + 1 - } - if (cnt == num) - buf.toString() - else - buf.toString() // error - } - else - new String(ch.asInstanceOf[char])*/ - } - - def read(cbuf: Array[char]): int = { - loc = loc + cbuf.length - reader.read(cbuf) - } - - def readChar: char = { - val carr = new Array[char](1) - read(carr) - carr(0) - } -} - -class PicklerEnv[a] extends HashMap[a, int] { - private var cnt: int = 0; - def nextLoc() = { cnt = cnt + 1; cnt }; -} - -class UnpicklerEnv[a] extends HashMap[int, a] { - private var cnt: int = 0; - def nextLoc() = { cnt = cnt + 1; cnt }; -} diff --git a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala deleted file mode 100644 index 36ad115cdd..0000000000 --- a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala +++ /dev/null @@ -1,77 +0,0 @@ -/* ____ ____ ____ ____ ______ *\ -** / __// __ \/ __// __ \/ ____/ SOcos COmpiles Scala ** -** __\_ \/ /_/ / /__/ /_/ /\_ \ (c) 2002-2006, LAMP/EPFL ** -** /_____/\____/\___/\____/____/ ** -\* */ - -// $Id: UTF8Codec.scala 7116 2006-04-11 15:36:05Z mihaylov $ - - -package scala.actors.distributed.picklers - -object UTF8Codec { - - def encode(src: Array[Char], from: Int, dst: Array[Byte], to: Int, len: Int): Int = { - var i = from; - var j = to; - val end = from + len; - while (i < end) { - val ch = src(i); - i = i + 1; - if (ch < 128) { - dst(j) = ch.toByte; - j = j + 1; - } - else if (ch <= 0x3FF) { - dst(j) = (0xC0 | (ch >> 6)).toByte; - dst(j+1) = (0x80 | (ch & 0x3F)).toByte; - j = j + 2; - } else { - dst(j) = (0xE0 | (ch >> 12)).toByte; - dst(j+1) = (0x80 | ((ch >> 6) & 0x3F)).toByte; - dst(j+2) = (0x80 | (ch & 0x3F)).toByte; - j = j + 3; - } - } - j - } - - def encode(s: String, dst: Array[Byte], to: Int): Int = - encode(s.toCharArray(), 0, dst, to, s.length()); - - - def encode(s: String): Array[Byte] = { - val dst = new Array[Byte](s.length() * 3); - val len = encode(s, dst, 0); - dst.subArray(0, len) - } - - def decode(src: Array[Byte], from: Int, - dst: Array[Char], to: Int, len: Int): Int = - { - var i = from; - var j = to; - val end = from + len; - while (i < end) { - var b = src(i) & 0xFF; - i = i + 1; - if (b >= 0xE0) { - b = ((b & 0x0F) << 12) | (src(i) & 0x3F) << 6; - b = b | (src(i+1) & 0x3F); - i = i + 2; - } else if (b >= 0xC0) { - b = ((b & 0x1F) << 6) | (src(i) & 0x3F); - i = i + 1; - } - dst(j) = b.toChar; - j = j + 1; - } - j - } - - def decode(src: Array[Byte], from: Int, len: Int): String = { - val cs = new Array[Char](len); - String.copyValueOf(cs, 0, decode(src, 0, cs, 0, len)); - } - -} diff --git a/src/actors/scala/actors/gui/Publisher.scala b/src/actors/scala/actors/gui/Publisher.scala index 0b71bc30f6..fc81ec9db9 100644 --- a/src/actors/scala/actors/gui/Publisher.scala +++ b/src/actors/scala/actors/gui/Publisher.scala @@ -8,7 +8,7 @@ import scala.actors.single.Pid import scala.actors.gui.event.Event class EventHandlers { - type Handler = PartialFunction[AnyRef,unit] + type Handler = PartialFunction[Any,unit] private val handlers = new ListBuffer[Handler] @@ -16,9 +16,9 @@ class EventHandlers { def -= (h: Handler) = { handlers -= h } def compoundHandler = new Handler { - def isDefinedAt(e: AnyRef): boolean = handlers.exists(.isDefinedAt(e)) + def isDefinedAt(e: Any): boolean = handlers.exists(.isDefinedAt(e)) - def apply(e: AnyRef): unit = + def apply(e: Any): unit = handlers.find(.isDefinedAt(e)) match { case Some(h) => h.apply(e) case None => // do nothing @@ -29,10 +29,10 @@ class EventHandlers { trait Responder extends Actor { protected val handlers = new EventHandlers - final def eventloop(f: PartialFunction[Message,unit]): scala.All = + final def eventloop(f: PartialFunction[Any,unit]): scala.All = receive(new RecursiveProxyHandler(this, f)) - def eventblock(f: PartialFunction[Message,unit]): unit = { + def eventblock(f: PartialFunction[Any,unit]): unit = { try { receive(new RecursiveProxyHandler(this, f)) } @@ -42,11 +42,11 @@ trait Responder extends Actor { } } - private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] { - def isDefinedAt(m: Message): boolean = + private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] { + def isDefinedAt(m: Any): boolean = true // events should be removed from the mailbox immediately! - def apply(m: Message): unit = { + def apply(m: Any): unit = { if (f.isDefinedAt(m)) f(m) // overrides any installed handler else if (handlers.compoundHandler.isDefinedAt(m)) @@ -63,7 +63,7 @@ case class Subscribe(s: Subscriber) case class Publish(e: Event) trait Subscriber extends Responder { - type Handler = PartialFunction[AnyRef,unit] + type Handler = PartialFunction[Any,unit] def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this) } @@ -98,11 +98,11 @@ trait Publisher extends Responder { } // TODO: super.receive might already be overridden! - //final override def receive(f: PartialFunction[Message,unit]): scala.All = + //final override def receive(f: PartialFunction[Any,unit]): scala.All = //super.receive(new ProxyPubSubHandler(f)) - private class ProxyPubSubHandler(f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] { - def isDefinedAt(m: Message): boolean = + private class ProxyPubSubHandler(f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] { + def isDefinedAt(m: Any): boolean = if (f.isDefinedAt(m)) true else m match { case Subscribe(s) => true @@ -110,7 +110,7 @@ trait Publisher extends Responder { case other => false } - def apply(m: Message): unit = { + def apply(m: Any): unit = { m match { case Subscribe(s) => //Console.println("Rec subscription: " + s) diff --git a/src/actors/scala/actors/multi/Actor.scala b/src/actors/scala/actors/multi/Actor.scala index 520267d2a2..2b775e927e 100644 --- a/src/actors/scala/actors/multi/Actor.scala +++ b/src/actors/scala/actors/multi/Actor.scala @@ -10,299 +10,16 @@ package scala.actors.multi -import scala.collection.mutable.{HashMap,HashSet,Stack} - -case class ExcHandlerDesc(pid: Pid, eid: Int) - -class ExcHandler(actions: PartialFunction[Throwable, Unit], - actor: Actor, - parent: ExcHandlerDesc) { - def handle(e: Throwable): Unit = { - if (!actions.isDefinedAt(e)) { - if (parent != null) actor.forwardExc(parent, e) - } - else - actions(e) - } -} - /** * @author Philipp Haller */ -abstract class Actor extends MailBox { +trait Actor extends scala.actors.Actor with MailBox { def run(): Unit = {} - def start(): Unit = { - var finished = true - try { run } - catch { - case d: Done => - finished = false - case t: Throwable => - if (!excHandlerDescs.isEmpty) - forwardExc(excHandlerDescs.top, t) - else - exit(new Symbol(t.toString())) - } - if (finished) die() - } - - case class Exit(from: Pid, reason: Symbol) extends Message - - private var pid: Pid = null - def self: Pid = { - if (pid == null) pid = new LocalPid(this) - pid - } - def self_= (p: Pid) = pid = p - - - private val links = new HashSet[Pid] - - def link(to: Pid): unit = { - // TODO: check if exists (eff: dont need to.linkTo...) - links += to - to.linkTo(self) - } - def linkTo(to: Pid): unit = links += to - - def unlink(from: Pid): unit = { - // TODO: check if exists (eff) - links -= from - from.unlinkFrom(self) - } - def unlinkFrom(from: Pid): unit = links -= from - - - private var trapExit = false - - def processFlag(flag: Symbol, set: boolean) = { - if (flag.name.equals("trapExit")) trapExit = set - } - - def exit(reason: Symbol): unit = { - exitLinked(reason, new HashSet[Pid]) - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - } - } - - def exit(from: Pid, reason: Symbol): unit = { - if (from == self) { - exit(reason) - } - else { - if (trapExit) - this send Exit(from, reason) - else if (!reason.name.equals("normal")) - exit(reason) - } + def start(): Unit = try { run() } + catch { + case d: Done => // do nothing } - def exitLinked(reason: Symbol, exitMarks: HashSet[Pid]): unit = { - if (exitMarks contains self) { - // we are marked, do nothing - } - else { - exitMarks += self // mark self as exiting - //Console.println("" + self + " is exiting (" + reason + ").") - - // exit linked scala.actors - val iter = links.elements - while (iter.hasNext) { - val linkedPid = iter.next - unlink(linkedPid) - linkedPid.exit(self, reason) - } - exitMarks -= self - } - } - - def spawnLink(body: Actor => unit): Pid = { - val a = new Actor { - override def run = body(this) - } - if (!excHandlerDescs.isEmpty) - a.pushExcHandlerDesc(excHandlerDescs.top) - // link new process to self - link(a.self) - a.start - a.self - } - - val excHandlerDescs = new Stack[ExcHandlerDesc] - - // exception handler table - val excHandlers = new HashMap[int, ExcHandler] - - def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { - // locality check (handler local to this actor?) - if (destDesc.pid == self) { - handleExc(destDesc, e) - } - else { - // forward to pid of destination descriptor - destDesc.pid.handleExc(destDesc, e) - } - } - - def pushExcHandlerDesc(desc: ExcHandlerDesc) = { - excHandlerDescs += desc - } - - /** is only called for local handlers - (i.e. destDesc.pid == self) */ - def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = - (excHandlers get destDesc.eid) match { - case Some(handler) => - handler.handle(e) - case None => - error("exc desc refers to non-registered handler") - }; - - var excCnt = 0 - def freshExcId = { excCnt = excCnt + 1; excCnt } - - def tryAsync(block: => unit, - handlerFun: PartialFunction[Throwable, unit]) = { - val excHandler = - new ExcHandler(handlerFun, - this, - if (excHandlerDescs.isEmpty) null - else excHandlerDescs.top) - - val desc = ExcHandlerDesc(self, freshExcId) - // associate desc with handler - excHandlers += desc.eid -> excHandler - // push desc onto stack - excHandlerDescs += desc - //Console.println("desc stack height: " + excHandlerDescs.length) - // execute code block - block - } - - def die(reason: Symbol) = { - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - exit(reason) - } - } - - override def die() = { - if (isAlive) { - isAlive = false - //Debug.info("" + this + " died.") - exit('normal) - } - } - - def process(f: PartialFunction[Message,unit], msg: Message): unit = { - try { - f(msg) - } - catch { - case d: Done => - throw new Done - case t: Throwable => - throw t - if (!excHandlerDescs.isEmpty) - forwardExc(excHandlerDescs.top, t) - else - die(Symbol(t.toString())) - } - } - - override def receive(f: PartialFunction[Message,unit]): scala.All = { - if (isAlive) { - Scheduler.tick(this) - continuation = null - sent.dequeueFirst(f.isDefinedAt) match { - case Some(msg) => - process(f, msg) - die() - case None => - continuation = f - //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") - } - } - throw new Done - } - - override def receiveMsg(msg: MailBox#Message) = { - //Debug.info("" + Thread.currentThread() + ": Resuming " + this) - if (continuation != null) { - val f = continuation - continuation = null - scheduled = false - process(f, msg) - die() - } - else { - // use more complex receive-and-return continuation - val cases = contCases - val then = contThen - contCases = null - contThen = null - scheduled = false - val result = cases(msg) - then(result) - die() - } - } - - def spawn(a: Actor): Pid = { - // let "a" inherit active exception handler - if (!excHandlerDescs.isEmpty) - a.pushExcHandlerDesc(excHandlerDescs.top) - a.start - a.self - } - - def spawn(body: Actor => unit): Pid = { - val a = new Actor { - override def run = body(this) - } - if (!excHandlerDescs.isEmpty) - a.pushExcHandlerDesc(excHandlerDescs.top) - a.start - a.self - } - - def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = { - val a = new Actor { - override def run = receive(cases) - } - if (!excHandlerDescs.isEmpty) - a.pushExcHandlerDesc(excHandlerDescs.top) - a.start - a.self - } - - def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = { - receive { - case Pair(pid1, msg1) => receive { - case Pair(pid2, msg2) => cont match { - case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) - } - } - case Pair(pid2, msg2) => receive { - case Pair(pid1, msg1) => cont match { - case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) - } - } - } - } - - def makeRef = Actor.makeRef -} - -object Actor { - private var counter = 0 - type Tag = int - def makeRef: Tag = { - counter = counter + 1 - counter - } + def !(msg: Any): Unit = send(msg) } diff --git a/src/actors/scala/actors/multi/LocalPid.scala b/src/actors/scala/actors/multi/LocalPid.scala deleted file mode 100644 index 6762f0915d..0000000000 --- a/src/actors/scala/actors/multi/LocalPid.scala +++ /dev/null @@ -1,104 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -import scala.collection.mutable.Queue - -/** - * @author Philipp Haller - */ -class LocalPid(actor: Actor) extends Pid { - var target = actor - - def !(msg: MailBox#Message): Unit = target send msg - - def link(other: Pid): Unit = target link other - def linkTo(other: Pid): Unit = target linkTo other - def unlink(other: Pid): Unit = target unlink other - def unlinkFrom(other: Pid): Unit = target unlinkFrom other - def exit(reason: Symbol): Unit = target exit reason - def exit(from: Pid, reason: Symbol): Unit = target.exit(from, reason) - - def spawn(body: Actor => Unit): Pid = { - val a = new Actor { - override def run: Unit = body(this) - } - a.start - a.self - } - - def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = { - val a = new Actor { - override def run: Unit = receive(cases) - } - a.start - a.self - } - - override def toString() = "LocalPid(" + target + ")" - - def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = - actor.handleExc(destDesc, e); - - def become(clos: Actor => Unit) = { - // old actor should become anonymous (cannot receive any messages any more) - // achieved by removing it from target of pid. - - val oldActor = target - //Debug.info("old actor: " + oldActor); - - // change our target to point to a newly created actor with the same mailbox. - - val newActor = new Actor { - override def run: Unit = clos(this) - } - newActor.sent = oldActor.sent - target = newActor - newActor.self = this - - //Debug.info("new actor: " + newActor); - - // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor) - oldActor.sent = new Queue[MailBox#Message] - - //Debug.info("Starting new actor."); - newActor.start // important to start after changing pid because actor may send messages to itself. - } - - private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] { - def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m) - def apply(m: MailBox#Message): unit = { - f(m) - a receive this - } - } - - def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]) = { - - become(a => a receive new ProxyPartialFunction(a, f)) - - /*become( - a:Actor => { - def loop: Unit = { - def proxyFun(m: MailBox#Message): unit = { - if (f.isDefinedAt(m)) { - f(m); - loop - } - }; - //a receive proxyFun - } - loop - } - )*/ - } - -} diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala index 7809deedc3..5ec21e613b 100644 --- a/src/actors/scala/actors/multi/MailBox.scala +++ b/src/actors/scala/actors/multi/MailBox.scala @@ -15,23 +15,20 @@ import scala.collection.mutable.Queue /** * @author Philipp Haller */ -class MailBox { - type Message = AnyRef - case class TIMEOUT() extends Message - +trait MailBox { /** Unconsumed messages. */ - var sent = new Queue[Message] + var sent = new Queue[Any] - var continuation: PartialFunction[Message,Unit] = null + var continuation: PartialFunction[Any,Unit] = null // more complex continuation - var contCases: PartialFunction[Message,Message] = null - var contThen: Message => unit = null + var contCases: PartialFunction[Any,Any] = null + var contThen: Any => unit = null def hasCont = if ((continuation == null) && (contCases == null)) false else true - def contDefinedAt(msg: Message) = + def contDefinedAt(msg: Any) = if (((continuation != null) && continuation.isDefinedAt(msg)) || ((contCases != null) && contCases.isDefinedAt(msg))) true @@ -43,7 +40,7 @@ class MailBox { private var pendingSignal = false - def send(msg: Message): unit = synchronized { + def send(msg: Any): unit = synchronized { if (isAlive) { if (!hasCont || scheduled) { //Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") @@ -81,7 +78,7 @@ class MailBox { } } - def receiveMsg(msg: MailBox#Message) = { + def receiveMsg(msg: Any) = { //Debug.info("" + Thread.currentThread() + ": Resuming " + this) if (continuation != null) { val f = continuation @@ -103,7 +100,7 @@ class MailBox { } } - def receive(f: PartialFunction[Message,unit]): scala.All = { + def receive(f: PartialFunction[Any,unit]): Nothing = { if (isAlive) { Scheduler.tick(this) continuation = null @@ -119,7 +116,7 @@ class MailBox { throw new Done } - def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = { + def receiveWithin(msec: long)(f: PartialFunction[Any, unit]): Nothing = { Scheduler.tick(this) continuation = null sent.dequeueFirst(f.isDefinedAt) match { @@ -145,11 +142,7 @@ class MailBox { throw new Done } - // original wish: - // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B - // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit - - def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): unit = { + def receiveAndReturn(cases: PartialFunction[Any,Any], then: Any => unit): unit = { contCases = null contThen = null sent.dequeueFirst(cases.isDefinedAt) match { @@ -169,11 +162,11 @@ class MailBox { // receiv {...} then (msg => {...msg...}) - class ReceiveAndReturn(cases: PartialFunction[Message,Message]) { - def then(body: Message => unit): unit = receiveAndReturn(cases, body) + class ReceiveAndReturn(cases: PartialFunction[Any,Any]) { + def then(body: Any => unit): unit = receiveAndReturn(cases, body) } - def receiv(cases: PartialFunction[Message,Message]): ReceiveAndReturn = + def receiv(cases: PartialFunction[Any,Any]): ReceiveAndReturn = new ReceiveAndReturn(cases) def die() = { diff --git a/src/actors/scala/actors/multi/Pid.scala b/src/actors/scala/actors/multi/Pid.scala deleted file mode 100644 index e8cf5ede23..0000000000 --- a/src/actors/scala/actors/multi/Pid.scala +++ /dev/null @@ -1,30 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.multi - -/** - * @author Philipp Haller - */ -[serializable]abstract class Pid { - def !(msg: MailBox#Message): Unit - - def link(other: Pid): Unit - def linkTo(other: Pid): Unit // uni-directional - def unlink(other: Pid): Unit - def unlinkFrom(other: Pid): Unit // uni-directional - def exit(reason: Symbol): Unit - def exit(from: Pid, reason: Symbol): Unit - - def handleExc(destDesc: ExcHandlerDesc, e: Throwable): Unit - - //def become(clos: Actor => Unit): Unit - //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): Unit -} diff --git a/src/actors/scala/actors/multi/Process.scala b/src/actors/scala/actors/multi/Process.scala new file mode 100644 index 0000000000..3764a6454b --- /dev/null +++ b/src/actors/scala/actors/multi/Process.scala @@ -0,0 +1,281 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: LocalPid.scala 7969 2006-06-23 11:44:31Z michelou $ + +package scala.actors.multi + +import scala.collection.mutable.{HashMap,HashSet,Stack} + +/** + * @author Philipp Haller + */ +object Process { + + def spawn(body: => Unit): Process = { + val p = new Process { + override def run(): Unit = body + } + val task = new ReceiverTask(p, null) { + override def run(): Unit = try { p.start() } + catch { + case d: Done => + // do nothing (continuation is already saved) + } + } + Scheduler.execute(task) + p + } + + def spawnLink(body: => Unit): Process = { + val p = new Process { + override def run(): Unit = body + } + val task = new ReceiverTask(p, null) { + override def run(): Unit = try { p.start() } + catch { + case d: Done => + // do nothing (continuation is already saved) + } + } + self.link(p) + Scheduler.execute(task) + p + } + + def send(p: Process, msg: Any) = + p ! msg + + def receive(f: PartialFunction[Any,Unit]): Nothing = + self.receive(f) + + def receiveWithin(msec: long)(f: PartialFunction[Any,Unit]): Nothing = + self.receiveWithin(msec)(f) + + def self: Process = { + val p = Scheduler.getProcess(Thread.currentThread()) + if (p == null) + error("Self called outside a process") + else + p + } + + def exit(p: Process, reason: Symbol) = + p.exit(reason) +} + +/** + * @author Philipp Haller + */ +class Process extends scala.actors.Process with Actor { + private val links = new HashSet[scala.actors.Process] + + override def start(): Unit = try { run() } + catch { + case d: Done => // do nothing + case t: Throwable => { + if (!excHandlerDescs.isEmpty) + forwardExc(excHandlerDescs.top, t) + else + exit(new Symbol(t.toString())) + } + } + + case class Exit(from: scala.actors.Process, reason: Symbol) + + def link(to: scala.actors.Process): Unit = { + links += to + to.linkTo(this) + } + + def linkTo(to: scala.actors.Process): Unit = + links += to + + def unlink(from: scala.actors.Process): Unit = { + links -= from + from.unlinkFrom(this) + } + + def unlinkFrom(from: scala.actors.Process): Unit = + links -= from + + private var trapExit = false + + def processFlag(flag: Symbol, set: boolean) = { + if (flag.name.equals("trapExit")) trapExit = set + } + + def exit(reason: Symbol): Unit = { + exitLinked(reason, new HashSet[Process]) + if (isAlive) isAlive = false + } + + def exit(from: scala.actors.Process, reason: Symbol): Unit = { + if (from == this) { + exit(reason) + } + else { + if (trapExit) + this ! Exit(from, reason) + else if (!reason.name.equals("normal")) + exit(reason) + } + } + + def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = { + if (exitMarks contains this) { + // we are marked, do nothing + } + else { + exitMarks += this // mark this as exiting + + // exit linked processes + val iter = links.elements + while (iter.hasNext) { + val linked = iter.next + unlink(linked) + linked.exit(this, reason) + } + exitMarks -= this + } + } + + val excHandlerDescs = new Stack[ExcHandlerDesc] + + // exception handler table + val excHandlers = new HashMap[Int, ExcHandler] + + def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { + // locality check (handler local to this actor?) + if (destDesc.p == this) { + handleExc(destDesc, e) + } + else { + // forward to process of destination descriptor + destDesc.p.handleExc(destDesc, e) + } + } + + def pushExcHandlerDesc(desc: ExcHandlerDesc) = + excHandlerDescs += desc + + /** is only called for local handlers + (i.e. destDesc.p == this) */ + def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = + (excHandlers get destDesc.eid) match { + case Some(handler) => + handler.handle(e) + case None => + error("exc desc refers to non-registered handler") + } + + private var excCnt = 0 + private def freshExcId = { excCnt = excCnt + 1; excCnt } + + def tryAsync(block: => Unit, + handlerFun: PartialFunction[Throwable, Unit]) = { + val excHandler = + new ExcHandler(handlerFun, + this, + if (excHandlerDescs.isEmpty) null + else excHandlerDescs.top) + + val desc = ExcHandlerDesc(this, freshExcId) + // associate desc with handler + excHandlers += desc.eid -> excHandler + // push desc onto stack + excHandlerDescs += desc + //Console.println("desc stack height: " + excHandlerDescs.length) + // execute code block + block + } + + def process(f: PartialFunction[Any,unit], msg: Any): unit = { + try { + f(msg) + } + catch { + case d: Done => + throw new Done + case t: Throwable => + throw t + if (!excHandlerDescs.isEmpty) + forwardExc(excHandlerDescs.top, t) + else + die(Symbol(t.toString())) + } + } + + override def receive(f: PartialFunction[Any,unit]): scala.All = { + if (isAlive) { + Scheduler.tick(this) + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + process(f, msg) + die() + case None => + continuation = f + //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + override def receiveMsg(msg: Any) = { + //Debug.info("" + Thread.currentThread() + ": Resuming " + this) + if (continuation != null) { + val f = continuation + continuation = null + scheduled = false + process(f, msg) + die() + } + else { + // use more complex receive-and-return continuation + val cases = contCases + val then = contThen + contCases = null + contThen = null + scheduled = false + val result = cases(msg) + then(result) + die() + } + } + + def die(reason: Symbol) = { + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + exit(reason) + } + } + + override def die() = { + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + exit('normal) + } + } +} + +case class ExcHandlerDesc(p: Process, eid: Int) + +class ExcHandler(actions: PartialFunction[Throwable, Unit], + p: Process, + parent: ExcHandlerDesc) { + def handle(e: Throwable): Unit = { + if (!actions.isDefinedAt(e)) { + if (parent != null) p.forwardExc(parent, e) + } + else + actions(e) + } +} diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala index 8b6893b43e..033df00ef0 100644 --- a/src/actors/scala/actors/multi/ReceiverTask.scala +++ b/src/actors/scala/actors/multi/ReceiverTask.scala @@ -13,7 +13,7 @@ package scala.actors.multi /** * @author Philipp Haller */ -class ReceiverTask(val actor: MailBox, msg: MailBox#Message) extends Runnable { +class ReceiverTask(val actor: MailBox, msg: Any) extends Runnable { def run(): Unit = { try { actor receiveMsg msg diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/multi/Scheduler.scala index 2133995720..9deb9c8148 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/multi/Scheduler.scala @@ -8,12 +8,10 @@ // $Id$ -package scala.actors +package scala.actors.multi import scala.collection.mutable._ -import scala.actors.multi._ - /** * @author Philipp Haller */ @@ -21,6 +19,7 @@ abstract class IScheduler /*extends java.util.concurrent.Executor*/ { def execute(item: ReceiverTask): Unit def getTask(worker: WorkerThread): Runnable def tick(a: MailBox): Unit + def getProcess(t: Thread): Process val QUIT_TASK = new Runnable() { def run(): Unit = {} @@ -47,6 +46,9 @@ object Scheduler /*extends java.util.concurrent.Executor*/ { def tick(a: MailBox) = sched.tick(a) + + def getProcess(t: Thread): Process = + sched.getProcess(t) } @@ -57,6 +59,7 @@ class SpareWorkerScheduler2 extends IScheduler { val idle = new Queue[WorkerThread]; val ticks = new HashMap[WorkerThread, long] val executing = new HashMap[MailBox, WorkerThread] + val rexec = new HashMap[Thread, MailBox] var TICKFREQ = 50 @@ -81,10 +84,18 @@ class SpareWorkerScheduler2 extends IScheduler { } } + def getProcess(t: Thread): Process = synchronized { + rexec.get(t) match { + case None => null + case Some(p: Process) => p + } + } + def execute(item: ReceiverTask): unit = synchronized { if (idle.length > 0) { val wt = idle.dequeue executing.update(item.actor, wt) + rexec.update(wt, item.actor) wt.execute(item) } else { @@ -116,6 +127,7 @@ class SpareWorkerScheduler2 extends IScheduler { maxWorkers = workers.length // statistics executing.update(item.actor, newWorker) + rexec.update(newWorker, item.actor) newWorker.execute(item) newWorker.start() @@ -132,6 +144,7 @@ class SpareWorkerScheduler2 extends IScheduler { if (tasks.length > 0) { val item = tasks.dequeue executing.update(item.actor, worker) + rexec.update(worker, item.actor) item } else { @@ -144,9 +157,9 @@ class SpareWorkerScheduler2 extends IScheduler { /** * @author Philipp Haller */ -class SpareWorkerScheduler extends IScheduler { +abstract class SpareWorkerScheduler extends IScheduler { private var canQuit = false; - private val tasks = new Queue[Runnable]; + private val tasks = new Queue[ReceiverTask]; private val idle = new Queue[WorkerThread]; private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; @@ -201,9 +214,9 @@ class SpareWorkerScheduler extends IScheduler { /** * @author Philipp Haller */ -class FixedWorkersScheduler(workercnt: int) extends IScheduler { +abstract class FixedWorkersScheduler(workercnt: int) extends IScheduler { private var canQuit = false; - private val tasks = new Queue[Runnable]; + private val tasks = new Queue[ReceiverTask]; private val idle = new Queue[WorkerThread]; //Console.println("Running with " + workercnt + " workers") diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/multi/TimerThread.scala index 075d14fe57..2b91ba13ca 100644 --- a/src/actors/scala/actors/TimerThread.scala +++ b/src/actors/scala/actors/multi/TimerThread.scala @@ -8,13 +8,10 @@ // $Id$ -package scala.actors +package scala.actors.multi import scala.collection.mutable.PriorityQueue -import scala.actors.multi.Actor -import scala.actors.multi.MailBox - /** * This class allows the (locl) sending of a message to an actor after * a timeout. Used by the library to build receiveWithin(time:long). @@ -74,7 +71,7 @@ object TimerThread extends AnyRef with Runnable { lateList = Nil } - def requestSignal(a: Actor, waitMillis: long, reason: String): unit = this.synchronized { + def requestSignal(a: multi.Actor, waitMillis: long, reason: String): unit = this.synchronized { Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason) val wakeTime = now + waitMillis if (waitMillis < 0) { @@ -146,7 +143,7 @@ object TimerThreadTest { new Tester (500, "TWO").start } - class Tester (duration : int, name:String) extends Actor { + class Tester (duration : int, name:String) extends multi.Actor { var i = 0 def loop:unit = { diff --git a/src/actors/scala/actors/WorkerThread.scala b/src/actors/scala/actors/multi/WorkerThread.scala index e46cdc53c9..8c5a4ecb1e 100644 --- a/src/actors/scala/actors/WorkerThread.scala +++ b/src/actors/scala/actors/multi/WorkerThread.scala @@ -8,7 +8,7 @@ // $Id$ -package scala.actors +package scala.actors.multi /** * @author Philipp Haller diff --git a/src/actors/scala/actors/single/Actor.scala b/src/actors/scala/actors/single/Actor.scala index d323c51d57..fa799db985 100644 --- a/src/actors/scala/actors/single/Actor.scala +++ b/src/actors/scala/actors/single/Actor.scala @@ -13,70 +13,13 @@ package scala.actors.single /** * @author Philipp Haller */ -abstract class Actor extends MailBox { - def run: Unit = {} +trait Actor extends scala.actors.Actor with MailBox { + def run(): Unit = {} - def start: Unit = { - try { run } - catch { - case d:Done => - // do nothing - } + def start(): Unit = try { run() } + catch { + case d: Done => // do nothing } - private var pid: Pid = null - - def self: Pid = { - if (pid == null) pid = new LocalPid(this) - pid - } - - def self_= (p: Pid) = pid = p - - def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = { - receive { - case Pair(pid1, msg1) => receive { - case Pair(pid2, msg2) => cont match { - case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) - } - } - case Pair(pid2, msg2) => receive { - case Pair(pid1, msg1) => cont match { - case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) - } - } - } - } - - def spawn(body: Actor => Unit): Pid = { - val a = new Actor { - override def run = body(this) - } - a.start - a.self - } - - def spawn(a: Actor): Pid = { - a.start - a.self - } - - def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = { - val a = new Actor { - override def run = receive(cases) - } - a.start - a.self - } - - def makeRef = Actor.makeRef -} - -object Actor { - private var counter = 0 - type Tag = int - def makeRef: Tag = { - counter = counter + 1 - counter - } + def !(msg: Any): Unit = send(msg) } diff --git a/src/actors/scala/actors/single/LocalPid.scala b/src/actors/scala/actors/single/LocalPid.scala deleted file mode 100644 index f04de6ab9c..0000000000 --- a/src/actors/scala/actors/single/LocalPid.scala +++ /dev/null @@ -1,93 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - -// $Id$ - -package scala.actors.single - -import scala.collection.mutable.Queue - -/** - * @author Philipp Haller - */ -class LocalPid(actor: Actor) extends Pid { - var target = actor - - def !(msg: MailBox#Message): unit = target send msg - - def become(clos: Actor => Unit) = { - // old actor should become anonymous (cannot receive any messages any more) - // achieved by removing it from target of pid. - - val oldActor = target - //Debug.info("old actor: " + oldActor); - - // change our target to point to a newly created actor with the same mailbox. - - val newActor = new Actor { - override def run: Unit = clos(this) - } - newActor.sent = oldActor.sent - target = newActor - newActor.self = this - - //Debug.info("new actor: " + newActor); - - // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor) - oldActor.sent = new Queue[MailBox#Message] - - //Debug.info("Starting new actor."); - newActor.start // important to start after changing pid because actor may send messages to itself. - } - - private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] { - def isDefinedAt(m: MailBox#Message): Boolean = f.isDefinedAt(m) - def apply(m: MailBox#Message): unit = { - f(m) - a receive this - } - } - - def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]) = { - - become(a => a receive new ProxyPartialFunction(a, f)) - - /*become( - a:Actor => { - def loop: Unit = { - def proxyFun(m: MailBox#Message): Unit = { - if (f.isDefinedAt(m)) { - f(m); - loop - } - }; - //a receive proxyFun - } - loop - } - )*/ - } - - def spawn(body: Actor => Unit): Pid = { - val a = new Actor { - override def run: Unit = body(this) - } - a.start - a.self - } - - def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = { - val a = new Actor { - override def run: Unit = receive(cases) - } - a.start - a.self - } - - override def toString() = "LocalPid(" + target + ")" -} diff --git a/src/actors/scala/actors/single/MailBox.scala b/src/actors/scala/actors/single/MailBox.scala index f42b287323..3e8116a794 100644 --- a/src/actors/scala/actors/single/MailBox.scala +++ b/src/actors/scala/actors/single/MailBox.scala @@ -15,24 +15,20 @@ import scala.collection.mutable.Queue /** * @author Philipp Haller */ -class MailBox { - - type Message = AnyRef - case class TIMEOUT() extends Message - +trait MailBox { /** Unconsumed messages. */ - var sent = new Queue[Message] + var sent = new Queue[Any] - var continuation: PartialFunction[Message,Unit] = null + var continuation: PartialFunction[Any,Unit] = null // more complex continuation - var contCases: PartialFunction[Message,Message] = null - var contThen: Message => unit = null + var contCases: PartialFunction[Any,Any] = null + var contThen: Any => unit = null def hasCont = if ((continuation == null) && (contCases == null)) false else true - def contDefinedAt(msg: Message) = + def contDefinedAt(msg: Any) = if (((continuation != null) && continuation.isDefinedAt(msg)) || ((contCases != null) && contCases.isDefinedAt(msg))) true @@ -45,7 +41,7 @@ class MailBox { private var timeInitial: Long = 0 private var timeoutEnabled: Boolean = false - def send(msg: Message): unit = synchronized { + def send(msg: Any): Unit = synchronized { if (isAlive) if (!hasCont) { Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") @@ -98,7 +94,7 @@ class MailBox { } } - def receive(f: PartialFunction[Message,unit]): scala.All = { + def receive(f: PartialFunction[Any, Unit]): Nothing = { continuation = null sent.dequeueFirst(f.isDefinedAt) match { case Some(msg) => @@ -111,7 +107,7 @@ class MailBox { throw new Done } - def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = { + def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { timeInitial = System.currentTimeMillis() duration = msec @@ -136,11 +132,7 @@ class MailBox { throw new Done } - // original wish: - // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B - // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit - - def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): scala.All = { + def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Nothing = { contCases = null contThen = null sent.dequeueFirst(cases.isDefinedAt) match { diff --git a/src/actors/scala/actors/single/Process.scala b/src/actors/scala/actors/single/Process.scala new file mode 100644 index 0000000000..3f8e0255bc --- /dev/null +++ b/src/actors/scala/actors/single/Process.scala @@ -0,0 +1,77 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: LocalPid.scala 7969 2006-06-23 11:44:31Z michelou $ + +package scala.actors.single + +import scala.collection.mutable.HashSet + +/** + * @author Philipp Haller + */ +class Process extends scala.actors.Process with Actor { + case class Exit(from: scala.actors.Process, reason: Symbol) + + private val links = new HashSet[scala.actors.Process] + + def link(to: scala.actors.Process): Unit = { + links += to + to.linkTo(this) + } + + def linkTo(to: scala.actors.Process): Unit = links += to + + def unlink(from: scala.actors.Process): Unit = { + links -= from + from.unlinkFrom(this) + } + + def unlinkFrom(from: scala.actors.Process): Unit = links -= from + + private var trapExit = false + + def processFlag(flag: Symbol, set: boolean) = { + if (flag.name.equals("trapExit")) trapExit = set + } + + def exit(reason: Symbol): Unit = { + exitLinked(reason, new HashSet[Process]) + if (isAlive) isAlive = false + } + + def exit(from: scala.actors.Process, reason: Symbol): Unit = { + if (from == this) { + exit(reason) + } + else { + if (trapExit) + this ! Exit(from, reason) + else if (!reason.name.equals("normal")) + exit(reason) + } + } + + def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = { + if (exitMarks contains this) { + // we are marked, do nothing + } + else { + exitMarks += this // mark this as exiting + + // exit linked processes + val iter = links.elements + while (iter.hasNext) { + val linked = iter.next + unlink(linked) + linked.exit(this, reason) + } + exitMarks -= this + } + } +} |