summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-07-14 12:45:15 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-07-14 12:45:15 +0000
commit7796d36f0bf884d8022590b2b66c706547fd8d39 (patch)
treecf36d16c337dbc8be8777f1d8c0ecae08c54938b
parent9d95c090f440956f856468dc59bdfd0ff560314b (diff)
downloadscala-7796d36f0bf884d8022590b2b66c706547fd8d39.tar.gz
scala-7796d36f0bf884d8022590b2b66c706547fd8d39.tar.bz2
scala-7796d36f0bf884d8022590b2b66c706547fd8d39.zip
Refactored actors lib.
-rw-r--r--src/actors/scala/actors/Actor.scala (renamed from src/actors/scala/actors/multi/AbstractPid.scala)12
-rw-r--r--src/actors/scala/actors/Done.scala10
-rw-r--r--src/actors/scala/actors/Process.scala (renamed from src/actors/scala/actors/single/Pid.scala)15
-rw-r--r--src/actors/scala/actors/SPanel.scala80
-rw-r--r--src/actors/scala/actors/TIMEOUT.scala (renamed from src/actors/scala/actors/single/AbstractPid.scala)13
-rw-r--r--src/actors/scala/actors/distributed/JavaSerializer.scala4
-rw-r--r--src/actors/scala/actors/distributed/MessagesComb.scala35
-rw-r--r--src/actors/scala/actors/distributed/NetKernel.scala11
-rw-r--r--src/actors/scala/actors/distributed/NodeComb.scala2
-rw-r--r--src/actors/scala/actors/distributed/RemoteActor.scala15
-rw-r--r--src/actors/scala/actors/distributed/RemotePid.scala20
-rw-r--r--src/actors/scala/actors/distributed/Serializer.scala4
-rw-r--r--src/actors/scala/actors/distributed/SystemMessage.scala28
-rw-r--r--src/actors/scala/actors/distributed/TcpSerializerComb.scala6
-rw-r--r--src/actors/scala/actors/distributed/picklers/BytePickle.scala436
-rw-r--r--src/actors/scala/actors/distributed/picklers/SStreamPickle.scala519
-rw-r--r--src/actors/scala/actors/distributed/picklers/Stream.scala81
-rw-r--r--src/actors/scala/actors/distributed/picklers/UTF8Codec.scala77
-rw-r--r--src/actors/scala/actors/gui/Publisher.scala26
-rw-r--r--src/actors/scala/actors/multi/Actor.scala293
-rw-r--r--src/actors/scala/actors/multi/LocalPid.scala104
-rw-r--r--src/actors/scala/actors/multi/MailBox.scala35
-rw-r--r--src/actors/scala/actors/multi/Pid.scala30
-rw-r--r--src/actors/scala/actors/multi/Process.scala281
-rw-r--r--src/actors/scala/actors/multi/ReceiverTask.scala2
-rw-r--r--src/actors/scala/actors/multi/Scheduler.scala (renamed from src/actors/scala/actors/Scheduler.scala)27
-rw-r--r--src/actors/scala/actors/multi/TimerThread.scala (renamed from src/actors/scala/actors/TimerThread.scala)9
-rw-r--r--src/actors/scala/actors/multi/WorkerThread.scala (renamed from src/actors/scala/actors/WorkerThread.scala)2
-rw-r--r--src/actors/scala/actors/single/Actor.scala69
-rw-r--r--src/actors/scala/actors/single/LocalPid.scala93
-rw-r--r--src/actors/scala/actors/single/MailBox.scala28
-rw-r--r--src/actors/scala/actors/single/Process.scala77
32 files changed, 506 insertions, 1938 deletions
diff --git a/src/actors/scala/actors/multi/AbstractPid.scala b/src/actors/scala/actors/Actor.scala
index 45133c3918..593681158c 100644
--- a/src/actors/scala/actors/multi/AbstractPid.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -6,15 +6,13 @@
** |/ **
\* */
-// $Id$
-
-package scala.actors.multi
+package scala.actors
/**
* @author Philipp Haller
*/
-trait AbstractPid {
- def !(msg: MailBox#Message): Unit
- def become(clos: Actor => Unit): Unit
- def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit
+trait Actor {
+ def run(): Unit
+ def start(): Unit
+ def !(msg: Any): Unit
}
diff --git a/src/actors/scala/actors/Done.scala b/src/actors/scala/actors/Done.scala
index e0ef9ba057..2291df6399 100644
--- a/src/actors/scala/actors/Done.scala
+++ b/src/actors/scala/actors/Done.scala
@@ -17,13 +17,3 @@ class Done extends Throwable {
override def fillInStackTrace(): Throwable =
this;
}
-
-class ContinueException extends Throwable {
- override def fillInStackTrace(): Throwable =
- this;
-}
-
-class AbortException extends Throwable {
- override def fillInStackTrace(): Throwable =
- this;
-}
diff --git a/src/actors/scala/actors/single/Pid.scala b/src/actors/scala/actors/Process.scala
index 8c92568b6d..e79e2e2730 100644
--- a/src/actors/scala/actors/single/Pid.scala
+++ b/src/actors/scala/actors/Process.scala
@@ -6,15 +6,16 @@
** |/ **
\* */
-// $Id$
-
-package scala.actors.single
+package scala.actors
/**
* @author Philipp Haller
*/
-abstract class Pid {
- def !(msg: MailBox#Message): Unit
- //def become(clos: Actor => Unit): Unit
- //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit
+trait Process extends Actor {
+ def link(to: Process): Unit
+ def linkTo(to: Process): Unit
+ def unlink(from: Process): Unit
+ def unlinkFrom(from: Process): Unit
+ def exit(reason: Symbol): Unit
+ def exit(from: Process, reason: Symbol): Unit
}
diff --git a/src/actors/scala/actors/SPanel.scala b/src/actors/scala/actors/SPanel.scala
deleted file mode 100644
index b785907722..0000000000
--- a/src/actors/scala/actors/SPanel.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors
-
-import java.awt._
-import java.awt.event._
-import javax.swing.event._
-import javax.swing._
-import java.io._
-
-/*
- * SPanel.scala
- * GUI for simple texts
- *
- * @author Sebastien Noir
- */
-class SPanel(WIDTH: Int, HEIGHT: Int, title: String) extends JFrame {
-
- private var textArea:JTextArea = null
- private var resetButton:JButton = null
-
- private var scrollPane:JScrollPane = null
- private var panel:JPanel = null
- private var contentPane:Container = null
- private var levelChoice:JComboBox = null
- private var formatChoice:JComboBox = null
-
- //init
- setTitle(title);
- setSize(WIDTH, HEIGHT);
- this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
-
- contentPane = getContentPane();
- panel = new JPanel();
- contentPane.add(panel, BorderLayout.SOUTH);
-
- // Add a text area with scroll bars
- textArea = new JTextArea(8, 40);
- scrollPane = new JScrollPane(textArea);
- contentPane.add(scrollPane, BorderLayout.CENTER);
-
- // Add a "reset textarea" button
- resetButton = new JButton("Clear");
- panel.add(resetButton);
- resetButton.addActionListener(
- new ActionListener() {
- def actionPerformed(evt:ActionEvent):Unit= {
- textArea.setText("");
- }
- }
- );
-
- this.repaint()
- this.show()
-
- def addText(text: String): Unit = {
- textArea.append(text+'\n');
-
- if ( textArea.getHeight() > scrollPane.getHeight() ) {
- scrollPane.getVerticalScrollBar().setValue(scrollPane.getVerticalScrollBar().getMaximum());
- }
- repaint();
- }
-/*
- def actionPerformed(ActionEvent e):Unit {
- }
-*/
-
- override def paint(g: Graphics): Unit =
- super.paint(g)
-
-}
diff --git a/src/actors/scala/actors/single/AbstractPid.scala b/src/actors/scala/actors/TIMEOUT.scala
index 74aa004567..63a4ad322f 100644
--- a/src/actors/scala/actors/single/AbstractPid.scala
+++ b/src/actors/scala/actors/TIMEOUT.scala
@@ -6,15 +6,6 @@
** |/ **
\* */
-// $Id$
+package scala.actors
-package scala.actors.single
-
-/**
- * @author Philipp Haller
- */
-trait AbstractPid {
- def !(msg: MailBox#Message): Unit
- def become(clos: Actor => Unit): Unit
- def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]): Unit
-}
+case class TIMEOUT()
diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala
index a4774503cf..5b07e5e9c8 100644
--- a/src/actors/scala/actors/distributed/JavaSerializer.scala
+++ b/src/actors/scala/actors/distributed/JavaSerializer.scala
@@ -12,7 +12,7 @@ package scala.actors.distributed
import java.io._
-import scala.actors.distributed.picklers.BytePickle.SPU
+import scala.io.BytePickle.SPU
import scala.actors.multi.Pid
[serializable]
@@ -36,6 +36,6 @@ class JavaSerializer(serv: Service) extends Serializer(serv) {
in.readObject()
}
- def pid: SPU[Pid] = null
+ def pid: SPU[RemotePid] = null
def addRep(name: String, repCons: Serializer => AnyRef): Unit = {}
}
diff --git a/src/actors/scala/actors/distributed/MessagesComb.scala b/src/actors/scala/actors/distributed/MessagesComb.scala
index 31fe01ebc7..ea30e7e5b4 100644
--- a/src/actors/scala/actors/distributed/MessagesComb.scala
+++ b/src/actors/scala/actors/distributed/MessagesComb.scala
@@ -10,40 +10,39 @@
package scala.actors.distributed
-import scala.actors.distributed.picklers.BytePickle._
-import scala.actors.multi.Pid
+import scala.io.BytePickle._
/**
* @author Philipp Haller
*/
object MessagesComb {
def sendPU(ser: Serializer): SPU[Send] =
- wrap((p: Pair[Pid,Array[byte]]) => Send(p._1, p._2),
+ wrap((p: Pair[RemotePid,Array[byte]]) => Send(p._1, p._2),
(s: Send) => Pair(s.rec, s.data),
pair(ser.pid, bytearray));
+ def namedSendPU: SPU[NamedSend] =
+ wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2),
+ (ns: NamedSend) => Pair(ns.sym, ns.data),
+ pair(symbolPU, bytearray));
+
def spawnPU(ser: Serializer): SPU[Spawn] =
- wrap((p: Pair[Pid,String]) => Spawn(p._1, p._2),
+ wrap((p: Pair[RemotePid,String]) => Spawn(p._1, p._2),
(s: Spawn) => Pair(s.replyto, s.p),
pair(ser.pid, string));
- def symbolPU: SPU[Symbol] =
- wrap((s: String) => Symbol(s),
- (sym: Symbol) => sym.name,
- string);
+ def spawnObjectPU(ser: Serializer): SPU[SpawnObject] =
+ wrap((p: Pair[RemotePid,Array[byte]]) => SpawnObject(p._1, p._2),
+ (s: SpawnObject) => Pair(s.replyto, s.data),
+ pair(ser.pid, bytearray));
def exitPU(ser: Serializer): SPU[Exit1] =
- wrap((p: Triple[Pid,Pid,Symbol]) => Exit1(p._1, p._2, p._3),
+ wrap((p: Triple[RemotePid,RemotePid,Symbol]) => Exit1(p._1, p._2, p._3),
(e: Exit1) => Triple(e.from, e.to, e.reason),
triple(ser.pid, ser.pid, symbolPU));
- def spawnObjectPU(ser: Serializer): SPU[SpawnObject] =
- wrap((p: Pair[Pid,Array[byte]]) => SpawnObject(p._1, p._2),
- (s: SpawnObject) => Pair(s.replyto, s.data),
- pair(ser.pid, bytearray));
-
- def namedSendPU: SPU[NamedSend] =
- wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2),
- (ns: NamedSend) => Pair(ns.sym, ns.data),
- pair(symbolPU, bytearray));
+ def symbolPU: SPU[Symbol] =
+ wrap((s: String) => Symbol(s),
+ (sym: Symbol) => sym.name,
+ string);
}
diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala
index 8d479df377..8cdc2ffdca 100644
--- a/src/actors/scala/actors/distributed/NetKernel.scala
+++ b/src/actors/scala/actors/distributed/NetKernel.scala
@@ -55,7 +55,7 @@ class NetKernel(service: Service) {
/** only called if destDesc is local. */
def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
- destDesc.pid match {
+ destDesc.p match {
case rpid: RemotePid =>
(rtable get rpid.localId) match {
case Some(actor) =>
@@ -67,7 +67,7 @@ class NetKernel(service: Service) {
def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) =
// locality check (handler local to this node?)
- destDesc.pid match {
+ destDesc.p match {
case rpid: RemotePid =>
if (rpid.node == this.node)
handleExc(destDesc, e)
@@ -121,9 +121,12 @@ class NetKernel(service: Service) {
def localSend(pid: RemotePid, msg: AnyRef): Unit =
localSend(pid.localId, msg)
- def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized {
+ def remoteSend(pid: RemotePid, msg: Any) = synchronized {
//Console.println("NetKernel: Remote msg delivery to " + pid)
- service.remoteSend(pid, msg)
+ msg match {
+ case m: AnyRef =>
+ service.remoteSend(pid, m)
+ }
}
def namedSend(name: Name, msg: AnyRef): Unit =
diff --git a/src/actors/scala/actors/distributed/NodeComb.scala b/src/actors/scala/actors/distributed/NodeComb.scala
index b3a0c44bc7..5866c87983 100644
--- a/src/actors/scala/actors/distributed/NodeComb.scala
+++ b/src/actors/scala/actors/distributed/NodeComb.scala
@@ -10,7 +10,7 @@
package scala.actors.distributed
-import scala.actors.distributed.picklers.BytePickle._
+import scala.io.BytePickle._
/**
* @author Philipp Haller
diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala
index 51e6c6690a..7920817e95 100644
--- a/src/actors/scala/actors/distributed/RemoteActor.scala
+++ b/src/actors/scala/actors/distributed/RemoteActor.scala
@@ -10,7 +10,7 @@
package scala.actors.distributed
-import scala.actors.multi.{MailBox,Actor,Pid,LocalPid,ExcHandlerDesc}
+import scala.actors.multi.{MailBox,Process,ExcHandlerDesc}
import scala.collection.mutable.{HashMap,Stack}
abstract class ServiceName
@@ -20,17 +20,17 @@ case class TCP() extends ServiceName
/**
* @author Philipp Haller
*/
-class RemoteActor extends Actor {
+class RemoteActor extends Process {
override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
// locality check (handler local to this actor?)
- if (destDesc.pid == self)
+ if (destDesc.p == this)
handleExc(destDesc, e)
else
kernel.forwardExc(destDesc, e)
}
- override def receive(f: PartialFunction[Message,Unit]): scala.All = {
+ override def receive(f: PartialFunction[Any,Unit]): scala.All = {
if (isAlive) {
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
@@ -64,7 +64,7 @@ class RemoteActor extends Actor {
private var selfCached: RemotePid = null
- override def self: RemotePid = {
+ def self: RemotePid = {
if (selfCached == null)
selfCached = kernel pidOf this
selfCached
@@ -103,10 +103,7 @@ class RemoteActor extends Actor {
selfCached = service.kernel.register(this)
}
- def node(pid: Pid): Node = pid match {
- case rpid: RemotePid => rpid.node
- case lpid: LocalPid => null
- }
+ def node(pid: RemotePid) = pid.node
def disconnectNode(node: Node) =
kernel.disconnectNode(node)
diff --git a/src/actors/scala/actors/distributed/RemotePid.scala b/src/actors/scala/actors/distributed/RemotePid.scala
index ec3df043ba..a597411dde 100644
--- a/src/actors/scala/actors/distributed/RemotePid.scala
+++ b/src/actors/scala/actors/distributed/RemotePid.scala
@@ -10,7 +10,7 @@
package scala.actors.distributed
-import scala.actors.multi.{Pid,MailBox,ExcHandlerDesc}
+import scala.actors.multi.{MailBox,ExcHandlerDesc}
import java.io._
@@ -18,7 +18,7 @@ import java.io._
* @author Philipp Haller
*/
[serializable]
-abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends Pid {
+abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends scala.actors.multi.Process {
def this() = this(0, null, null) // for serialization
private var _locId = locId
@@ -54,7 +54,7 @@ abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extend
def kernel = kern;
- def !(msg: MailBox#Message): unit = {
+ override def !(msg: Any): unit = {
//Console.println("! " + msg)
if (actor != null)
actor send msg
@@ -62,41 +62,41 @@ abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extend
kernel.remoteSend(this, msg)
}
- def link(other: Pid): unit =
+ override def link(other: Process): Unit =
other match {
case rpid: RemotePid =>
kernel.link(this, rpid)
};
//TODO (dont know if this is local to kernel.node)
- def linkTo(other: Pid): unit =
+ override def linkTo(other: Process): Unit =
other match {
case rpid: RemotePid =>
// do nothing
};
- def unlink(other: Pid): unit =
+ override def unlink(other: Process): Unit =
other match {
case rpid: RemotePid =>
kernel.unlink(this, rpid)
};
//TODO (dont know if this is local to kernel.node)
- def unlinkFrom(other: Pid): unit =
+ override def unlinkFrom(other: Process): Unit =
other match {
case rpid: RemotePid =>
// do nothing
};
- def exit(reason: Symbol): unit = kernel.exit(this, reason);
- def exit(from: Pid, reason: Symbol): unit = {
+ override def exit(reason: Symbol): Unit = kernel.exit(this, reason);
+ override def exit(from: Process, reason: Symbol): unit = {
from match {
case rpid: RemotePid =>
kernel.exit(rpid, reason);
}
}
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {}
+ override def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {}
}
[serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) {
diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala
index 096d2711d2..553ac99312 100644
--- a/src/actors/scala/actors/distributed/Serializer.scala
+++ b/src/actors/scala/actors/distributed/Serializer.scala
@@ -11,8 +11,8 @@
package scala.actors.distributed
import java.io.{DataInputStream,DataOutputStream,EOFException}
+import scala.io.BytePickle.SPU
-import scala.actors.distributed.picklers.BytePickle.SPU
import scala.actors.multi.Pid
/**
@@ -57,7 +57,7 @@ abstract class Serializer(s: Service) {
writeBytes(outputStream, bytes)
}
- def pid: SPU[Pid]
+ def pid: SPU[RemotePid]
def service = s
def addRep(name: String, repCons: Serializer => AnyRef): unit
}
diff --git a/src/actors/scala/actors/distributed/SystemMessage.scala b/src/actors/scala/actors/distributed/SystemMessage.scala
index d4441a1b66..274100f98f 100644
--- a/src/actors/scala/actors/distributed/SystemMessage.scala
+++ b/src/actors/scala/actors/distributed/SystemMessage.scala
@@ -10,30 +10,26 @@
package scala.actors.distributed
-import scala.actors.multi.{Pid,ExcHandlerDesc}
+import scala.actors.multi.ExcHandlerDesc
abstract class MessageTyper {
type DataType = Array[Byte]
}
-abstract class SystemMessage
-case class Send(rec: Pid, data: MessageTyper#DataType) extends SystemMessage
-case class Spawn(replyto: Pid, p: String) extends SystemMessage
-case class PidReply(res: Pid) extends SystemMessage
-case class Disconnect() extends SystemMessage
+case class Send(rec: RemotePid, data: MessageTyper#DataType)
+case class NamedSend(sym: Symbol, data: MessageTyper#DataType)
+case class Spawn(replyto: RemotePid, p: String)
+case class SpawnObject(replyto: RemotePid, data: MessageTyper#DataType)
+case class Exit1(from: RemotePid, to: RemotePid, reason: Symbol)
-case class NodeDown() extends SystemMessage
+case class RemotePidReply(res: RemotePid)
+case class Disconnect()
+case class NodeDown()
// CAUTION: Tells "from" to create a _uni-directional_ link!
-case class Link(from: Pid, to: Pid) extends SystemMessage
-case class UnLink(from: Pid, to: Pid) extends SystemMessage
-case class Exit1(from: Pid, to: Pid, reason: Symbol) extends SystemMessage
-
-case class SpawnObject(replyto: Pid, data: MessageTyper#DataType) extends SystemMessage
-
-case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage
-
-case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage
+case class Link(from: RemotePid, to: RemotePid)
+case class UnLink(from: RemotePid, to: RemotePid)
+case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable)
/*
case class NamedSendRep (ser:Serializer) extends TypeRep[NamedSend](ser) {
diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
index 7c7ab87eee..cc7fb9477a 100644
--- a/src/actors/scala/actors/distributed/TcpSerializerComb.scala
+++ b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
@@ -11,8 +11,8 @@
package scala.actors.distributed
import java.io.Reader
+import scala.io.BytePickle._
-import scala.actors.distributed.picklers.BytePickle._
import scala.actors.distributed.MessagesComb._
import scala.actors.distributed.NodeComb._
import scala.actors.multi.Pid
@@ -46,14 +46,14 @@ class TcpSerializerComb(serv: Service) extends Serializer(serv) {
lookup(new String(content))
}
- def pid: SPU[Pid] = {
+ def pid: SPU[RemotePid] = {
val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel,
if (p._1 == serv.node) serv.kernel.getLocalRef(p._2)
else null),
(t: TcpPid) => Pair(t.node, t.localId),
pair(tcpNodePU, nat));
- wrap((p:Pid) => p, (pid:Pid) => pid match {
+ wrap((p:RemotePid) => p, (pid:RemotePid) => pid match {
case tpid: TcpPid =>
tpid
case other =>
diff --git a/src/actors/scala/actors/distributed/picklers/BytePickle.scala b/src/actors/scala/actors/distributed/picklers/BytePickle.scala
deleted file mode 100644
index 796fc9a353..0000000000
--- a/src/actors/scala/actors/distributed/picklers/BytePickle.scala
+++ /dev/null
@@ -1,436 +0,0 @@
-package scala.actors.distributed.picklers
-
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.ArrayBuffer
-
-/**
- Pickler combinators.
-
- Author: Philipp Haller <philipp.haller@epfl.ch>
- */
-
-object BytePickle {
- class PicklerState(val stream: Array[byte], val dict: PicklerEnv) {}
- class UnPicklerState(val stream: Array[byte], val dict: UnPicklerEnv) {}
-
- abstract class PU[t] {
- def appP(a: t, state: Array[byte]): Array[byte];
- def appU(state: Array[byte]): Pair[t, Array[byte]];
- }
-
- abstract class SPU[t] {
- def appP(a: t, state: PicklerState): PicklerState;
- def appU(state: UnPicklerState): Pair[t, UnPicklerState];
- }
-
- class PicklerEnv extends HashMap[Any, int] {
- private var cnt: int = 64;
- def nextLoc() = { cnt = cnt + 1; cnt };
- }
-
- class UnPicklerEnv extends HashMap[int, Any] {
- private var cnt: int = 64;
- def nextLoc() = { cnt = cnt + 1; cnt };
- }
-
- abstract class RefDef;
- case class Ref() extends RefDef;
- case class Def() extends RefDef;
-
- def refDef: PU[RefDef] = new PU[RefDef] {
- def appP(b: RefDef, s: Array[byte]): Array[byte] =
- b match {
- case Ref() => Array.concat(s, (List[byte](0)).toArray)
- case Def() => Array.concat(s, (List[byte](1)).toArray)
- };
- def appU(s: Array[byte]): Pair[RefDef, Array[byte]] =
- if (s(0) == 0) Pair(Ref(), s.subArray(1, s.length))
- else Pair(Def(), s.subArray(1, s.length));
- }
-
- val REF = 0
- val DEF = 1
-
- def unat: PU[int] = new PU[int] {
- def appP(n: int, s: Array[byte]): Array[byte] =
- Array.concat(s, nat2Bytes(n));
- def appU(s: Array[byte]): Pair[int, Array[byte]] = {
- var num = 0
- def readNat: int = {
- var b = 0;
- var x = 0;
- do {
- b = s(num)
- num = num + 1
- x = (x << 7) + (b & 0x7f);
- } while ((b & 0x80) != 0);
- x
- }
- Pair(readNat, s.subArray(num, s.length))
- }
- }
-
- def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
- def appP(v: a, state: PicklerState): PicklerState = {
- /*
- - is there some value equal to v associated with a location l in the pickle environment?
- - yes: write REF-tag to outstream together with l
- - no:
- write DEF-tag to outstream
- record current location l of outstream
- --> serialize value
- add entry to pickle environment, mapping v onto l
- */
- val pe = state.dict
- pe.get(v) match {
- case None =>
- //Console.println("" + v + " is new")
- //Console.println("writing DEF...")
- val sPrime = refDef.appP(Def(), state.stream)
- val l = pe.nextLoc()
-
- //Console.println("applying pickler to state " + sPrime)
- val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
-
- //Console.println("updating dict (" + l + ") for " + v)
- pe.update(v, l)
-
- return sPrimePrime
- case Some(l) =>
- //Console.println("writing REF...")
- val sPrime = refDef.appP(Ref(), state.stream)
-
- //Console.println("writing location to state " + sPrime)
- return new PicklerState(unat.appP(l, sPrime), pe)
- }
- }
- def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
- /*
- - first, read tag (i.e. DEF or REF)
- - if REF:
- read location l
- look up resulting value in unpickler environment
- - if DEF:
- record location l of input stream
- --> deserialize value v with argument deserializer
- add entry to unpickler environment, mapping l onto v
- */
- val upe = state.dict
- val res = refDef.appU(state.stream)
- res._1 match {
- case Def() =>
- val l = upe.nextLoc
- val res2 = pa.appU(new UnPicklerState(res._2, upe))
- upe.update(l, res2._1)
- return res2
- case Ref() =>
- val res2 = unat.appU(res._2) // read location
- upe.get(res2._1) match { // lookup value in unpickler env
- case None => error("invalid unpickler environment"); return null
- case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
- }
- }
- }
- }
-
- def upickle[t](p: PU[t], a: t): Array[byte] =
- p.appP(a, new Array[byte](0));
-
- def uunpickle[t](p: PU[t], stream: Array[byte]): t =
- p.appU(stream)._1;
-
- def pickle[t](p: SPU[t], a: t): Array[byte] =
- p.appP(a, new PicklerState(new Array[byte](0), new PicklerEnv)).stream;
-
- def unpickle[t](p: SPU[t], stream: Array[byte]): t =
- p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
-
- def ulift[t](x: t): PU[t] = new PU[t] {
- def appP(a: t, state: Array[byte]): Array[byte] =
- if (x != a) { error("value to be pickled (" + a + ") != " + x); state }
- else state;
- def appU(state: Array[byte]) = Pair(x, state);
- }
-
- def lift[t](x: t): SPU[t] = new SPU[t] {
- def appP(a: t, state: PicklerState): PicklerState =
- if (x != a) { /*error("value to be pickled (" + a + ") != " + x);*/ state }
- else state;
- def appU(state: UnPicklerState) = Pair(x, state);
- }
-
- def usequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
- def appP(b: u, s: Array[byte]): Array[byte] = {
- val a = f(b)
- val sPrime = pa.appP(a, s)
- val pb = k(a)
- val sPrimePrime = pb.appP(b, sPrime)
- sPrimePrime
- }
- def appU(s: Array[byte]): Pair[u, Array[byte]] = {
- val resPa = pa.appU(s)
- val a = resPa._1
- val sPrime = resPa._2
- val pb = k(a)
- pb.appU(sPrime)
- }
- }
-
- def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
- def appP(b: u, s: PicklerState): PicklerState = {
- val a = f(b)
- //Console.println("pickling " + a + ", s: " + s.stream)
- val sPrime = pa.appP(a, s)
- val pb = k(a)
- //Console.println("pickling " + b + ", s: " + s.stream)
- pb.appP(b, sPrime)
- }
- def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
- val resPa = pa.appU(s)
- val a = resPa._1
- val sPrime = resPa._2
- val pb = k(a)
- pb.appU(sPrime)
- }
- }
-
- def upair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
- def fst(p: Pair[a,b]): a = p._1;
- def snd(p: Pair[a,b]): b = p._2;
- usequ(fst, pa, (x: a) => usequ(snd, pb, (y: b) => ulift(Pair(x, y))))
- }
-
- def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
- def fst(p: Pair[a,b]): a = p._1;
- def snd(p: Pair[a,b]): b = p._2;
- sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
- }
-
- def triple[a,b,c](pa: SPU[a], pb: SPU[b], pc: SPU[c]): SPU[Triple[a,b,c]] = {
- def fst(p: Triple[a,b,c]): a = p._1;
- def snd(p: Triple[a,b,c]): b = p._2;
- def trd(p: Triple[a,b,c]): c = p._3;
-
- sequ(fst, pa,
- (x: a) => sequ(snd, pb,
- (y: b) => sequ(trd, pc,
- (z: c) => lift(Triple(x, y, z)))))
- }
-
- def uwrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
- usequ(j, pa, (x: a) => ulift(i(x)));
-
- def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
- sequ(j, pa, (x: a) => lift(i(x)));
-
- def appendByte(a: Array[byte], b: int): Array[byte] = {
- Array.concat(a, (List[byte](b.asInstanceOf[byte])).toArray)
- }
-
- def nat2Bytes(x: int): Array[byte] = {
- val buf = new ArrayBuffer[byte]
- def writeNatPrefix(x: int): unit = {
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf += ((x & 0x7f) | 0x80).asInstanceOf[byte];
- }
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf += (x & 0x7f).asInstanceOf[byte];
- buf.toArray
- }
-
- def nat: SPU[int] = new SPU[int] {
- def appP(n: int, s: PicklerState): PicklerState = {
- new PicklerState(Array.concat(s.stream, nat2Bytes(n)), s.dict);
- }
- def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
- var num = 0
- def readNat: int = {
- var b = 0;
- var x = 0;
- do {
- b = s.stream(num)
- num = num + 1
- x = (x << 7) + (b & 0x7f);
- } while ((b & 0x80) != 0);
- x
- }
- Pair(readNat, new UnPicklerState(s.stream.subArray(num, s.stream.length), s.dict))
- }
- }
-
- def byte: SPU[byte] = new SPU[byte] {
- def appP(b: byte, s: PicklerState): PicklerState =
- new PicklerState(Array.concat(s.stream, (List[byte](b)).toArray), s.dict);
- def appU(s: UnPicklerState): Pair[byte, UnPicklerState] =
- Pair(s.stream(0), new UnPicklerState(s.stream.subArray(1, s.stream.length), s.dict));
- }
-
- def string: SPU[String] =
- share(wrap((a:Array[byte]) => UTF8Codec.decode(a, 0, a.length), (s:String) => UTF8Codec.encode(s), bytearray));
-
- def bytearray: SPU[Array[byte]] = {
- wrap((l:List[byte]) => l.toArray, .toList, list(byte))
- }
-
- def bool: SPU[boolean] = {
- def toEnum(b: boolean) = if (b) 1 else 0;
- def fromEnum(n: int) = if (n == 0) false else true;
- wrap(fromEnum, toEnum, nat)
- }
-
- def ufixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
- def pairToList(p: Pair[a,List[a]]): List[a] =
- p._1 :: p._2;
- def listToPair(l: List[a]): Pair[a,List[a]] =
- l match { case x :: xs => Pair(x, xs) }
-
- if (n == 0) ulift(Nil)
- else
- uwrap(pairToList, listToPair, upair(pa, ufixedList(pa)(n-1)))
- }
-
- def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
- def pairToList(p: Pair[a,List[a]]): List[a] =
- p._1 :: p._2;
- def listToPair(l: List[a]): Pair[a,List[a]] =
- l match { case x :: xs => Pair(x, xs) }
-
- if (n == 0) lift(Nil)
- else
- wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
- }
-
- def list[a](pa: SPU[a]): SPU[List[a]] =
- sequ((l: List[a])=>l.length, nat, fixedList(pa));
-
- def ulist[a](pa: PU[a]): PU[List[a]] =
- usequ((l:List[a]) => l.length, unat, ufixedList(pa));
-
- def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
- sequ(tag, nat, (x: int)=> ps.apply(x)());
-
- def printByteArray(a: Array[byte]) = {
- val iter = a.elements
- while (iter.hasNext) {
- val el = iter.next
- Console.print("" + el + ", ")
- }
- }
-
- def main(args: Array[String]) = {
- // test nat2Bytes
-
- Console.println(printByteArray(nat2Bytes(1)))
- Console.println(printByteArray(nat2Bytes(10)))
- Console.println(printByteArray(nat2Bytes(16)))
- Console.println(printByteArray(nat2Bytes(256)))
-
- Console.println(100000)
- var res = pickle(nat, 100000)
- Console.println(printByteArray(res))
- var up = unpickle(nat, res)
- Console.println(up)
-
- // -- int list
- val intList = List(1, 7, 13)
- Console.println(intList)
- val res9 = pickle(list(nat), intList)
- Console.println(printByteArray(res9))
- val up9 = unpickle(list(nat), res9)
- Console.println(up9)
-
- // ---------------
- // -- boolean list
- val bList = List(false, true, true)
- Console.println(bList)
- val res2 = pickle(list(bool), bList)
- Console.println(printByteArray(res2))
- val up2 = unpickle(list(bool), res2)
- Console.println(up2)
-
- // -- string
- val s = "Hello"
- Console.println(s)
- val res3 = pickle(string, s)
- Console.println(printByteArray(res3))
- val up3 = unpickle(string, res3)
- Console.println(up3)
-
- val personPU = wrap((p:Pair[String,int]) => Person(p._1, p._2), (p:Person) => Pair(p.name, p.age), pair(string, nat));
- val p = Person("Philipp", 25)
- Console.println(p)
- val res4 = pickle(personPU, p)
- Console.println(printByteArray(res4))
- val up4 = unpickle(personPU, res4)
- Console.println(up4)
-
- val x = Var("x");
- val i = Lam("x", x);
- val k = Lam("x", Lam("y", x));
- val kki = App(k, App(k, i));
-
- /*def varPU: PU[Term] = wrap(Var,
- (t: Term)=> t match {case Var(x)=>x},
- string);
- def lamPU: PU[Term] = wrap(p: Pair[String,Term]=>Lam(p._1, p._2),
- (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
- pair(string, termPU));
- def appPU: PU[Term] = wrap(p: Pair[Term,Term]=>App(p._1, p._2),
- (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
- pair(termPU, termPU));
- def termPU: PU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
- List(()=>varPU, ()=>lamPU, ()=>appPU));
-
- Console.println("\n" + k);
- val res5 = pickle(termPU, k);
- Console.println(res5);
- val up5 = unpickle(termPU, res5);
- Console.println(up5);
-
- Console.println("\n" + kki);
- val res6 = pickle(termPU, kki);
- Console.println(res6);
- Console.println("len: " + res6.length)
- val up6 = unpickle(termPU, res6);
- Console.println(up6);*/
-
- def varSPU: SPU[Term] = wrap(Var,
- (t: Term)=> t match {case Var(x)=>x},
- string);
-
- def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
- (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
- pair(string, termSPU));
-
- def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
- (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
- pair(termSPU, termSPU));
-
- def termSPU: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
- List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
-
- Console.println("\n" + k);
- val res8 = pickle(termSPU, k);
- Console.println(printByteArray(res8));
- Console.println("len: " + res8.length)
- val up8 = unpickle(termSPU, res8);
- Console.println(up8);
-
- Console.println("\n" + kki);
- val res7 = pickle(termSPU, kki);
- Console.println(printByteArray(res7));
- Console.println("len: " + res7.length)
-
- val up7 = unpickle(termSPU, res7);
- Console.println(up7);
- }
-
- case class Person(name: String, age: int);
-
- abstract class Term;
- case class Var(s: String) extends Term;
- case class Lam(s: String, t: Term) extends Term;
- case class App(t1: Term, t2: Term) extends Term;
-}
diff --git a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala
deleted file mode 100644
index c678e5acf8..0000000000
--- a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala
+++ /dev/null
@@ -1,519 +0,0 @@
-package scala.actors.distributed.picklers;
-
-import scala.collection.mutable.HashMap;
-
-import java.io.StringReader;
-import java.io.StringWriter;
-
-/**
- Pickler combinators.
-
- Author: Philipp Haller <philipp.haller@epfl.ch>
- */
-
-object SStreamPickle {
- abstract class PU[t] {
- def appP(a: t, state: OutStream): OutStream;
- def appU(state: InStream): Pair[t,InStream];
- }
-
- //def pickle[t](p: PU[t], a: t): OutStream =
- // p.appP(a, "");
-
- def unpickle[t](p: PU[t], stream: InStream): t =
- p.appU(stream)._1;
-
- def lift[t](x: t): PU[t] = new PU[t] {
- def appP(a: t, state: OutStream): OutStream = state;
- def appU(state: InStream) = Pair(x, state);
- }
-
- def sequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
- def appP(b: u, s: OutStream): OutStream = {
- val a = f(b)
- val sPrime = pa.appP(a, s)
- val pb = k(a)
- val sPrimePrime = pb.appP(b, sPrime)
- sPrimePrime
- }
- def appU(s: InStream): Pair[u,InStream] = {
- val resPa = pa.appU(s)
- val a = resPa._1
- val sPrime = resPa._2
- val pb = k(a)
- pb.appU(sPrime)
- }
- }
-
- def pair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
- def fst(p: Pair[a,b]): a = p._1;
- def snd(p: Pair[a,b]): b = p._2;
- sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
- }
-
- def triple[a,b,c](pa: PU[a], pb: PU[b], pc: PU[c]): PU[Triple[a,b,c]] = {
- def fst(p: Triple[a,b,c]): a = p._1;
- def snd(p: Triple[a,b,c]): b = p._2;
- def trd(p: Triple[a,b,c]): c = p._3;
-
- sequ(fst, pa,
- (x: a) => sequ(snd, pb,
- (y: b) => sequ(trd, pc,
- (z: c) => lift(Triple(x, y, z)))))
- }
-
- def wrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
- sequ(j, pa, (x: a) => lift(i(x)));
-
- def unit: PU[unit] =
- lift(unit);
-
- def pad(s: String, req: int): String = {
- val buf = new StringBuffer
- for (val i <- List.range(1, req-s.length+1))
- buf append "0"
- (buf append s).toString
- }
- def encode(i: int): String = pad(Integer.toHexString(i), 8);
- def decode(s: String): int = Integer.decode("0x" + s).intValue();
-
- def int: PU[int] = new PU[int] {
- def appP(n: int, s: OutStream): OutStream = {
- s.write(encode(n))
- s
- }
- def appU(s: InStream): Pair[int,InStream] = {
- val substr = s.read(8)
- //Console.println("unpickling " + substr)
- Pair(decode(substr), s)
- }
- }
-
- def char: PU[char] = new PU[char] {
- def appP(b: char, s: OutStream): OutStream = {
- s.write(b)
- s
- }
- def appU(s: InStream): Pair[char,InStream] = {
- val carr = new Array[char](1)
- s.read(carr)
- //Console.println("unpickling " + carr(0))
- Pair(carr(0), s)
- }
- }
-
- def bool: PU[boolean] = {
- def toEnum(b: boolean) = if (b) 1 else 0;
- def fromEnum(n: int) = if (n == 0) false else true;
- wrap(fromEnum, toEnum, nat)
- }
-
- def fixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
- def pairToList(p: Pair[a,List[a]]): List[a] =
- p._1 :: p._2;
- def listToPair(l: List[a]): Pair[a,List[a]] =
- l match { case x :: xs => Pair(x, xs) }
-
- if (n == 0) lift(Nil)
- else
- wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
- }
-
- def list[a](pa: PU[a]): PU[List[a]] =
- sequ((l: List[a])=>l.length, nat, fixedList(pa));
-
- def string: PU[String] =
- wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
-
- def alt[a](tag: a => int, ps: List[PU[a]]): PU[a] =
- sequ(tag, int, ps.apply);
-
- def data[a](tag: a => int, ps: List[()=>PU[a]]): PU[a] =
- sequ(tag, nat, (x: int)=> ps.apply(x)());
-
- def option[a](pa: PU[a]): PU[Option[a]] = {
- def tag(x: Option[a]) = x match {
- case None => 0
- case Some(y) => 1
- }
- def fromSome(x: Option[a]) = x match {
- case Some(y) => y
- case None => null
- }
- def toSome(x: a): Option[a] = Some(x);
- val pnone: PU[Option[a]] = lift(None)
- alt(tag, List(pnone, wrap(toSome, fromSome, pa)))
- }
-
- def byteString(b: int) =
- pad(Integer.toHexString(b), 2);
-
- def natString(x: int): String = {
- val buf = new StringBuffer
-
- def writeNatPrefix(x: int): unit = {
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf.append(byteString((x & 0x7f) | 0x80));
- }
-
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf.append(byteString(x & 0x7f));
- buf.toString()
- }
-
- def nat: PU[int] = new PU[int] {
- def appP(n: int, s: OutStream): OutStream = {
- s.write(natString(n))
- s
- }
- def appU(s: InStream): Pair[int,InStream] = {
- def readNat: int = {
- var b = 0;
- var x = 0;
- do {
- b = decode(s.read(2));
- x = (x << 7) + (b & 0x7f);
- } while ((b & 0x80) != 0);
- x
- }
- Pair(readNat, s)
- }
- }
-
- def main(args: Array[String]) = {
- def testBase128(x: int) = {
- Console.println(x)
-
- val sw = new StringWriter
- val os = new OutStream(sw)
- val res = nat.appP(x, os)
- os.flush()
- Console.println(sw.toString())
-
- val up = nat.appU(new InStream(new StringReader(sw.toString())))
- Console.println(up._1)
- }
-
- testBase128(0)
- testBase128(1)
- testBase128(64)
- testBase128(127)
- testBase128(128)
- testBase128(8192)
-
- def pickleTest[a](x: a, pa: PU[a]) = {
- Console.println(x)
-
- val sw = new StringWriter
- val os = new OutStream(sw)
- val res = pa.appP(x, os)
- os.flush()
- Console.println(sw.toString())
-
- val up = pa.appU(new InStream(new StringReader(sw.toString())))
- Console.println(up._1)
- }
-
- pickleTest(List(1, 7, 13), list(nat))
-
- pickleTest(List(false, true, true), list(bool))
-
- pickleTest("Hello", string)
-
-
- val personPU = wrap((p: Pair[String,int]) => Person(p._1, p._2), (p: Person) => Pair(p.name, p.age), pair(string, nat));
- val p = Person("Philipp", 25)
- pickleTest(p, personPU)
-
-
- val x = Var("x");
- val i = Lam("x", x);
- val k = Lam("x", Lam("y", x));
- val kki = App(k, App(k, i));
-
- def varPU: PU[Term] =
- wrap(Var,
- (t: Term)=> t match {case Var(x)=>x},
- string);
- def lamPU: PU[Term] =
- wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
- (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
- pair(string, termPU));
- def appPU: PU[Term] =
- wrap((p: Pair[Term,Term])=>App(p._1, p._2),
- (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
- pair(termPU, termPU));
- def termPU: PU[Term] =
- data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
- List(()=>varPU, ()=>lamPU, ()=>appPU));
-
- pickleTest(k, termPU)
- pickleTest(kki, termPU)
- }
-
- case class Person(name: String, age: int);
-}
-
-abstract class Term;
-case class Var(s: String) extends Term;
-case class Lam(s: String, t: Term) extends Term;
-case class App(t1: Term, t2: Term) extends Term;
-
-
-object ShareStreamPickle {
- abstract class SPU[t] {
- def appP(a: t, state: PicklerState): PicklerState;
- def appU(state: UnPicklerState): Pair[t, UnPicklerState];
- }
-
- //def pickle[t](p: SPU[t], a: t): String =
- // p.appP(a, new PicklerState("", new PicklerEnv)).stream;
-
- //def unpickle[t](p: SPU[t], stream: String): t =
- // p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
-
- class PicklerEnv extends HashMap[Any, int] {
- private var cnt: int = 64;
- def nextLoc() = { cnt = cnt + 1; cnt };
- }
-
- class UnPicklerEnv extends HashMap[int, Any] {
- private var cnt: int = 64;
- def nextLoc() = { cnt = cnt + 1; cnt };
- }
-
- class PicklerState(val stream: OutStream, val dict: PicklerEnv) {}
- class UnPicklerState(val stream: InStream, val dict: UnPicklerEnv) {}
-
- abstract class RefDef;
- case class Ref() extends RefDef;
- case class Def() extends RefDef;
-
- def refDef: SStreamPickle.PU[RefDef] = new SStreamPickle.PU[RefDef] {
- def appP(b: RefDef, s: OutStream): OutStream =
- b match {
- case Ref() => s.write("0"); s
- case Def() => s.write("1"); s
- };
- def appU(s: InStream): Pair[RefDef, InStream] =
- if (s.readChar == '0') Pair(Ref(), s)
- else Pair(Def(), s);
- }
-
- def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
- def appP(v: a, state: PicklerState): PicklerState = {
- /*
- - is there some value equal to v associated with a location l in the pickle environment?
- - yes: write REF-tag to outstream together with l
- - no:
- write DEF-tag to outstream
- record current location l of outstream
- --> serialize value
- add entry to pickle environment, mapping v onto l
- */
- val pe = state.dict
- pe.get(v) match {
- case None =>
- //Console.println("" + v + " is new")
- //Console.println("writing DEF...")
- val sPrime = refDef.appP(Def(), state.stream)
- val l = pe.nextLoc()
-
- //Console.println("applying pickler to state " + sPrime)
- val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
-
- //Console.println("updating dict (" + l + ") for " + v)
- pe.update(v, l)
-
- return sPrimePrime
- case Some(l) =>
- //Console.println("writing REF...")
- val sPrime = refDef.appP(Ref(), state.stream)
- //Console.println("writing location to state " + sPrime)
- return new PicklerState(SStreamPickle.nat.appP(l, sPrime), pe)
- }
- }
- def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
- /*
- - first, read tag (i.e. DEF or REF)
- - if REF:
- read location l
- look up resulting value in unpickler environment
- - if DEF:
- record location l of input stream
- --> deserialize value v with argument deserializer
- add entry to unpickler environment, mapping l onto v
- */
- val upe = state.dict
- val res = refDef.appU(state.stream)
- res._1 match {
- case Def() =>
- val l = upe.nextLoc
- val res2 = pa.appU(new UnPicklerState(res._2, upe))
- upe.update(l, res2._1)
- return res2
- case Ref() =>
- val res2 = SStreamPickle.nat.appU(res._2) // read location
- upe.get(res2._1) match { // lookup value in unpickler env
- case None => error("invalid unpickler environment"); return null
- case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
- }
- }
- }
- }
-
- def lift[t](x: t): SPU[t] = new SPU[t] {
- def appP(a: t, state: PicklerState): PicklerState = state;
- def appU(state: UnPicklerState) = Pair(x, state);
- }
-
- def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
- def appP(b: u, s: PicklerState): PicklerState = {
- val a = f(b)
- //Console.println("pickling " + a + ", s: " + s.stream)
- val sPrime = pa.appP(a, s)
- val pb = k(a)
- //Console.println("pickling " + b + ", s: " + s.stream)
- pb.appP(b, sPrime)
- }
- def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
- val resPa = pa.appU(s)
- val a = resPa._1
- val sPrime = resPa._2
- val pb = k(a)
- pb.appU(sPrime)
- }
- }
-
- def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
- def fst(p: Pair[a,b]): a = p._1;
- def snd(p: Pair[a,b]): b = p._2;
- sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
- }
-
- def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
- sequ(j, pa, (x: a) => lift(i(x)));
-
- def char: SPU[char] = new SPU[char] {
- def appP(b: char, s: PicklerState): PicklerState = {
- s.stream.write(b)
- new PicklerState(s.stream, s.dict)
- }
- def appU(s: UnPicklerState): Pair[char, UnPicklerState] =
- Pair(s.stream.readChar, new UnPicklerState(s.stream, s.dict));
- }
-
- def pad(s: String, req: int): String = {
- val buf = new StringBuffer
- for (val i <- List.range(1, req-s.length+1))
- buf append "0"
- (buf append s).toString
- }
- def encode(i: int): String = pad(Integer.toHexString(i), 8);
- def decode(s: String): int = Integer.decode("0x" + s).intValue();
-
- def byteString(b: int) =
- pad(Integer.toHexString(b), 2);
-
- def natString(x: int): String = {
- val buf = new StringBuffer
-
- def writeNatPrefix(x: int): unit = {
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf.append(byteString((x & 0x7f) | 0x80));
- }
-
- val y = x >>> 7;
- if (y != 0) writeNatPrefix(y);
- buf.append(byteString(x & 0x7f));
- buf.toString()
- }
-
- def nat: SPU[int] = new SPU[int] {
- def appP(n: int, s: PicklerState): PicklerState = {
- s.stream.write(natString(n))
- new PicklerState(s.stream, s.dict)
- }
- def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
- def readNat: int = {
- var b = 0;
- var x = 0;
- do {
- b = decode(s.stream.read(2));
- x = (x << 7) + (b & 0x7f);
- } while ((b & 0x80) != 0);
- x
- }
- Pair(readNat, new UnPicklerState(s.stream, s.dict))
- }
- }
-
- def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
- def pairToList(p: Pair[a,List[a]]): List[a] =
- p._1 :: p._2;
- def listToPair(l: List[a]): Pair[a,List[a]] =
- l match { case x :: xs => Pair(x, xs) }
-
- if (n == 0) lift(Nil)
- else
- wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
- }
-
- def list[a](pa: SPU[a]): SPU[List[a]] =
- sequ((l: List[a])=>l.length, nat, fixedList(pa));
-
- def string: SPU[String] =
- wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
-
- def alt[a](tag: a => int, ps: List[SPU[a]]): SPU[a] =
- sequ(tag, nat, ps.apply);
-
- def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
- sequ(tag, nat, (x: int)=> ps.apply(x)());
-
- def main(args: Array[String]) = {
- def pickleTest[a](x: a, pa: SPU[a]) = {
- Console.println(x)
-
- val sw = new StringWriter
- val os = new OutStream(sw)
- val res = pa.appP(x, new PicklerState(os, new PicklerEnv))
- os.flush()
- Console.println(sw.toString())
-
- val up = pa.appU(new UnPicklerState(new InStream(new StringReader(sw.toString())), new UnPicklerEnv))
- Console.println(up._1)
- }
-
- val x = Var("x");
- val i = Lam("x", x);
- val k = Lam("x", Lam("y", x));
- val kki = App(k, App(k, i));
-
- def varSPU: SPU[Term] = wrap(Var,
- (t: Term)=> t match {case Var(x)=>x},
- string);
-
- def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
- (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
- pair(string, termSPU));
-
- def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
- (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
- pair(termSPU, termSPU));
-
- def termSPU: SPU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
- List(()=>varSPU, ()=>lamSPU, ()=>appSPU));
-
- def termSPUshared: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
- List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
-
- pickleTest(k, termSPU)
- pickleTest(k, termSPUshared)
- pickleTest(kki, termSPU)
- pickleTest(kki, termSPUshared)
- }
-}
diff --git a/src/actors/scala/actors/distributed/picklers/Stream.scala b/src/actors/scala/actors/distributed/picklers/Stream.scala
deleted file mode 100644
index 65e59b975d..0000000000
--- a/src/actors/scala/actors/distributed/picklers/Stream.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-package scala.actors.distributed.picklers;
-
-import java.io.Reader;
-import java.io.Writer;
-
-import scala.collection.mutable.HashMap;
-
-abstract class Stream {
- protected var loc: int = 0;
-
- def getLocation = loc;
-}
-
-class OutStream(writer: Writer) extends Stream {
- val picklerEnv = new PicklerEnv;
-
- def write(s: String): unit = {
- loc = loc + s.length()
- writer.write(s)
- }
-
- def write(c: char): unit = {
- loc = loc + 1
- writer.write(c)
- }
-
- def flush(): unit =
- writer.flush();
-}
-
-class InStream(reader: Reader) extends Stream {
- val unpicklerEnv = new UnpicklerEnv;
-
- def read(num: int): String = {
- val carr = new Array[char](num)
- val cnt = reader.read(carr)
- loc = loc + num
- //Console.println("new loc: " + loc)
- new String(carr)
-
- /*val buf = new StringBuffer
- var ch = r.read()
- loc = loc + 1
- if (num > 1) {
- var cnt = 1
- while (cnt < num && ch != -1) {
- buf.append(ch)
- ch = r.read()
- loc = loc + 1
- cnt = cnt + 1
- }
- if (cnt == num)
- buf.toString()
- else
- buf.toString() // error
- }
- else
- new String(ch.asInstanceOf[char])*/
- }
-
- def read(cbuf: Array[char]): int = {
- loc = loc + cbuf.length
- reader.read(cbuf)
- }
-
- def readChar: char = {
- val carr = new Array[char](1)
- read(carr)
- carr(0)
- }
-}
-
-class PicklerEnv[a] extends HashMap[a, int] {
- private var cnt: int = 0;
- def nextLoc() = { cnt = cnt + 1; cnt };
-}
-
-class UnpicklerEnv[a] extends HashMap[int, a] {
- private var cnt: int = 0;
- def nextLoc() = { cnt = cnt + 1; cnt };
-}
diff --git a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala
deleted file mode 100644
index 36ad115cdd..0000000000
--- a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/* ____ ____ ____ ____ ______ *\
-** / __// __ \/ __// __ \/ ____/ SOcos COmpiles Scala **
-** __\_ \/ /_/ / /__/ /_/ /\_ \ (c) 2002-2006, LAMP/EPFL **
-** /_____/\____/\___/\____/____/ **
-\* */
-
-// $Id: UTF8Codec.scala 7116 2006-04-11 15:36:05Z mihaylov $
-
-
-package scala.actors.distributed.picklers
-
-object UTF8Codec {
-
- def encode(src: Array[Char], from: Int, dst: Array[Byte], to: Int, len: Int): Int = {
- var i = from;
- var j = to;
- val end = from + len;
- while (i < end) {
- val ch = src(i);
- i = i + 1;
- if (ch < 128) {
- dst(j) = ch.toByte;
- j = j + 1;
- }
- else if (ch <= 0x3FF) {
- dst(j) = (0xC0 | (ch >> 6)).toByte;
- dst(j+1) = (0x80 | (ch & 0x3F)).toByte;
- j = j + 2;
- } else {
- dst(j) = (0xE0 | (ch >> 12)).toByte;
- dst(j+1) = (0x80 | ((ch >> 6) & 0x3F)).toByte;
- dst(j+2) = (0x80 | (ch & 0x3F)).toByte;
- j = j + 3;
- }
- }
- j
- }
-
- def encode(s: String, dst: Array[Byte], to: Int): Int =
- encode(s.toCharArray(), 0, dst, to, s.length());
-
-
- def encode(s: String): Array[Byte] = {
- val dst = new Array[Byte](s.length() * 3);
- val len = encode(s, dst, 0);
- dst.subArray(0, len)
- }
-
- def decode(src: Array[Byte], from: Int,
- dst: Array[Char], to: Int, len: Int): Int =
- {
- var i = from;
- var j = to;
- val end = from + len;
- while (i < end) {
- var b = src(i) & 0xFF;
- i = i + 1;
- if (b >= 0xE0) {
- b = ((b & 0x0F) << 12) | (src(i) & 0x3F) << 6;
- b = b | (src(i+1) & 0x3F);
- i = i + 2;
- } else if (b >= 0xC0) {
- b = ((b & 0x1F) << 6) | (src(i) & 0x3F);
- i = i + 1;
- }
- dst(j) = b.toChar;
- j = j + 1;
- }
- j
- }
-
- def decode(src: Array[Byte], from: Int, len: Int): String = {
- val cs = new Array[Char](len);
- String.copyValueOf(cs, 0, decode(src, 0, cs, 0, len));
- }
-
-}
diff --git a/src/actors/scala/actors/gui/Publisher.scala b/src/actors/scala/actors/gui/Publisher.scala
index 0b71bc30f6..fc81ec9db9 100644
--- a/src/actors/scala/actors/gui/Publisher.scala
+++ b/src/actors/scala/actors/gui/Publisher.scala
@@ -8,7 +8,7 @@ import scala.actors.single.Pid
import scala.actors.gui.event.Event
class EventHandlers {
- type Handler = PartialFunction[AnyRef,unit]
+ type Handler = PartialFunction[Any,unit]
private val handlers = new ListBuffer[Handler]
@@ -16,9 +16,9 @@ class EventHandlers {
def -= (h: Handler) = { handlers -= h }
def compoundHandler = new Handler {
- def isDefinedAt(e: AnyRef): boolean = handlers.exists(.isDefinedAt(e))
+ def isDefinedAt(e: Any): boolean = handlers.exists(.isDefinedAt(e))
- def apply(e: AnyRef): unit =
+ def apply(e: Any): unit =
handlers.find(.isDefinedAt(e)) match {
case Some(h) => h.apply(e)
case None => // do nothing
@@ -29,10 +29,10 @@ class EventHandlers {
trait Responder extends Actor {
protected val handlers = new EventHandlers
- final def eventloop(f: PartialFunction[Message,unit]): scala.All =
+ final def eventloop(f: PartialFunction[Any,unit]): scala.All =
receive(new RecursiveProxyHandler(this, f))
- def eventblock(f: PartialFunction[Message,unit]): unit = {
+ def eventblock(f: PartialFunction[Any,unit]): unit = {
try {
receive(new RecursiveProxyHandler(this, f))
}
@@ -42,11 +42,11 @@ trait Responder extends Actor {
}
}
- private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
- def isDefinedAt(m: Message): boolean =
+ private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] {
+ def isDefinedAt(m: Any): boolean =
true // events should be removed from the mailbox immediately!
- def apply(m: Message): unit = {
+ def apply(m: Any): unit = {
if (f.isDefinedAt(m)) f(m) // overrides any installed handler
else
if (handlers.compoundHandler.isDefinedAt(m))
@@ -63,7 +63,7 @@ case class Subscribe(s: Subscriber)
case class Publish(e: Event)
trait Subscriber extends Responder {
- type Handler = PartialFunction[AnyRef,unit]
+ type Handler = PartialFunction[Any,unit]
def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this)
}
@@ -98,11 +98,11 @@ trait Publisher extends Responder {
}
// TODO: super.receive might already be overridden!
- //final override def receive(f: PartialFunction[Message,unit]): scala.All =
+ //final override def receive(f: PartialFunction[Any,unit]): scala.All =
//super.receive(new ProxyPubSubHandler(f))
- private class ProxyPubSubHandler(f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
- def isDefinedAt(m: Message): boolean =
+ private class ProxyPubSubHandler(f: PartialFunction[Any,unit]) extends PartialFunction[Any,unit] {
+ def isDefinedAt(m: Any): boolean =
if (f.isDefinedAt(m)) true
else m match {
case Subscribe(s) => true
@@ -110,7 +110,7 @@ trait Publisher extends Responder {
case other => false
}
- def apply(m: Message): unit = {
+ def apply(m: Any): unit = {
m match {
case Subscribe(s) =>
//Console.println("Rec subscription: " + s)
diff --git a/src/actors/scala/actors/multi/Actor.scala b/src/actors/scala/actors/multi/Actor.scala
index 520267d2a2..2b775e927e 100644
--- a/src/actors/scala/actors/multi/Actor.scala
+++ b/src/actors/scala/actors/multi/Actor.scala
@@ -10,299 +10,16 @@
package scala.actors.multi
-import scala.collection.mutable.{HashMap,HashSet,Stack}
-
-case class ExcHandlerDesc(pid: Pid, eid: Int)
-
-class ExcHandler(actions: PartialFunction[Throwable, Unit],
- actor: Actor,
- parent: ExcHandlerDesc) {
- def handle(e: Throwable): Unit = {
- if (!actions.isDefinedAt(e)) {
- if (parent != null) actor.forwardExc(parent, e)
- }
- else
- actions(e)
- }
-}
-
/**
* @author Philipp Haller
*/
-abstract class Actor extends MailBox {
+trait Actor extends scala.actors.Actor with MailBox {
def run(): Unit = {}
- def start(): Unit = {
- var finished = true
- try { run }
- catch {
- case d: Done =>
- finished = false
- case t: Throwable =>
- if (!excHandlerDescs.isEmpty)
- forwardExc(excHandlerDescs.top, t)
- else
- exit(new Symbol(t.toString()))
- }
- if (finished) die()
- }
-
- case class Exit(from: Pid, reason: Symbol) extends Message
-
- private var pid: Pid = null
- def self: Pid = {
- if (pid == null) pid = new LocalPid(this)
- pid
- }
- def self_= (p: Pid) = pid = p
-
-
- private val links = new HashSet[Pid]
-
- def link(to: Pid): unit = {
- // TODO: check if exists (eff: dont need to.linkTo...)
- links += to
- to.linkTo(self)
- }
- def linkTo(to: Pid): unit = links += to
-
- def unlink(from: Pid): unit = {
- // TODO: check if exists (eff)
- links -= from
- from.unlinkFrom(self)
- }
- def unlinkFrom(from: Pid): unit = links -= from
-
-
- private var trapExit = false
-
- def processFlag(flag: Symbol, set: boolean) = {
- if (flag.name.equals("trapExit")) trapExit = set
- }
-
- def exit(reason: Symbol): unit = {
- exitLinked(reason, new HashSet[Pid])
- if (isAlive) {
- isAlive = false
- //Debug.info("" + this + " died.")
- }
- }
-
- def exit(from: Pid, reason: Symbol): unit = {
- if (from == self) {
- exit(reason)
- }
- else {
- if (trapExit)
- this send Exit(from, reason)
- else if (!reason.name.equals("normal"))
- exit(reason)
- }
+ def start(): Unit = try { run() }
+ catch {
+ case d: Done => // do nothing
}
- def exitLinked(reason: Symbol, exitMarks: HashSet[Pid]): unit = {
- if (exitMarks contains self) {
- // we are marked, do nothing
- }
- else {
- exitMarks += self // mark self as exiting
- //Console.println("" + self + " is exiting (" + reason + ").")
-
- // exit linked scala.actors
- val iter = links.elements
- while (iter.hasNext) {
- val linkedPid = iter.next
- unlink(linkedPid)
- linkedPid.exit(self, reason)
- }
- exitMarks -= self
- }
- }
-
- def spawnLink(body: Actor => unit): Pid = {
- val a = new Actor {
- override def run = body(this)
- }
- if (!excHandlerDescs.isEmpty)
- a.pushExcHandlerDesc(excHandlerDescs.top)
- // link new process to self
- link(a.self)
- a.start
- a.self
- }
-
- val excHandlerDescs = new Stack[ExcHandlerDesc]
-
- // exception handler table
- val excHandlers = new HashMap[int, ExcHandler]
-
- def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
- // locality check (handler local to this actor?)
- if (destDesc.pid == self) {
- handleExc(destDesc, e)
- }
- else {
- // forward to pid of destination descriptor
- destDesc.pid.handleExc(destDesc, e)
- }
- }
-
- def pushExcHandlerDesc(desc: ExcHandlerDesc) = {
- excHandlerDescs += desc
- }
-
- /** is only called for local handlers
- (i.e. destDesc.pid == self) */
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
- (excHandlers get destDesc.eid) match {
- case Some(handler) =>
- handler.handle(e)
- case None =>
- error("exc desc refers to non-registered handler")
- };
-
- var excCnt = 0
- def freshExcId = { excCnt = excCnt + 1; excCnt }
-
- def tryAsync(block: => unit,
- handlerFun: PartialFunction[Throwable, unit]) = {
- val excHandler =
- new ExcHandler(handlerFun,
- this,
- if (excHandlerDescs.isEmpty) null
- else excHandlerDescs.top)
-
- val desc = ExcHandlerDesc(self, freshExcId)
- // associate desc with handler
- excHandlers += desc.eid -> excHandler
- // push desc onto stack
- excHandlerDescs += desc
- //Console.println("desc stack height: " + excHandlerDescs.length)
- // execute code block
- block
- }
-
- def die(reason: Symbol) = {
- if (isAlive) {
- isAlive = false
- //Debug.info("" + this + " died.")
- exit(reason)
- }
- }
-
- override def die() = {
- if (isAlive) {
- isAlive = false
- //Debug.info("" + this + " died.")
- exit('normal)
- }
- }
-
- def process(f: PartialFunction[Message,unit], msg: Message): unit = {
- try {
- f(msg)
- }
- catch {
- case d: Done =>
- throw new Done
- case t: Throwable =>
- throw t
- if (!excHandlerDescs.isEmpty)
- forwardExc(excHandlerDescs.top, t)
- else
- die(Symbol(t.toString()))
- }
- }
-
- override def receive(f: PartialFunction[Message,unit]): scala.All = {
- if (isAlive) {
- Scheduler.tick(this)
- continuation = null
- sent.dequeueFirst(f.isDefinedAt) match {
- case Some(msg) =>
- process(f, msg)
- die()
- case None =>
- continuation = f
- //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
- }
- }
- throw new Done
- }
-
- override def receiveMsg(msg: MailBox#Message) = {
- //Debug.info("" + Thread.currentThread() + ": Resuming " + this)
- if (continuation != null) {
- val f = continuation
- continuation = null
- scheduled = false
- process(f, msg)
- die()
- }
- else {
- // use more complex receive-and-return continuation
- val cases = contCases
- val then = contThen
- contCases = null
- contThen = null
- scheduled = false
- val result = cases(msg)
- then(result)
- die()
- }
- }
-
- def spawn(a: Actor): Pid = {
- // let "a" inherit active exception handler
- if (!excHandlerDescs.isEmpty)
- a.pushExcHandlerDesc(excHandlerDescs.top)
- a.start
- a.self
- }
-
- def spawn(body: Actor => unit): Pid = {
- val a = new Actor {
- override def run = body(this)
- }
- if (!excHandlerDescs.isEmpty)
- a.pushExcHandlerDesc(excHandlerDescs.top)
- a.start
- a.self
- }
-
- def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = {
- val a = new Actor {
- override def run = receive(cases)
- }
- if (!excHandlerDescs.isEmpty)
- a.pushExcHandlerDesc(excHandlerDescs.top)
- a.start
- a.self
- }
-
- def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
- receive {
- case Pair(pid1, msg1) => receive {
- case Pair(pid2, msg2) => cont match {
- case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
- }
- }
- case Pair(pid2, msg2) => receive {
- case Pair(pid1, msg1) => cont match {
- case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
- }
- }
- }
- }
-
- def makeRef = Actor.makeRef
-}
-
-object Actor {
- private var counter = 0
- type Tag = int
- def makeRef: Tag = {
- counter = counter + 1
- counter
- }
+ def !(msg: Any): Unit = send(msg)
}
diff --git a/src/actors/scala/actors/multi/LocalPid.scala b/src/actors/scala/actors/multi/LocalPid.scala
deleted file mode 100644
index 6762f0915d..0000000000
--- a/src/actors/scala/actors/multi/LocalPid.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors.multi
-
-import scala.collection.mutable.Queue
-
-/**
- * @author Philipp Haller
- */
-class LocalPid(actor: Actor) extends Pid {
- var target = actor
-
- def !(msg: MailBox#Message): Unit = target send msg
-
- def link(other: Pid): Unit = target link other
- def linkTo(other: Pid): Unit = target linkTo other
- def unlink(other: Pid): Unit = target unlink other
- def unlinkFrom(other: Pid): Unit = target unlinkFrom other
- def exit(reason: Symbol): Unit = target exit reason
- def exit(from: Pid, reason: Symbol): Unit = target.exit(from, reason)
-
- def spawn(body: Actor => Unit): Pid = {
- val a = new Actor {
- override def run: Unit = body(this)
- }
- a.start
- a.self
- }
-
- def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
- val a = new Actor {
- override def run: Unit = receive(cases)
- }
- a.start
- a.self
- }
-
- override def toString() = "LocalPid(" + target + ")"
-
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit =
- actor.handleExc(destDesc, e);
-
- def become(clos: Actor => Unit) = {
- // old actor should become anonymous (cannot receive any messages any more)
- // achieved by removing it from target of pid.
-
- val oldActor = target
- //Debug.info("old actor: " + oldActor);
-
- // change our target to point to a newly created actor with the same mailbox.
-
- val newActor = new Actor {
- override def run: Unit = clos(this)
- }
- newActor.sent = oldActor.sent
- target = newActor
- newActor.self = this
-
- //Debug.info("new actor: " + newActor);
-
- // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
- oldActor.sent = new Queue[MailBox#Message]
-
- //Debug.info("Starting new actor.");
- newActor.start // important to start after changing pid because actor may send messages to itself.
- }
-
- private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
- def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m)
- def apply(m: MailBox#Message): unit = {
- f(m)
- a receive this
- }
- }
-
- def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]) = {
-
- become(a => a receive new ProxyPartialFunction(a, f))
-
- /*become(
- a:Actor => {
- def loop: Unit = {
- def proxyFun(m: MailBox#Message): unit = {
- if (f.isDefinedAt(m)) {
- f(m);
- loop
- }
- };
- //a receive proxyFun
- }
- loop
- }
- )*/
- }
-
-}
diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala
index 7809deedc3..5ec21e613b 100644
--- a/src/actors/scala/actors/multi/MailBox.scala
+++ b/src/actors/scala/actors/multi/MailBox.scala
@@ -15,23 +15,20 @@ import scala.collection.mutable.Queue
/**
* @author Philipp Haller
*/
-class MailBox {
- type Message = AnyRef
- case class TIMEOUT() extends Message
-
+trait MailBox {
/** Unconsumed messages. */
- var sent = new Queue[Message]
+ var sent = new Queue[Any]
- var continuation: PartialFunction[Message,Unit] = null
+ var continuation: PartialFunction[Any,Unit] = null
// more complex continuation
- var contCases: PartialFunction[Message,Message] = null
- var contThen: Message => unit = null
+ var contCases: PartialFunction[Any,Any] = null
+ var contThen: Any => unit = null
def hasCont =
if ((continuation == null) && (contCases == null)) false
else true
- def contDefinedAt(msg: Message) =
+ def contDefinedAt(msg: Any) =
if (((continuation != null) && continuation.isDefinedAt(msg)) ||
((contCases != null) && contCases.isDefinedAt(msg)))
true
@@ -43,7 +40,7 @@ class MailBox {
private var pendingSignal = false
- def send(msg: Message): unit = synchronized {
+ def send(msg: Any): unit = synchronized {
if (isAlive) {
if (!hasCont || scheduled) {
//Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
@@ -81,7 +78,7 @@ class MailBox {
}
}
- def receiveMsg(msg: MailBox#Message) = {
+ def receiveMsg(msg: Any) = {
//Debug.info("" + Thread.currentThread() + ": Resuming " + this)
if (continuation != null) {
val f = continuation
@@ -103,7 +100,7 @@ class MailBox {
}
}
- def receive(f: PartialFunction[Message,unit]): scala.All = {
+ def receive(f: PartialFunction[Any,unit]): Nothing = {
if (isAlive) {
Scheduler.tick(this)
continuation = null
@@ -119,7 +116,7 @@ class MailBox {
throw new Done
}
- def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
+ def receiveWithin(msec: long)(f: PartialFunction[Any, unit]): Nothing = {
Scheduler.tick(this)
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
@@ -145,11 +142,7 @@ class MailBox {
throw new Done
}
- // original wish:
- // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
- // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
-
- def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): unit = {
+ def receiveAndReturn(cases: PartialFunction[Any,Any], then: Any => unit): unit = {
contCases = null
contThen = null
sent.dequeueFirst(cases.isDefinedAt) match {
@@ -169,11 +162,11 @@ class MailBox {
// receiv {...} then (msg => {...msg...})
- class ReceiveAndReturn(cases: PartialFunction[Message,Message]) {
- def then(body: Message => unit): unit = receiveAndReturn(cases, body)
+ class ReceiveAndReturn(cases: PartialFunction[Any,Any]) {
+ def then(body: Any => unit): unit = receiveAndReturn(cases, body)
}
- def receiv(cases: PartialFunction[Message,Message]): ReceiveAndReturn =
+ def receiv(cases: PartialFunction[Any,Any]): ReceiveAndReturn =
new ReceiveAndReturn(cases)
def die() = {
diff --git a/src/actors/scala/actors/multi/Pid.scala b/src/actors/scala/actors/multi/Pid.scala
deleted file mode 100644
index e8cf5ede23..0000000000
--- a/src/actors/scala/actors/multi/Pid.scala
+++ /dev/null
@@ -1,30 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors.multi
-
-/**
- * @author Philipp Haller
- */
-[serializable]abstract class Pid {
- def !(msg: MailBox#Message): Unit
-
- def link(other: Pid): Unit
- def linkTo(other: Pid): Unit // uni-directional
- def unlink(other: Pid): Unit
- def unlinkFrom(other: Pid): Unit // uni-directional
- def exit(reason: Symbol): Unit
- def exit(from: Pid, reason: Symbol): Unit
-
- def handleExc(destDesc: ExcHandlerDesc, e: Throwable): Unit
-
- //def become(clos: Actor => Unit): Unit
- //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): Unit
-}
diff --git a/src/actors/scala/actors/multi/Process.scala b/src/actors/scala/actors/multi/Process.scala
new file mode 100644
index 0000000000..3764a6454b
--- /dev/null
+++ b/src/actors/scala/actors/multi/Process.scala
@@ -0,0 +1,281 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id: LocalPid.scala 7969 2006-06-23 11:44:31Z michelou $
+
+package scala.actors.multi
+
+import scala.collection.mutable.{HashMap,HashSet,Stack}
+
+/**
+ * @author Philipp Haller
+ */
+object Process {
+
+ def spawn(body: => Unit): Process = {
+ val p = new Process {
+ override def run(): Unit = body
+ }
+ val task = new ReceiverTask(p, null) {
+ override def run(): Unit = try { p.start() }
+ catch {
+ case d: Done =>
+ // do nothing (continuation is already saved)
+ }
+ }
+ Scheduler.execute(task)
+ p
+ }
+
+ def spawnLink(body: => Unit): Process = {
+ val p = new Process {
+ override def run(): Unit = body
+ }
+ val task = new ReceiverTask(p, null) {
+ override def run(): Unit = try { p.start() }
+ catch {
+ case d: Done =>
+ // do nothing (continuation is already saved)
+ }
+ }
+ self.link(p)
+ Scheduler.execute(task)
+ p
+ }
+
+ def send(p: Process, msg: Any) =
+ p ! msg
+
+ def receive(f: PartialFunction[Any,Unit]): Nothing =
+ self.receive(f)
+
+ def receiveWithin(msec: long)(f: PartialFunction[Any,Unit]): Nothing =
+ self.receiveWithin(msec)(f)
+
+ def self: Process = {
+ val p = Scheduler.getProcess(Thread.currentThread())
+ if (p == null)
+ error("Self called outside a process")
+ else
+ p
+ }
+
+ def exit(p: Process, reason: Symbol) =
+ p.exit(reason)
+}
+
+/**
+ * @author Philipp Haller
+ */
+class Process extends scala.actors.Process with Actor {
+ private val links = new HashSet[scala.actors.Process]
+
+ override def start(): Unit = try { run() }
+ catch {
+ case d: Done => // do nothing
+ case t: Throwable => {
+ if (!excHandlerDescs.isEmpty)
+ forwardExc(excHandlerDescs.top, t)
+ else
+ exit(new Symbol(t.toString()))
+ }
+ }
+
+ case class Exit(from: scala.actors.Process, reason: Symbol)
+
+ def link(to: scala.actors.Process): Unit = {
+ links += to
+ to.linkTo(this)
+ }
+
+ def linkTo(to: scala.actors.Process): Unit =
+ links += to
+
+ def unlink(from: scala.actors.Process): Unit = {
+ links -= from
+ from.unlinkFrom(this)
+ }
+
+ def unlinkFrom(from: scala.actors.Process): Unit =
+ links -= from
+
+ private var trapExit = false
+
+ def processFlag(flag: Symbol, set: boolean) = {
+ if (flag.name.equals("trapExit")) trapExit = set
+ }
+
+ def exit(reason: Symbol): Unit = {
+ exitLinked(reason, new HashSet[Process])
+ if (isAlive) isAlive = false
+ }
+
+ def exit(from: scala.actors.Process, reason: Symbol): Unit = {
+ if (from == this) {
+ exit(reason)
+ }
+ else {
+ if (trapExit)
+ this ! Exit(from, reason)
+ else if (!reason.name.equals("normal"))
+ exit(reason)
+ }
+ }
+
+ def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = {
+ if (exitMarks contains this) {
+ // we are marked, do nothing
+ }
+ else {
+ exitMarks += this // mark this as exiting
+
+ // exit linked processes
+ val iter = links.elements
+ while (iter.hasNext) {
+ val linked = iter.next
+ unlink(linked)
+ linked.exit(this, reason)
+ }
+ exitMarks -= this
+ }
+ }
+
+ val excHandlerDescs = new Stack[ExcHandlerDesc]
+
+ // exception handler table
+ val excHandlers = new HashMap[Int, ExcHandler]
+
+ def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ // locality check (handler local to this actor?)
+ if (destDesc.p == this) {
+ handleExc(destDesc, e)
+ }
+ else {
+ // forward to process of destination descriptor
+ destDesc.p.handleExc(destDesc, e)
+ }
+ }
+
+ def pushExcHandlerDesc(desc: ExcHandlerDesc) =
+ excHandlerDescs += desc
+
+ /** is only called for local handlers
+ (i.e. destDesc.p == this) */
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
+ (excHandlers get destDesc.eid) match {
+ case Some(handler) =>
+ handler.handle(e)
+ case None =>
+ error("exc desc refers to non-registered handler")
+ }
+
+ private var excCnt = 0
+ private def freshExcId = { excCnt = excCnt + 1; excCnt }
+
+ def tryAsync(block: => Unit,
+ handlerFun: PartialFunction[Throwable, Unit]) = {
+ val excHandler =
+ new ExcHandler(handlerFun,
+ this,
+ if (excHandlerDescs.isEmpty) null
+ else excHandlerDescs.top)
+
+ val desc = ExcHandlerDesc(this, freshExcId)
+ // associate desc with handler
+ excHandlers += desc.eid -> excHandler
+ // push desc onto stack
+ excHandlerDescs += desc
+ //Console.println("desc stack height: " + excHandlerDescs.length)
+ // execute code block
+ block
+ }
+
+ def process(f: PartialFunction[Any,unit], msg: Any): unit = {
+ try {
+ f(msg)
+ }
+ catch {
+ case d: Done =>
+ throw new Done
+ case t: Throwable =>
+ throw t
+ if (!excHandlerDescs.isEmpty)
+ forwardExc(excHandlerDescs.top, t)
+ else
+ die(Symbol(t.toString()))
+ }
+ }
+
+ override def receive(f: PartialFunction[Any,unit]): scala.All = {
+ if (isAlive) {
+ Scheduler.tick(this)
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ process(f, msg)
+ die()
+ case None =>
+ continuation = f
+ //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ override def receiveMsg(msg: Any) = {
+ //Debug.info("" + Thread.currentThread() + ": Resuming " + this)
+ if (continuation != null) {
+ val f = continuation
+ continuation = null
+ scheduled = false
+ process(f, msg)
+ die()
+ }
+ else {
+ // use more complex receive-and-return continuation
+ val cases = contCases
+ val then = contThen
+ contCases = null
+ contThen = null
+ scheduled = false
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ }
+
+ def die(reason: Symbol) = {
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ exit(reason)
+ }
+ }
+
+ override def die() = {
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ exit('normal)
+ }
+ }
+}
+
+case class ExcHandlerDesc(p: Process, eid: Int)
+
+class ExcHandler(actions: PartialFunction[Throwable, Unit],
+ p: Process,
+ parent: ExcHandlerDesc) {
+ def handle(e: Throwable): Unit = {
+ if (!actions.isDefinedAt(e)) {
+ if (parent != null) p.forwardExc(parent, e)
+ }
+ else
+ actions(e)
+ }
+}
diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala
index 8b6893b43e..033df00ef0 100644
--- a/src/actors/scala/actors/multi/ReceiverTask.scala
+++ b/src/actors/scala/actors/multi/ReceiverTask.scala
@@ -13,7 +13,7 @@ package scala.actors.multi
/**
* @author Philipp Haller
*/
-class ReceiverTask(val actor: MailBox, msg: MailBox#Message) extends Runnable {
+class ReceiverTask(val actor: MailBox, msg: Any) extends Runnable {
def run(): Unit = {
try {
actor receiveMsg msg
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/multi/Scheduler.scala
index 2133995720..9deb9c8148 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/multi/Scheduler.scala
@@ -8,12 +8,10 @@
// $Id$
-package scala.actors
+package scala.actors.multi
import scala.collection.mutable._
-import scala.actors.multi._
-
/**
* @author Philipp Haller
*/
@@ -21,6 +19,7 @@ abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
def execute(item: ReceiverTask): Unit
def getTask(worker: WorkerThread): Runnable
def tick(a: MailBox): Unit
+ def getProcess(t: Thread): Process
val QUIT_TASK = new Runnable() {
def run(): Unit = {}
@@ -47,6 +46,9 @@ object Scheduler /*extends java.util.concurrent.Executor*/ {
def tick(a: MailBox) =
sched.tick(a)
+
+ def getProcess(t: Thread): Process =
+ sched.getProcess(t)
}
@@ -57,6 +59,7 @@ class SpareWorkerScheduler2 extends IScheduler {
val idle = new Queue[WorkerThread];
val ticks = new HashMap[WorkerThread, long]
val executing = new HashMap[MailBox, WorkerThread]
+ val rexec = new HashMap[Thread, MailBox]
var TICKFREQ = 50
@@ -81,10 +84,18 @@ class SpareWorkerScheduler2 extends IScheduler {
}
}
+ def getProcess(t: Thread): Process = synchronized {
+ rexec.get(t) match {
+ case None => null
+ case Some(p: Process) => p
+ }
+ }
+
def execute(item: ReceiverTask): unit = synchronized {
if (idle.length > 0) {
val wt = idle.dequeue
executing.update(item.actor, wt)
+ rexec.update(wt, item.actor)
wt.execute(item)
}
else {
@@ -116,6 +127,7 @@ class SpareWorkerScheduler2 extends IScheduler {
maxWorkers = workers.length // statistics
executing.update(item.actor, newWorker)
+ rexec.update(newWorker, item.actor)
newWorker.execute(item)
newWorker.start()
@@ -132,6 +144,7 @@ class SpareWorkerScheduler2 extends IScheduler {
if (tasks.length > 0) {
val item = tasks.dequeue
executing.update(item.actor, worker)
+ rexec.update(worker, item.actor)
item
}
else {
@@ -144,9 +157,9 @@ class SpareWorkerScheduler2 extends IScheduler {
/**
* @author Philipp Haller
*/
-class SpareWorkerScheduler extends IScheduler {
+abstract class SpareWorkerScheduler extends IScheduler {
private var canQuit = false;
- private val tasks = new Queue[Runnable];
+ private val tasks = new Queue[ReceiverTask];
private val idle = new Queue[WorkerThread];
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
@@ -201,9 +214,9 @@ class SpareWorkerScheduler extends IScheduler {
/**
* @author Philipp Haller
*/
-class FixedWorkersScheduler(workercnt: int) extends IScheduler {
+abstract class FixedWorkersScheduler(workercnt: int) extends IScheduler {
private var canQuit = false;
- private val tasks = new Queue[Runnable];
+ private val tasks = new Queue[ReceiverTask];
private val idle = new Queue[WorkerThread];
//Console.println("Running with " + workercnt + " workers")
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/multi/TimerThread.scala
index 075d14fe57..2b91ba13ca 100644
--- a/src/actors/scala/actors/TimerThread.scala
+++ b/src/actors/scala/actors/multi/TimerThread.scala
@@ -8,13 +8,10 @@
// $Id$
-package scala.actors
+package scala.actors.multi
import scala.collection.mutable.PriorityQueue
-import scala.actors.multi.Actor
-import scala.actors.multi.MailBox
-
/**
* This class allows the (locl) sending of a message to an actor after
* a timeout. Used by the library to build receiveWithin(time:long).
@@ -74,7 +71,7 @@ object TimerThread extends AnyRef with Runnable {
lateList = Nil
}
- def requestSignal(a: Actor, waitMillis: long, reason: String): unit = this.synchronized {
+ def requestSignal(a: multi.Actor, waitMillis: long, reason: String): unit = this.synchronized {
Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason)
val wakeTime = now + waitMillis
if (waitMillis < 0) {
@@ -146,7 +143,7 @@ object TimerThreadTest {
new Tester (500, "TWO").start
}
- class Tester (duration : int, name:String) extends Actor {
+ class Tester (duration : int, name:String) extends multi.Actor {
var i = 0
def loop:unit = {
diff --git a/src/actors/scala/actors/WorkerThread.scala b/src/actors/scala/actors/multi/WorkerThread.scala
index e46cdc53c9..8c5a4ecb1e 100644
--- a/src/actors/scala/actors/WorkerThread.scala
+++ b/src/actors/scala/actors/multi/WorkerThread.scala
@@ -8,7 +8,7 @@
// $Id$
-package scala.actors
+package scala.actors.multi
/**
* @author Philipp Haller
diff --git a/src/actors/scala/actors/single/Actor.scala b/src/actors/scala/actors/single/Actor.scala
index d323c51d57..fa799db985 100644
--- a/src/actors/scala/actors/single/Actor.scala
+++ b/src/actors/scala/actors/single/Actor.scala
@@ -13,70 +13,13 @@ package scala.actors.single
/**
* @author Philipp Haller
*/
-abstract class Actor extends MailBox {
- def run: Unit = {}
+trait Actor extends scala.actors.Actor with MailBox {
+ def run(): Unit = {}
- def start: Unit = {
- try { run }
- catch {
- case d:Done =>
- // do nothing
- }
+ def start(): Unit = try { run() }
+ catch {
+ case d: Done => // do nothing
}
- private var pid: Pid = null
-
- def self: Pid = {
- if (pid == null) pid = new LocalPid(this)
- pid
- }
-
- def self_= (p: Pid) = pid = p
-
- def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
- receive {
- case Pair(pid1, msg1) => receive {
- case Pair(pid2, msg2) => cont match {
- case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
- }
- }
- case Pair(pid2, msg2) => receive {
- case Pair(pid1, msg1) => cont match {
- case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
- }
- }
- }
- }
-
- def spawn(body: Actor => Unit): Pid = {
- val a = new Actor {
- override def run = body(this)
- }
- a.start
- a.self
- }
-
- def spawn(a: Actor): Pid = {
- a.start
- a.self
- }
-
- def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
- val a = new Actor {
- override def run = receive(cases)
- }
- a.start
- a.self
- }
-
- def makeRef = Actor.makeRef
-}
-
-object Actor {
- private var counter = 0
- type Tag = int
- def makeRef: Tag = {
- counter = counter + 1
- counter
- }
+ def !(msg: Any): Unit = send(msg)
}
diff --git a/src/actors/scala/actors/single/LocalPid.scala b/src/actors/scala/actors/single/LocalPid.scala
deleted file mode 100644
index f04de6ab9c..0000000000
--- a/src/actors/scala/actors/single/LocalPid.scala
+++ /dev/null
@@ -1,93 +0,0 @@
-/* __ *\
-** ________ ___ / / ___ Scala API **
-** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
-** __\ \/ /__/ __ |/ /__/ __ | **
-** /____/\___/_/ |_/____/_/ | | **
-** |/ **
-\* */
-
-// $Id$
-
-package scala.actors.single
-
-import scala.collection.mutable.Queue
-
-/**
- * @author Philipp Haller
- */
-class LocalPid(actor: Actor) extends Pid {
- var target = actor
-
- def !(msg: MailBox#Message): unit = target send msg
-
- def become(clos: Actor => Unit) = {
- // old actor should become anonymous (cannot receive any messages any more)
- // achieved by removing it from target of pid.
-
- val oldActor = target
- //Debug.info("old actor: " + oldActor);
-
- // change our target to point to a newly created actor with the same mailbox.
-
- val newActor = new Actor {
- override def run: Unit = clos(this)
- }
- newActor.sent = oldActor.sent
- target = newActor
- newActor.self = this
-
- //Debug.info("new actor: " + newActor);
-
- // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
- oldActor.sent = new Queue[MailBox#Message]
-
- //Debug.info("Starting new actor.");
- newActor.start // important to start after changing pid because actor may send messages to itself.
- }
-
- private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
- def isDefinedAt(m: MailBox#Message): Boolean = f.isDefinedAt(m)
- def apply(m: MailBox#Message): unit = {
- f(m)
- a receive this
- }
- }
-
- def becomeReceiveLoop(f: PartialFunction[MailBox#Message,Unit]) = {
-
- become(a => a receive new ProxyPartialFunction(a, f))
-
- /*become(
- a:Actor => {
- def loop: Unit = {
- def proxyFun(m: MailBox#Message): Unit = {
- if (f.isDefinedAt(m)) {
- f(m);
- loop
- }
- };
- //a receive proxyFun
- }
- loop
- }
- )*/
- }
-
- def spawn(body: Actor => Unit): Pid = {
- val a = new Actor {
- override def run: Unit = body(this)
- }
- a.start
- a.self
- }
-
- def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
- val a = new Actor {
- override def run: Unit = receive(cases)
- }
- a.start
- a.self
- }
-
- override def toString() = "LocalPid(" + target + ")"
-}
diff --git a/src/actors/scala/actors/single/MailBox.scala b/src/actors/scala/actors/single/MailBox.scala
index f42b287323..3e8116a794 100644
--- a/src/actors/scala/actors/single/MailBox.scala
+++ b/src/actors/scala/actors/single/MailBox.scala
@@ -15,24 +15,20 @@ import scala.collection.mutable.Queue
/**
* @author Philipp Haller
*/
-class MailBox {
-
- type Message = AnyRef
- case class TIMEOUT() extends Message
-
+trait MailBox {
/** Unconsumed messages. */
- var sent = new Queue[Message]
+ var sent = new Queue[Any]
- var continuation: PartialFunction[Message,Unit] = null
+ var continuation: PartialFunction[Any,Unit] = null
// more complex continuation
- var contCases: PartialFunction[Message,Message] = null
- var contThen: Message => unit = null
+ var contCases: PartialFunction[Any,Any] = null
+ var contThen: Any => unit = null
def hasCont =
if ((continuation == null) && (contCases == null)) false
else true
- def contDefinedAt(msg: Message) =
+ def contDefinedAt(msg: Any) =
if (((continuation != null) && continuation.isDefinedAt(msg)) ||
((contCases != null) && contCases.isDefinedAt(msg)))
true
@@ -45,7 +41,7 @@ class MailBox {
private var timeInitial: Long = 0
private var timeoutEnabled: Boolean = false
- def send(msg: Message): unit = synchronized {
+ def send(msg: Any): Unit = synchronized {
if (isAlive)
if (!hasCont) {
Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
@@ -98,7 +94,7 @@ class MailBox {
}
}
- def receive(f: PartialFunction[Message,unit]): scala.All = {
+ def receive(f: PartialFunction[Any, Unit]): Nothing = {
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
@@ -111,7 +107,7 @@ class MailBox {
throw new Done
}
- def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
+ def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
timeInitial = System.currentTimeMillis()
duration = msec
@@ -136,11 +132,7 @@ class MailBox {
throw new Done
}
- // original wish:
- // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
- // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
-
- def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): scala.All = {
+ def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Nothing = {
contCases = null
contThen = null
sent.dequeueFirst(cases.isDefinedAt) match {
diff --git a/src/actors/scala/actors/single/Process.scala b/src/actors/scala/actors/single/Process.scala
new file mode 100644
index 0000000000..3f8e0255bc
--- /dev/null
+++ b/src/actors/scala/actors/single/Process.scala
@@ -0,0 +1,77 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2006, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id: LocalPid.scala 7969 2006-06-23 11:44:31Z michelou $
+
+package scala.actors.single
+
+import scala.collection.mutable.HashSet
+
+/**
+ * @author Philipp Haller
+ */
+class Process extends scala.actors.Process with Actor {
+ case class Exit(from: scala.actors.Process, reason: Symbol)
+
+ private val links = new HashSet[scala.actors.Process]
+
+ def link(to: scala.actors.Process): Unit = {
+ links += to
+ to.linkTo(this)
+ }
+
+ def linkTo(to: scala.actors.Process): Unit = links += to
+
+ def unlink(from: scala.actors.Process): Unit = {
+ links -= from
+ from.unlinkFrom(this)
+ }
+
+ def unlinkFrom(from: scala.actors.Process): Unit = links -= from
+
+ private var trapExit = false
+
+ def processFlag(flag: Symbol, set: boolean) = {
+ if (flag.name.equals("trapExit")) trapExit = set
+ }
+
+ def exit(reason: Symbol): Unit = {
+ exitLinked(reason, new HashSet[Process])
+ if (isAlive) isAlive = false
+ }
+
+ def exit(from: scala.actors.Process, reason: Symbol): Unit = {
+ if (from == this) {
+ exit(reason)
+ }
+ else {
+ if (trapExit)
+ this ! Exit(from, reason)
+ else if (!reason.name.equals("normal"))
+ exit(reason)
+ }
+ }
+
+ def exitLinked(reason: Symbol, exitMarks: HashSet[Process]): Unit = {
+ if (exitMarks contains this) {
+ // we are marked, do nothing
+ }
+ else {
+ exitMarks += this // mark this as exiting
+
+ // exit linked processes
+ val iter = links.elements
+ while (iter.hasNext) {
+ val linked = iter.next
+ unlink(linked)
+ linked.exit(this, reason)
+ }
+ exitMarks -= this
+ }
+ }
+}