summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-06-21 12:35:21 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-06-21 12:35:21 +0000
commit0d8b14c6055e76c0bff3b65d0f428d711abe1f5a (patch)
tree70d30af5d460a943d17d2953041412b79814a2af
parent3fe40a93ffda8571721ff574469171c633191fc4 (diff)
downloadscala-0d8b14c6055e76c0bff3b65d0f428d711abe1f5a.tar.gz
scala-0d8b14c6055e76c0bff3b65d0f428d711abe1f5a.tar.bz2
scala-0d8b14c6055e76c0bff3b65d0f428d711abe1f5a.zip
Added actors library.
-rw-r--r--build.xml40
-rw-r--r--docs/examples/actors/counter.scala60
-rw-r--r--docs/examples/actors/customer.scala66
-rw-r--r--src/actors/scala/actors/Debug.scala38
-rw-r--r--src/actors/scala/actors/Done.scala19
-rw-r--r--src/actors/scala/actors/SPanel.scala74
-rw-r--r--src/actors/scala/actors/Scheduler.scala236
-rw-r--r--src/actors/scala/actors/TimerThread.scala156
-rw-r--r--src/actors/scala/actors/WorkerThread.scala34
-rw-r--r--src/actors/scala/actors/distributed/JXTAServiceBase.scala9
-rw-r--r--src/actors/scala/actors/distributed/JavaSerializer.scala34
-rw-r--r--src/actors/scala/actors/distributed/Messages.scala40
-rw-r--r--src/actors/scala/actors/distributed/MessagesComb.scala36
-rw-r--r--src/actors/scala/actors/distributed/Name.scala7
-rw-r--r--src/actors/scala/actors/distributed/NetKernel.scala591
-rw-r--r--src/actors/scala/actors/distributed/Node.scala6
-rw-r--r--src/actors/scala/actors/distributed/NodeComb.scala12
-rw-r--r--src/actors/scala/actors/distributed/RemoteActor.scala168
-rw-r--r--src/actors/scala/actors/distributed/RemotePid.scala184
-rw-r--r--src/actors/scala/actors/distributed/Serializer.scala53
-rw-r--r--src/actors/scala/actors/distributed/Service.scala72
-rw-r--r--src/actors/scala/actors/distributed/TcpSerializerComb.scala126
-rw-r--r--src/actors/scala/actors/distributed/TcpService.scala215
-rw-r--r--src/actors/scala/actors/distributed/TcpServiceWorker.scala86
-rw-r--r--src/actors/scala/actors/distributed/Util.scala58
-rw-r--r--src/actors/scala/actors/distributed/picklers/BytePickle.scala436
-rw-r--r--src/actors/scala/actors/distributed/picklers/SStreamPickle.scala519
-rw-r--r--src/actors/scala/actors/distributed/picklers/Streams.scala87
-rw-r--r--src/actors/scala/actors/distributed/picklers/UTF8Codec.scala77
-rw-r--r--src/actors/scala/actors/gui/Button.scala20
-rw-r--r--src/actors/scala/actors/gui/Caret.scala8
-rw-r--r--src/actors/scala/actors/gui/Component.scala9
-rw-r--r--src/actors/scala/actors/gui/Container.scala18
-rw-r--r--src/actors/scala/actors/gui/EmptyBorder.scala8
-rw-r--r--src/actors/scala/actors/gui/FormattedTextField.scala9
-rw-r--r--src/actors/scala/actors/gui/Frame.scala33
-rw-r--r--src/actors/scala/actors/gui/GUIApplication.scala20
-rw-r--r--src/actors/scala/actors/gui/Label.scala14
-rw-r--r--src/actors/scala/actors/gui/MainFrame.scala14
-rw-r--r--src/actors/scala/actors/gui/Orientation.scala11
-rw-r--r--src/actors/scala/actors/gui/Panel.scala15
-rw-r--r--src/actors/scala/actors/gui/Publisher.scala126
-rw-r--r--src/actors/scala/actors/gui/SimpleGUIApplication.scala14
-rw-r--r--src/actors/scala/actors/gui/SwingComponent.scala11
-rw-r--r--src/actors/scala/actors/gui/TextComponent.scala22
-rw-r--r--src/actors/scala/actors/gui/TextField.scala22
-rw-r--r--src/actors/scala/actors/gui/event/ButtonPressed.scala3
-rw-r--r--src/actors/scala/actors/gui/event/CaretUpdate.scala3
-rw-r--r--src/actors/scala/actors/gui/event/Event.scala3
-rw-r--r--src/actors/scala/actors/gui/event/MouseDragged.scala3
-rw-r--r--src/actors/scala/actors/gui/event/MouseEvent.scala3
-rw-r--r--src/actors/scala/actors/gui/event/MouseMoved.scala3
-rw-r--r--src/actors/scala/actors/gui/event/TextModified.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowActivated.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowClosed.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowClosing.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowDeactivated.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowDeiconified.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowEvent.scala5
-rw-r--r--src/actors/scala/actors/gui/event/WindowIconified.scala3
-rw-r--r--src/actors/scala/actors/gui/event/WindowOpened.scala3
-rw-r--r--src/actors/scala/actors/gui/layout.scala12
-rw-r--r--src/actors/scala/actors/multi/AbstractPid.scala10
-rw-r--r--src/actors/scala/actors/multi/Actor.scala300
-rw-r--r--src/actors/scala/actors/multi/LocalPid.scala94
-rw-r--r--src/actors/scala/actors/multi/MailBox.scala175
-rw-r--r--src/actors/scala/actors/multi/Pid.scala20
-rw-r--r--src/actors/scala/actors/multi/ReceiverTask.scala16
-rw-r--r--src/actors/scala/actors/single/AbstractPid.scala10
-rw-r--r--src/actors/scala/actors/single/Actor.scala72
-rw-r--r--src/actors/scala/actors/single/LocalPid.scala83
-rw-r--r--src/actors/scala/actors/single/MailBox.scala155
-rw-r--r--src/actors/scala/actors/single/Pid.scala10
73 files changed, 4916 insertions, 1 deletions
diff --git a/build.xml b/build.xml
index 5d443d03ea..23acc968c3 100644
--- a/build.xml
+++ b/build.xml
@@ -53,6 +53,7 @@ PROPERTIES
<property name="dist.dir" value="${basedir}/dists"/>
<property name="lib.jar.name" value="scala-library.jar"/>
<property name="dbc.jar.name" value="scala-dbc.jar"/>
+ <property name="actors.jar.name" value="scala-actors.jar"/>
<property name="comp.jar.name" value="scala-compiler.jar"/>
<property name="scala.exec.name" value="scala"/>
<property name="scalac.exec.name" value="scalac"/>
@@ -289,6 +290,7 @@ BUILD LOCAL REFERENCE (LOCKER) LAYER
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
+ <exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</starr>
<!-- Build compiler -->
@@ -405,6 +407,7 @@ BUILD QUICK-TEST LAYER
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
+ <exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker>
<!-- Build DBC -->
@@ -421,6 +424,20 @@ BUILD QUICK-TEST LAYER
<include name="scala/dbc/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</locker>
+ <!-- Build actors library -->
+ <mkdir dir="${quick.dir}/lib/actors"/>
+ <locker
+ srcdir="${src.dir}/actors"
+ destdir="${quick.dir}/lib/actors"
+ addparams="${nsc.params}"
+ scalacdebugging="${nsc.log-files}">
+ <classpath>
+ <pathelement location="${quick.dir}/lib/library"/>
+ <pathelement location="${quick.dir}/lib/actors"/>
+ </classpath>
+ <include name="scala/actors/**/*.scala"/>
+ <excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
+ </locker>
<!-- Build compiler -->
<mkdir dir="${quick.dir}/lib/compiler"/>
<locker
@@ -540,6 +557,7 @@ TEST
<include name="**/*.scala"/>
<exclude name="scala/Predef.scala"/>
<exclude name="scala/dbc/**"/>
+ <exclude name="scala/actors/**"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick>
<!-- Build DBC -->
@@ -555,6 +573,19 @@ TEST
<include name="scala/dbc/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quick>
+ <!-- Build actors library -->
+ <mkdir dir="${strap.dir}/lib/actors"/>
+ <quick
+ srcdir="${src.dir}/actors"
+ destdir="${strap.dir}/lib/actors"
+ addparams="${nsc.params}">
+ <classpath>
+ <pathelement location="${strap.dir}/lib/library"/>
+ <pathelement location="${strap.dir}/lib/actors"/>
+ </classpath>
+ <include name="scala/actors/**/*.scala"/>
+ <excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
+ </quick>
<!-- Build compiler -->
<mkdir dir="${strap.dir}/lib/compiler"/>
<quick
@@ -683,6 +714,7 @@ DOCUMENTATION
documenttitle="&lt;div&gt;Scala ${version.number}&lt;/div&gt;"
classpath="${quick.dir}/lib/library">
<include name="dbc/**/*.scala"/>
+ <include name="actors/**/*.scala"/>
<include name="library/**/*.scala"/>
<excludesfile name="${nsc.excludes.file}" if="excludes.avail"/>
</quickdoc>
@@ -824,6 +856,12 @@ GENERATES A DISTRIBUTION
<attribute name="Signature-Version" value="${version.number}"/>
</manifest>
</jar>
+ <jar destfile="${dist.current.dir}/lib/${actors.jar.name}">
+ <fileset dir="${strap.dir}/lib/actors"/>
+ <manifest>
+ <attribute name="Signature-Version" value="${version.number}"/>
+ </manifest>
+ </jar>
<!-- Copy executable files -->
<mkdir dir="${dist.current.dir}/bin"/>
<copy todir="${dist.current.dir}/bin">
@@ -886,7 +924,7 @@ GENERATES A DISTRIBUTION
version="${version.number}"
desc="The Scala library. This is the minimal requirement to run any Scala program."
link="${sbaz.universe}/scala-library-${version.number}.sbp">
- <libset dir="${dist.current.dir}/lib" includes="${lib.jar.name},${dbc.jar.name}"/>
+ <libset dir="${dist.current.dir}/lib" includes="${lib.jar.name},${dbc.jar.name},${actors.jar.name}"/>
</quicksbaz>
<!-- Create the Scala developper package -->
<quicksbaz
diff --git a/docs/examples/actors/counter.scala b/docs/examples/actors/counter.scala
new file mode 100644
index 0000000000..8578199bf5
--- /dev/null
+++ b/docs/examples/actors/counter.scala
@@ -0,0 +1,60 @@
+
+import scala.actors.multi.Pid
+import actors.distributed.RemoteActor
+import actors.distributed.TCP
+import actors.distributed.TcpNode
+import actors.distributed.TcpService
+
+abstract class CounterMessage
+case class Incr() extends CounterMessage
+case class Value(p: Pid) extends CounterMessage
+case class Result(v: int) extends CounterMessage
+
+class Counter extends RemoteActor {
+ override def run(): unit =
+ loop(0)
+
+ def loop(value: int): unit = {
+ Console.println("Value: " + value)
+ receive {
+ case Incr() =>
+ loop(value + 1)
+ case Value(p) =>
+ p ! Result(value)
+ loop(value)
+ case other =>
+ loop(value)
+ }
+ }
+}
+
+class CounterUser extends RemoteActor {
+ override def run(): unit = {
+ alive(TCP())
+
+ spawn(TcpNode("127.0.0.1", 9090), "Counter")
+
+ receive {
+ case p: Pid =>
+ // communicate with counter
+ Console.println("" + node + ": Sending Incr() to remote Counter (" + p + ")...")
+ p ! Incr()
+ p ! Incr()
+ p ! Value(self)
+ receive {
+ case Result(v) =>
+ Console.println("Received result: " + v)
+ }
+ }
+ }
+}
+
+object CounterTest {
+ def main(args: Array[String]): unit = {
+ val serv = new TcpService(9090)
+ serv.start()
+
+ val cu = new CounterUser
+ cu.start()
+ }
+}
diff --git a/docs/examples/actors/customer.scala b/docs/examples/actors/customer.scala
new file mode 100644
index 0000000000..64453aee16
--- /dev/null
+++ b/docs/examples/actors/customer.scala
@@ -0,0 +1,66 @@
+/**
+ @author Philipp Haller <philipp.haller@epfl.ch>
+
+ This shows "customer passing" for implementing
+ recursive algorithms using actors.
+ */
+
+import scala.actors._;
+import scala.actors.single._;
+
+abstract class FactorialMessage;
+case class Factorial(n: int, resTo: Actor) extends FactorialMessage;
+case class Value(n: int) extends FactorialMessage;
+
+class FactorialProcess extends Actor {
+ override def run: unit = {
+ receive {
+ case Factorial(n, resTo) =>
+ if (n == 0) {
+ Debug.info("Sending Value(1) to " + resTo)
+ resTo send Value(1)
+ }
+ else {
+ // spawn process that multiplies
+ /*val m = spawnReceive({
+ case Value(value) => resTo send Value(n * value)
+ });*/
+
+ val m = new MultiplyActor(n, resTo)
+ m.start
+ Debug.info("Sending Factorial(" + (n-1) + ", " + m + ") to " + this)
+ this send Factorial(n-1, m)
+ }
+ run
+ }
+ }
+}
+
+class MultiplyActor(factor: int, resTo: Actor) extends Actor {
+ override def run: unit =
+ receive {
+ case Value(value) =>
+ Debug.info("Sending Value(" + factor * value + ") to " + resTo)
+ resTo send Value(factor * value)
+ Debug.info("Done sending.")
+ }
+}
+
+object CustomerPassing {
+ def main(args: Array[String]): unit = {
+ val fac = new FactorialProcess
+ fac.start
+
+ val c = new Actor {
+ override def run: unit = {
+ fac send Factorial(3, this)
+
+ receive {
+ case Value(value) =>
+ System.out.println("Result: " + value)
+ }
+ }
+ }
+ c.start
+ }
+}
diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala
new file mode 100644
index 0000000000..1b1e3f0705
--- /dev/null
+++ b/src/actors/scala/actors/Debug.scala
@@ -0,0 +1,38 @@
+package scala.actors;
+
+/**
+ * @author Philipp Haller
+ */
+object Debug {
+ var lev = 2
+
+ def level = lev
+ def level_= (lev: int) = {
+ //Console.println("Setting debug level to " + lev)
+ this.lev = lev
+ }
+
+ def info(s: String) =
+ if (lev > 2) System.out.println("Info: " + s)
+ def warning(s: String) =
+ if (lev > 1) System.err.println("Warning: " + s)
+ def error(s: String) =
+ if (lev > 0) System.err.println("Error: " + s)
+}
+
+class Debug(tag: String) {
+ var lev = 2
+
+ def level = lev
+ def level_= (lev: int) = {
+ //Console.println("Setting debug level (" + tag + ") to " + lev)
+ this.lev = lev
+ }
+
+ def info(s: String) =
+ if (lev > 2) System.out.println(tag + " (info): " + s)
+ def warning(s: String) =
+ if (lev > 1) System.err.println(tag + " (warn): " + s)
+ def error(s: String) =
+ if (lev > 0) System.err.println(tag + " (erro): " + s)
+}
diff --git a/src/actors/scala/actors/Done.scala b/src/actors/scala/actors/Done.scala
new file mode 100644
index 0000000000..5bef8d89d8
--- /dev/null
+++ b/src/actors/scala/actors/Done.scala
@@ -0,0 +1,19 @@
+package scala.actors;
+
+/**
+ * @author Philipp Haller
+ */
+class Done extends Throwable {
+ override def fillInStackTrace(): Throwable =
+ this;
+}
+
+class ContinueException extends Throwable {
+ override def fillInStackTrace(): Throwable =
+ this;
+}
+
+class AbortException extends Throwable {
+ override def fillInStackTrace(): Throwable =
+ this;
+}
diff --git a/src/actors/scala/actors/SPanel.scala b/src/actors/scala/actors/SPanel.scala
new file mode 100644
index 0000000000..7f3b3a7a1b
--- /dev/null
+++ b/src/actors/scala/actors/SPanel.scala
@@ -0,0 +1,74 @@
+package scala.actors;
+
+
+/*
+ * SPanel.scala
+ * GUI for simple texts
+ *
+ */
+import java.awt._;
+import java.awt.event._;
+import javax.swing.event._;
+import javax.swing._;
+import java.io._;
+
+
+
+class SPanel (WIDTH:int, HEIGHT:int, title:String) extends JFrame{
+
+ private var textArea:JTextArea = null;
+ private var resetButton:JButton = null;
+
+ private var scrollPane:JScrollPane = null;
+ private var panel:JPanel = null;
+ private var contentPane:Container = null;
+ private var levelChoice:JComboBox = null;
+ private var formatChoice:JComboBox = null;
+
+ //init
+ setTitle(title);
+ setSize(WIDTH, HEIGHT);
+ this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
+
+ contentPane = getContentPane();
+ panel = new JPanel();
+ contentPane.add(panel, BorderLayout.SOUTH);
+
+ // Add a text area with scroll bars
+ textArea = new JTextArea(8, 40);
+ scrollPane = new JScrollPane(textArea);
+ contentPane.add(scrollPane, BorderLayout.CENTER);
+
+ // Add a "reset textarea" button
+ resetButton = new JButton("Clear");
+ panel.add(resetButton);
+ resetButton.addActionListener(
+ new ActionListener() {
+ def actionPerformed(evt:ActionEvent):Unit= {
+ textArea.setText("");
+ }
+ }
+ );
+
+ this.repaint();
+ this.show();
+
+
+ def addText(text:String):Unit = {
+ textArea.append(text+'\n');
+
+ if ( textArea.getHeight() > scrollPane.getHeight() ) {
+ scrollPane.getVerticalScrollBar().setValue(scrollPane.getVerticalScrollBar().getMaximum());
+ }
+ repaint();
+ }
+/*
+ def actionPerformed(ActionEvent e):Unit {
+ }
+*/
+
+ override def paint(g:Graphics ): unit = {
+ super.paint( g );
+ }
+
+}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
new file mode 100644
index 0000000000..7caaa1b35c
--- /dev/null
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -0,0 +1,236 @@
+package scala.actors
+
+import scala.collection.mutable._
+
+import scala.actors.multi._
+
+/**
+ * @author Philipp Haller
+ */
+abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
+ def execute(item: ReceiverTask): unit;
+ def getTask(worker: WorkerThread): Runnable;
+ def tick(a: MailBox): unit;
+
+ val QUIT_TASK = new Runnable() {
+ def run(): unit = {};
+ override def toString() = "QUIT_TASK";
+ }
+}
+
+
+object Scheduler /*extends java.util.concurrent.Executor*/ {
+ private var sched: /*java.util.concurrent.Executor*/ IScheduler =
+ //java.util.concurrent.Executors.newFixedThreadPool(2);
+ //new FixedWorkersScheduler(2);
+ new SpareWorkerScheduler2
+ //new SpareWorkerScheduler
+
+ def impl = sched
+ def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = {
+ Debug.info("Using scheduler " + scheduler)
+ sched = scheduler
+ }
+
+ def execute(item: ReceiverTask) =
+ sched.execute(item)
+
+ def tick(a: MailBox) =
+ sched.tick(a)
+}
+
+
+class SpareWorkerScheduler2 extends IScheduler {
+ private val tasks = new Queue[ReceiverTask];
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
+
+ val idle = new Queue[WorkerThread];
+ val ticks = new HashMap[WorkerThread, long]
+ val executing = new HashMap[MailBox, WorkerThread]
+
+ var TICKFREQ = 50
+
+ def init = {
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init;
+
+ var maxWorkers = 0;
+ var ticksCnt = 0;
+
+ def tick(a: MailBox): unit = synchronized {
+ ticksCnt = ticksCnt + 1
+ executing.get(a) match {
+ case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
+ case Some(wt) =>
+ ticks.update(wt, System.currentTimeMillis)
+ }
+ }
+
+ def execute(item: ReceiverTask): unit = synchronized {
+ if (idle.length > 0) {
+ val wt = idle.dequeue
+ executing.update(item.actor, wt)
+ wt.execute(item)
+ }
+ else {
+ /* only create new worker thread
+ when all are blocked according to heuristic
+
+ we check time stamps of latest send/receive ops
+ of ALL workers
+
+ we stop if there is one which is not blocked */
+
+ val iter = workers.elements
+ var foundBusy = false
+ while (iter.hasNext && !foundBusy) {
+ val wt = iter.next
+ ticks.get(wt) match {
+ case None => foundBusy = true // assume not blocked
+ case Some(ts) => {
+ val currTime = System.currentTimeMillis
+ if (currTime - ts < TICKFREQ)
+ foundBusy = true
+ }
+ }
+ }
+
+ if (!foundBusy) {
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ maxWorkers = workers.length // statistics
+
+ executing.update(item.actor, newWorker)
+
+ newWorker.execute(item)
+ newWorker.start()
+ }
+ else {
+ // wait assuming busy thread will be finished soon
+ // and ask for more work
+ tasks += item
+ }
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) {
+ val item = tasks.dequeue
+ executing.update(item.actor, worker)
+ item
+ }
+ else {
+ idle += worker
+ null
+ }
+ }
+}
+
+/**
+ * @author Philipp Haller
+ */
+class SpareWorkerScheduler extends IScheduler {
+ private var canQuit = false;
+ private val tasks = new Queue[Runnable];
+ private val idle = new Queue[WorkerThread];
+
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
+
+ def init = {
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init;
+
+ var maxWorkers = 0;
+
+ def execute(item: ReceiverTask): unit = synchronized {
+ if (idle.length == 0) {
+ tasks += item
+ // create new worker
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ maxWorkers = workers.length
+ newWorker.start()
+ //newWorker.execute(item)
+ }
+ else {
+ canQuit = true
+ idle.dequeue.execute(item)
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ //if ((idle.length == workers.length) && canQuit) haltExcept(worker)
+ //else null
+ }
+ }
+
+ def tick(a: MailBox): unit = {}
+
+ def haltExcept(w: WorkerThread) = {
+ for (val i <- List.range(0, workers.length))
+ if (workers(i) != w) workers(i).halt
+ QUIT_TASK
+ }
+}
+
+
+/**
+ * @author Philipp Haller
+ */
+class FixedWorkersScheduler(workercnt: int) extends IScheduler {
+ private var canQuit = false;
+ private val tasks = new Queue[Runnable];
+ private val idle = new Queue[WorkerThread];
+
+ //Console.println("Running with " + workercnt + " workers")
+ private var workers = new Array[WorkerThread](workercnt);
+
+ def init = {
+ for (val i <- List.range(0, workers.length)) {
+ workers(i) = new WorkerThread(this)
+ workers(i).start()
+ }
+ }
+ init;
+
+ def execute(item: ReceiverTask): unit = synchronized {
+ if (workers.length == 0) item.run
+ else {
+ canQuit = true
+ if (idle.length > 0) idle.dequeue.execute(item)
+ else tasks += item
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ //if ((idle.length == workers.length) && canQuit) haltExcept(worker)
+ //else null
+ }
+ }
+
+ def tick(a: MailBox): unit = {}
+
+ def haltExcept(w: WorkerThread) = {
+ for (val i <- List.range(0, workers.length))
+ if (workers(i) != w) workers(i).halt
+ QUIT_TASK
+ }
+}
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
new file mode 100644
index 0000000000..a483d7b043
--- /dev/null
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -0,0 +1,156 @@
+/**
+ @author Sebastien Noir
+
+ This class allows the (locl) sending of a message to an actor after a timeout. Used by the library to build receiveWithin(time:long). Note that the library deletes non received TIMEOUT() message if a messsage is received before the time-out occurs.
+ */
+
+ package scala.actors
+
+import scala.collection.mutable.PriorityQueue
+
+import scala.actors.multi.Actor
+import scala.actors.multi.MailBox
+
+case class Signal()
+
+object TimerThread extends AnyRef with Runnable {
+ case class WakedActor(actor: MailBox, time: long, reason: String) extends Ordered[WakedActor] {
+ var valid = true
+ def compare [b >: WakedActor <% Ordered[b]](that: b): int = that match {
+ case that2: WakedActor => -(this.time compare that2.time)
+ case _ => error("not comparable")
+ }
+ }
+
+ var queue = new PriorityQueue[WakedActor]
+ val t = new Thread(this); t.start
+
+ var lateList: List[WakedActor] = Nil
+
+ def trashRequest(a: MailBox) = synchronized {
+ // keep in mind: killing dead people is a bad idea!
+ queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
+ case Some(b) =>
+ b.valid = false
+ case None =>
+ lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match {
+ case Some(b2) =>
+ b2.valid = false
+ case None =>
+ }
+ }
+ }
+
+ override def run = while (true) {
+ this.synchronized {
+ try {
+ val sleepTime = dequeueLateAndGetSleepTime
+ if (lateList.isEmpty) {
+ wait(sleepTime)
+ }
+ } catch {
+ case t: Throwable => { t.printStackTrace(); throw t }
+ }
+ }
+
+ // process guys waiting for signal and empty list
+ for (val wa <- lateList) {
+ if (wa.valid) {
+ wa.actor send Signal()
+ }
+ }
+ lateList = Nil
+ }
+
+ def requestSignal(a: Actor, waitMillis: long, reason: String): unit = this.synchronized {
+ Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason)
+ val wakeTime = now + waitMillis
+ if (waitMillis < 0) {
+ a send Signal()
+ return
+ }
+
+ if (queue.isEmpty) { // add to queue and restart sleeping
+ queue += WakedActor(a, wakeTime, reason)
+ notify()
+ } else { //queue full
+ if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
+ queue += WakedActor (a, wakeTime, reason)
+ notify()
+ } else { // simply add to queue
+
+ queue += WakedActor (a, wakeTime, reason)
+ }
+ }
+ }
+
+ def requestTimeout(a: MailBox, waitMillis: long): unit = synchronized {
+ val wakeTime = now + waitMillis
+ if (waitMillis < 0) {
+ a send Signal()
+ return
+ }
+
+ if (queue.isEmpty) { // add to queue and restart sleeping
+ queue += WakedActor(a, wakeTime, "")
+ notify()
+ } else
+ if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
+ queue += WakedActor (a, wakeTime, "")
+ notify()
+ }
+ else // simply add to queue
+ queue += WakedActor (a, wakeTime, "")
+ }
+
+ private def dequeueLateAndGetSleepTime: long = {
+ val FOREVER: long = 0
+ var waitingList: List[WakedActor] = Nil
+
+ while (!queue.isEmpty) {
+ val next = queue.max.time
+ val amount = next - now
+ if (amount > 0) { // guy in queue is not late
+ lateList = waitingList // give back the list of waiting guys for signaling
+ return amount
+ }
+ else // we're late: dequeue and examine next guy
+ waitingList = queue.dequeue :: waitingList
+ }
+
+ // empty queue => sleep forever
+ lateList = waitingList
+ return FOREVER
+ }
+
+ def now = new java.util.Date().getTime()
+}
+
+//================================================================================
+
+object TimerThreadTest {
+ def main (args:Array[String]) = {
+ new Tester (1000, "ONE").start
+ new Tester (500, "TWO").start
+ }
+
+ class Tester (duration : int, name:String) extends Actor {
+ var i = 0
+
+ def loop:unit = {
+ receive {
+ case Signal() =>
+ Console.println(name + i)
+ i = i+1;
+ loop
+ }
+ }
+
+ override def run = {
+ for (val i <-List.range(1,10)) {
+ TimerThread.requestSignal(this, (duration * i).asInstanceOf[long], ""+duration*i)
+ }
+ loop
+ }
+ }
+}
diff --git a/src/actors/scala/actors/WorkerThread.scala b/src/actors/scala/actors/WorkerThread.scala
new file mode 100644
index 0000000000..ab2ffbeb32
--- /dev/null
+++ b/src/actors/scala/actors/WorkerThread.scala
@@ -0,0 +1,34 @@
+package scala.actors;
+
+/**
+ * @author Philipp Haller
+ */
+class WorkerThread(sched: IScheduler) extends Thread {
+ private var task: Runnable = null
+ private var running = true
+
+ def halt = synchronized {
+ running = false
+ notify()
+ }
+
+ def execute(r: Runnable) = synchronized {
+ //Debug.info("WORK: " + this + ": Executing task " + r)
+ task = r
+ notify()
+ }
+
+ override def run(): unit = synchronized {
+ while (running) {
+ if (task != null) {
+ task.run()
+ //Debug.info("WORK: " + this + " has finished.")
+ }
+ //Debug.info("WORK: " + this + ": Getting new task...")
+ task = sched.getTask(this)
+ //Debug.info("WORK (" + this + "): got task " + task)
+ if (task == sched.QUIT_TASK) running = false
+ else if (task == null) wait()
+ }
+ }
+}
diff --git a/src/actors/scala/actors/distributed/JXTAServiceBase.scala b/src/actors/scala/actors/distributed/JXTAServiceBase.scala
new file mode 100644
index 0000000000..b3e2603974
--- /dev/null
+++ b/src/actors/scala/actors/distributed/JXTAServiceBase.scala
@@ -0,0 +1,9 @@
+package scala.actors.distributed;
+
+abstract class JXTAServiceBase(nodename: String) extends Thread with Service {
+ val serializer = new JavaSerializer(this);
+ private val internalNode = new JXTANode(nodename);
+ def node: Node = internalNode;
+ def createPid(actor: RemoteActor): RemotePid =
+ new JXTAPid(internalNode, makeUid, kernel, actor)
+}
diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala
new file mode 100644
index 0000000000..b25437ff86
--- /dev/null
+++ b/src/actors/scala/actors/distributed/JavaSerializer.scala
@@ -0,0 +1,34 @@
+package scala.actors.distributed;
+
+import java.io._;
+import scala.collection.mutable._;
+
+import scala.actors.distributed.picklers.BytePickle.SPU;
+import scala.actors.distributed.picklers._;
+import scala.actors.multi._;
+
+[serializable]
+class JavaSerializer(serv: Service) extends Serializer(serv) {
+ val debug = true;
+
+ def log (s:String) = {
+ if (debug) Console.println("JAVASerializer: "+s)
+ }
+
+ def serialize(o: AnyRef): Array[byte] = {
+ val bos = new ByteArrayOutputStream()
+ val out = new ObjectOutputStream(bos)
+ out.writeObject(o)
+ out.flush()
+ bos.toByteArray()
+ }
+
+ def deserialize(bytes: Array[byte]): AnyRef = {
+ val bis = new ByteArrayInputStream(bytes);
+ val in = new ObjectInputStream(bis);
+ in.readObject();
+ }
+
+ def pid: SPU[Pid] = null;
+ def addRep(name: String, repCons: Serializer => AnyRef): unit = {};
+}
diff --git a/src/actors/scala/actors/distributed/Messages.scala b/src/actors/scala/actors/distributed/Messages.scala
new file mode 100644
index 0000000000..d8a663922a
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Messages.scala
@@ -0,0 +1,40 @@
+package scala.actors.distributed
+
+import scala.actors.multi.Pid
+import scala.actors.multi.ExcHandlerDesc
+
+abstract class MessageTyper {
+ type DataType=Array[byte];
+}
+
+abstract class SystemMessage;
+case class Send(rec: Pid, data: MessageTyper#DataType) extends SystemMessage;
+case class Spawn(replyto: Pid, p: String) extends SystemMessage;
+case class PidReply(res: Pid) extends SystemMessage;
+case class Disconnect() extends SystemMessage;
+
+case class NodeDown() extends SystemMessage;
+
+// ACHTUNG: Tells "from" to create a _uni-directional_ link!
+case class Link(from: Pid, to: Pid) extends SystemMessage;
+case class UnLink(from: Pid, to: Pid) extends SystemMessage;
+case class Exit1(from: Pid, to: Pid, reason: Symbol) extends SystemMessage;
+
+case class SpawnObject(replyto: Pid, data: MessageTyper#DataType) extends SystemMessage;
+
+case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage;
+
+case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage;
+
+/*
+case class NamedSendRep (ser:Serializer) extends TypeRep[NamedSend](ser) {
+ def serialize(content: NamedSend, w: java.io.Writer): unit = {
+ StringRep(ser).serialize(content.sym.name, w)
+ StringRep(ser).serialize(content.data, w)
+ }
+ def deserialize(r:java.io.Reader): NamedSend = {
+ NamedSend(Symbol(StringRep(ser).deserialize(r)),
+ StringRep(ser).deserialize(r))
+ }
+}
+*/
diff --git a/src/actors/scala/actors/distributed/MessagesComb.scala b/src/actors/scala/actors/distributed/MessagesComb.scala
new file mode 100644
index 0000000000..af3edc13a2
--- /dev/null
+++ b/src/actors/scala/actors/distributed/MessagesComb.scala
@@ -0,0 +1,36 @@
+package scala.actors.distributed;
+
+import scala.actors.distributed.picklers.BytePickle._;
+import scala.actors.multi.Pid;
+
+object MessagesComb {
+ def sendPU(ser: Serializer): SPU[Send] =
+ wrap((p: Pair[Pid,Array[byte]]) => Send(p._1, p._2),
+ (s: Send) => Pair(s.rec, s.data),
+ pair(ser.pid, bytearray));
+
+ def spawnPU(ser: Serializer): SPU[Spawn] =
+ wrap((p: Pair[Pid,String]) => Spawn(p._1, p._2),
+ (s: Spawn) => Pair(s.replyto, s.p),
+ pair(ser.pid, string));
+
+ def symbolPU: SPU[Symbol] =
+ wrap((s: String) => Symbol(s),
+ (sym: Symbol) => sym.name,
+ string);
+
+ def exitPU(ser: Serializer): SPU[Exit1] =
+ wrap((p: Triple[Pid,Pid,Symbol]) => Exit1(p._1, p._2, p._3),
+ (e: Exit1) => Triple(e.from, e.to, e.reason),
+ triple(ser.pid, ser.pid, symbolPU));
+
+ def spawnObjectPU(ser: Serializer): SPU[SpawnObject] =
+ wrap((p: Pair[Pid,Array[byte]]) => SpawnObject(p._1, p._2),
+ (s: SpawnObject) => Pair(s.replyto, s.data),
+ pair(ser.pid, bytearray));
+
+ def namedSendPU: SPU[NamedSend] =
+ wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2),
+ (ns: NamedSend) => Pair(ns.sym, ns.data),
+ pair(symbolPU, bytearray));
+}
diff --git a/src/actors/scala/actors/distributed/Name.scala b/src/actors/scala/actors/distributed/Name.scala
new file mode 100644
index 0000000000..02842f1501
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Name.scala
@@ -0,0 +1,7 @@
+package scala.actors.distributed;
+
+case class Name(node: Node, sym: Symbol, kernel: NetKernel) {
+ def !(msg: AnyRef): unit = {
+ kernel.namedSend(this, msg)
+ }
+}
diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala
new file mode 100644
index 0000000000..28821350d6
--- /dev/null
+++ b/src/actors/scala/actors/distributed/NetKernel.scala
@@ -0,0 +1,591 @@
+package scala.actors.distributed
+
+import java.io.StringReader
+import java.io.StringWriter
+import java.util.logging._
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+
+import java.net.UnknownHostException
+import java.io.IOException
+import java.lang.SecurityException
+
+import scala.actors.multi.Actor
+import scala.actors.multi.ExcHandlerDesc
+
+case class RA(a: RemoteActor);
+
+object NetKernel {
+ var kernel: NetKernel = null;
+}
+
+class NetKernel(service: Service) {
+ NetKernel.kernel = this
+
+ // contains constructors
+ private val ptable =
+ new HashMap[String, () => RemoteActor];
+
+ // maps local ids to scala.actors
+ private val rtable =
+ new HashMap[int, RemoteActor];
+
+ // maps scala.actors to their RemotePid
+ private val pidTable =
+ new HashMap[RemoteActor, RemotePid];
+
+ private var running = true;
+
+ val logLevel = Level.FINE
+ val l = Logger.getLogger("NetKernel")
+ l.setLevel(logLevel)
+ val consHand = new ConsoleHandler
+ consHand.setLevel(logLevel)
+ l.addHandler(consHand)
+
+ //start // start NetKernel
+
+ /** only called if destDesc is local. */
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ destDesc.pid match {
+ case rpid: RemotePid =>
+ (rtable get rpid.localId) match {
+ case Some(actor) =>
+ actor.handleExc(destDesc, e)
+ case None =>
+ error("exc desc refers to non-registered actor")
+ }
+ }
+ }
+
+ def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ // locality check (handler local to this node?)
+ destDesc.pid match {
+ case rpid: RemotePid =>
+ if (rpid.node == this.node)
+ handleExc(destDesc, e)
+ else
+ sendToNode(rpid.node, ForwardExc(destDesc, e))
+ }
+ }
+
+ def sendToNode(node: Node, msg: AnyRef) = {
+ //val sw = new StringWriter
+ val bytes = service.serializer.serialize(msg /*, sw*/)
+ service.send(node, bytes)
+ //service.send(node, sw.toString())
+ }
+
+ def addConstructor(key: String, value: () => RemoteActor) =
+ ptable.update(key, value);
+
+ def node: Node = service.node;
+
+ def nodes: List[Node] = service.nodes;
+
+ def pidOf(actor: RemoteActor): RemotePid = synchronized {
+ pidTable.get(actor) match {
+ case None => error("malformed pid table in " + this)
+ case Some(pid) => pid
+ }
+ }
+
+ def disconnectNode(n: Node) = synchronized {
+ service.disconnectNode(n)
+ }
+
+ def getLocalRef(locId: int): RemoteActor =
+ rtable.get(locId) match {
+ case None =>
+ error("" + locId + " is not registered at " + this)
+ case Some(remoteActor: RemoteActor) =>
+ remoteActor
+ }
+
+ def localSend(localId: int, msg: AnyRef): unit = synchronized {
+ rtable.get(localId) match {
+ case None =>
+ error("" + localId + " is not registered at " + this)
+ case Some(remoteActor: RemoteActor) =>
+ //Console.println("local send to " + remoteActor)
+ remoteActor send msg
+ }
+ }
+
+ def localSend(pid: RemotePid, msg: AnyRef): unit =
+ localSend(pid.localId, msg);
+
+ def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized {
+ //Console.println("NetKernel: Remote msg delivery to " + pid)
+ service.remoteSend(pid, msg)
+ }
+
+ def namedSend(name: Name, msg: AnyRef): unit = {
+ if (name.node == this.node) {
+ // look-up name
+ nameTable.get(name.sym) match {
+ case None =>
+ // message is lost
+ //Console.println("lost message " + msg + " because " + name + " not registered.")
+ case Some(localId) => localSend(localId, msg)
+ }
+ }
+ else {
+ // remote send
+
+ // serialize msg
+ ///val sw = new StringWriter
+ val bytes = service.serializer.serialize(msg/*, sw*/)
+
+ sendToNode(name.node, NamedSend(name.sym, bytes))
+ //sendToNode(name.node, NamedSend(name.sym, sw.toString()))
+ }
+ }
+
+ val nameTable = new HashMap[Symbol, int]
+
+ def registerName(name: Symbol, pid: RemotePid): unit = synchronized {
+ nameTable += name -> pid.localId
+ }
+
+ def registerName(name: Symbol, a: RemoteActor): unit = synchronized {
+ val pid = register(a)
+ registerName(name, pid)
+ a.start
+ }
+
+ /*override def run: unit = receive {
+ case ForwardExc(destDesc, e) =>
+ // TODO
+ case Spawn(reply: RemotePid, pname) =>
+ val newPid = spawn(pname)
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ run
+
+ case SpawnObject(reply: RemotePid, data: Array[byte]) =>
+ //val sr = new StringReader(data)
+ //service.serializer.deserialize(sr) match {
+
+ service.serializer.deserialize(data) match {
+ case RA(actor) =>
+ val newPid = register(actor)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ actor.start
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ }
+ run
+
+ case NamedSend(sym: Symbol, data) =>
+ // look-up name
+ nameTable.get(sym) match {
+ case None =>
+ // message is lost
+ Console.println("lost message " + data + " because " + sym + " not registered.")
+ case Some(localId) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+
+ val msg = service.serializer.deserialize(data)
+ localSend(localId, msg)
+ }
+ run
+
+ case Send(pid: RemotePid, data) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+ val msg = service.serializer.deserialize(data)
+
+ Console.println("locally send " + msg + " to " + pid)
+ localSend(pid, msg)
+ run
+
+ case Link(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ linkFromLocal(from, to)
+ run
+
+ case UnLink(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ unlinkFromLocal(from, to)
+ run
+
+ case Exit1(from:RemotePid, to:RemotePid, reason) =>
+ // check if "to" traps exit signals
+ // if so send a message
+ if (trapExits.contains(to.localId))
+ // convert signal into message
+ localSend(to, Exit1(from, to, reason))
+ else
+ if (reason.name.equals("normal")) {
+ // ignore signal
+ }
+ else
+ exit(from, to, reason)
+ run
+ }*/
+
+ // TODO
+ /*def isReachable(remoteNode: Node): boolean = {
+ val pingMsg = new Ping(node)
+ val sw = new StringWriter
+ service.serializer.serialize(pingMsg, sw)
+ service.send(remoteNode, sw.toString())
+ }*/
+
+ def processMsg(msg: AnyRef): unit = synchronized {
+ msg match {
+ case Spawn(reply: RemotePid, pname) =>
+ val newPid = spawn(pname)
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+
+ case SpawnObject(reply: RemotePid, data) =>
+ //val sr = new StringReader(data)
+ //service.serializer.deserialize(sr) match {
+
+ service.serializer.deserialize(data) match {
+ case RA(actor) =>
+ val newPid = register(actor)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ actor.start
+ // need to send back the Pid
+ remoteSend(reply, newPid)
+ }
+
+ case NamedSend(sym: Symbol, data) =>
+ // look-up name
+ nameTable.get(sym) match {
+ case None =>
+ // message is lost
+ //Console.println("lost message " + msg + " because " + sym + " not registered.")
+ case Some(localId) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+ val msg = service.serializer.deserialize(data)
+ localSend(localId, msg)
+ }
+
+ case Send(pid: RemotePid, data) =>
+ // deserialize data
+ //val sr = new StringReader(data)
+ //val msg = service.serializer.deserialize(sr)
+ val msg = service.serializer.deserialize(data)
+ //Console.println("locally send " + msg + " to " + pid)
+ localSend(pid, msg)
+
+ case Link(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ linkFromLocal(from, to)
+
+ case UnLink(from:RemotePid, to:RemotePid) =>
+ // assume from is local
+ unlinkFromLocal(from, to)
+
+ case Exit1(from:RemotePid, to:RemotePid, reason) =>
+ // check if "to" traps exit signals
+ // if so send a message
+ if (trapExits.contains(to.localId)) {
+ // convert signal into message
+ // TODO: simpler message (w/o to) for actor!
+ localSend(to, Exit1(from, to, reason))
+ }
+ else {
+ if (reason.name.equals("normal")) {
+ // ignore signal
+ }
+ else
+ exit(from, to, reason)
+ }
+ }
+ }
+
+ /* Registers an instance of a remote actor inside this NetKernel.
+ */
+ def register(newProc: RemoteActor): RemotePid = synchronized {
+ newProc.kernel = this
+ val newPid = service.createPid(newProc)
+ rtable += newPid.localId -> newProc
+ pidTable += newProc -> newPid
+ newPid
+ }
+
+ // local spawn
+ def spawn(pname: String): RemotePid = synchronized {
+ // get constructor out of table
+ (ptable.get(pname)) match {
+ case None =>
+ //error("No constructor found. Cannot start process.")
+ //null
+
+ val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor]
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
+
+ case Some(cons) =>
+ val newProc = cons()
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
+ }
+ }
+
+ // local spawn
+ def spawn(name: String, arg: RemotePid): RemotePid = synchronized {
+ val newPid = spawn(name)
+ localSend(newPid, arg)
+ newPid
+ }
+
+ // assume this.node != node
+ def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): unit = {
+ val ra = RA(a)
+ //val rsw = new StringWriter
+ //service.serializer.serialize(ra, rsw)
+ val bytes = service.serializer.serialize(ra)
+ //sendToNode(node, SpawnObject(replyTo, rsw.toString()))
+ sendToNode(node, SpawnObject(replyTo, bytes))
+ }
+
+
+ def registerSerializer(index: String, rep: Serializer => AnyRef) = {
+ service.serializer.addRep(index, rep)
+
+ // send registering requests to remote nodes
+ }
+
+ // remote spawn
+ def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized {
+ // check if actor is to be spawned locally
+ if (node == this.node) {
+ val newPid = spawn(name)
+ newPid
+ }
+ else {
+ sendToNode(node, Spawn(replyTo, name))
+ null // pid needs to be sent back
+ }
+ }
+
+ /* Spawns a new actor (locally), executing "fun".
+ */
+ def spawn(fun: RemoteActor => unit): RemotePid = synchronized {
+ val newProc = new RemoteActor {
+ override def run: unit =
+ fun(this);
+ }
+
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+ newProc.start
+ newPid
+ }
+
+ /* Spawns a new actor (locally), executing "fun".
+ */
+ def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized {
+ val newProc = new RemoteActor {
+ override def run: unit =
+ fun(this);
+ }
+
+ // create Pid for remote communication and register process
+ val newPid = register(newProc)
+ //Console.println("Spawned a new " + newProc + " (" + newPid + ")")
+
+ // link new process to pid (assume pid is local)
+ link(pid, newPid)
+
+ newProc.start
+ newPid
+ }
+
+ // maps local ids to their linked pids
+ private val links = new HashMap[int,HashSet[RemotePid]];
+
+ // which of the local processes traps exit signals?
+ private val trapExits = new HashSet[int];
+
+ def processFlag(pid: RemotePid, flag: Symbol, set: boolean) = synchronized {
+ if (flag.name.equals("trapExit")) {
+ if (trapExits.contains(pid.localId) && !set)
+ trapExits -= pid.localId
+ else if (!trapExits.contains(pid.localId) && set)
+ trapExits += pid.localId
+ }
+ }
+
+ // assume from.node == this.node
+ private def unlinkFromLocal(from: RemotePid, to: RemotePid): unit =
+ links.get(from.localId) match {
+ case None =>
+ // has no links -> ignore
+ case Some(set) =>
+ set -= to
+ if (set.size == 0) links -= from.localId
+ };
+
+ /*
+ unlinks bi-directional link
+ assume from.node == this.node
+ */
+ def unlink(from: RemotePid, to: RemotePid): unit = synchronized {
+ unlinkFromLocal(from, to)
+ if (to.node == this.node)
+ unlinkFromLocal(to, from)
+ else
+ // (2) send message to NetKernel of "to" to unlink a
+ // uni-directional link from "to" to "from"
+ sendToNode(to.node, UnLink(to, from))
+ }
+
+ // assume from.node == this.node
+ private def linkFromLocal(from: RemotePid, to: RemotePid): unit =
+ // TODO: send Exit to from if to is invalid
+ links.get(from.localId) match {
+ case None =>
+ // from has no links, yet
+ val linksTo = new HashSet[RemotePid]
+ linksTo += to
+ links += from.localId -> linksTo
+ case Some(set) =>
+ set += to
+ };
+
+ /*
+ creates bi-directional link
+ assume from.node == this.node
+ */
+ def link(from: RemotePid, to: RemotePid): unit = synchronized {
+ // (1) create locally a uni-directional link
+ linkFromLocal(from, to)
+ if (to.node == this.node)
+ linkFromLocal(to, from)
+ else
+ // (2) send message to NetKernel of "to" to create a
+ // uni-directional link from "to" to "from"
+ sendToNode(to.node, Link(to, from))
+ }
+
+ // Assume "to" is local.
+ def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = {
+ // remove link
+ unlinkFromLocal(to, from)
+ exit(to, reason)
+ }
+
+ val exitMarks = new HashSet[RemotePid]
+
+ /*
+ If reason is unequal to 'normal then
+ this will cause all linked processes to
+ (transitively) terminate abnormally.
+
+ Assume pid is local.
+ */
+ def exit(pid: RemotePid, reason: Symbol): unit = synchronized {
+ if (!(exitMarks contains pid)) {
+ exitMarks += pid // mark pid as exiting
+ //Console.println("" + pid + " is exiting (" + reason + ").")
+
+ // first look-up remote actor in rtable
+ val actor = rtable(pid.localId)
+ // remove from table of running processes
+ rtable -= pid.localId
+ // remove from pid table
+ pidTable -= actor
+
+ // send exit signals to linked processes
+ links.get(pid.localId) match {
+ case None =>
+ //Console.println("no linked processes")
+
+ case Some(set) => // set of remote pids that we want to terminate
+ //Console.println("sending exit signals to linked processes")
+
+ val iter = set.elements
+ while (iter.hasNext) {
+ val linkedPid = iter.next
+
+ unlinkFromLocal(pid, linkedPid)
+
+ if (linkedPid.node == this.node) {
+ unlinkFromLocal(linkedPid, pid)
+
+ if (trapExits.contains(linkedPid.localId))
+ localSend(linkedPid, Exit1(pid, linkedPid, reason))
+ else if (!reason.name.equals("normal"))
+ exit(linkedPid, reason)
+ }
+ else
+ sendToNode(linkedPid.node,
+ Exit1(pid, linkedPid, reason))
+ }
+ exitMarks -= pid
+ }
+ }
+ }
+
+ private val monNodes =
+ new HashMap[Node,HashMap[RemotePid,int]];
+
+ def monitorNode(client: RemotePid, mnode: Node, cond: boolean) = synchronized {
+ monNodes.get(mnode) match {
+ case None =>
+ // nobody is monitoring this node
+ if (cond) {
+ val map = new HashMap[RemotePid,int]
+ map += client -> 1
+ monNodes += mnode -> map
+ }
+ case Some(map) =>
+ map.update(client, map(client) + (if (cond) 1 else -1))
+ }
+
+ // if no connection exists:
+ // try connecting, if it fails deliver nodedown msg
+ if (cond && !service.isConnected(mnode)) {
+ try {
+ service.connect(mnode)
+ }
+ catch {
+ case uhe: UnknownHostException =>
+ nodeDown(mnode)
+ case ioe: IOException =>
+ nodeDown(mnode)
+ case se: SecurityException =>
+ nodeDown(mnode)
+ }
+ }
+ }
+
+ def nodeDown(mnode: Node) = {
+ // send NodeDown msg to registered RemotePids
+ monNodes.get(mnode) match {
+ case None =>
+ // nobody is monitoring this node
+ case Some(map) =>
+ // iterate over keys (RemotePids of interested clients)
+ val iter = map.keys
+ while (iter.hasNext) {
+ val client = iter.next
+ for (val i <- List.range(0, map(client))) {
+ // send nodedown msg
+ client ! Pair(NodeDown(), mnode)
+ }
+ }
+ }
+ }
+
+}
diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala
new file mode 100644
index 0000000000..f407bdfd21
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Node.scala
@@ -0,0 +1,6 @@
+package scala.actors.distributed;
+
+[serializable] abstract class Node;
+
+[serializable] case class TcpNode(address: String, port: int) extends Node;
+[serializable] case class JXTANode(name: String) extends Node;
diff --git a/src/actors/scala/actors/distributed/NodeComb.scala b/src/actors/scala/actors/distributed/NodeComb.scala
new file mode 100644
index 0000000000..c18540c6f9
--- /dev/null
+++ b/src/actors/scala/actors/distributed/NodeComb.scala
@@ -0,0 +1,12 @@
+package scala.actors.distributed;
+
+import scala.actors.distributed.picklers.BytePickle._;
+
+object NodeComb {
+ def tcpNodePU: SPU[TcpNode] =
+ wrap((p: Pair[String,int]) => TcpNode(p._1, p._2),
+ (n: TcpNode) => Pair(n.address, n.port), pair(string, nat));
+ def jxtaNodePU: SPU[JXTANode] =
+ wrap((s: String) => JXTANode(s),
+ (n: JXTANode) => n.name, string);
+}
diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala
new file mode 100644
index 0000000000..ef3f98db70
--- /dev/null
+++ b/src/actors/scala/actors/distributed/RemoteActor.scala
@@ -0,0 +1,168 @@
+package scala.actors.distributed
+
+import scala.actors.multi.MailBox
+import scala.actors.multi.Actor
+import scala.actors.multi.Pid
+import scala.actors.multi.LocalPid
+import scala.actors.multi.ExcHandlerDesc
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Stack
+
+abstract class ServiceName
+case class JXTA(groupName: String) extends ServiceName
+case class TCP() extends ServiceName
+
+class RemoteActor extends Actor {
+ override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ // locality check (handler local to this actor?)
+ if (destDesc.pid == self)
+ handleExc(destDesc, e)
+ else
+ kernel.forwardExc(destDesc, e)
+ }
+
+ override def receive(f: PartialFunction[Message,unit]): scala.All = {
+ if (isAlive) {
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ try {
+ f(msg)
+ }
+ catch {
+ case d: Done =>
+ throw new Done
+ case t: Throwable =>
+ if (!excHandlerDescs.isEmpty)
+ forwardExc(excHandlerDescs.top, t)
+ else
+ die(Symbol(t.toString()))
+ }
+ die()
+ case None =>
+ continuation = f
+ Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ var kernel: NetKernel = null;
+
+ def node =
+ self.node
+
+ def nodes = kernel.nodes
+
+ private var selfCached: RemotePid = null
+
+ override def self: RemotePid = {
+ if (selfCached == null)
+ selfCached = kernel pidOf this
+ selfCached
+ }
+
+ def serialize(index: String, rep: Serializer => AnyRef) =
+ kernel.registerSerializer(index, rep);
+
+ def alive(s: ServiceName): unit = {
+ var service: Service = null
+ s match {
+ case TCP() =>
+ val serv = new TcpService(TcpService.generatePort)
+ service = serv
+ serv.start()
+ /*case JXTA(groupName) =>
+ val serv = new ch.epfl.lamp.scala.actors.jxta.JXTAService("AliveActor"
+ + new java.util.Date().getTime() + "-"
+ + new java.util.Random().nextInt(1000),
+ java.util.logging.Level.FINEST) {
+ //millis before we give up group creation and create the group.
+ override def TIME_BEFORE_AUTO_GROUP_CREATE: long = 30000
+ val PIPE_ID:String="1"
+ //val ADV_LIFETIME:long = 1 * 60 * 60 * 1000 //millis to keep advertisements ...
+ override def MY_GROUP_NAME:String = groupName
+ /*val SENDER_MESSAGE = "PalcomDemo"; //used to identify the message element in jxta messages
+ val PIPE_BASE_ID:String = "PIPE4Pal4"+MY_GROUP_NAME;
+ val MESSAGE_THRESHOLD = 5;*/
+ }
+ service = serv
+ serv.start()*/
+ case _ => throw new Exception ("Unknown Service in RemoteActor")
+ }
+ // create RemotePid
+ selfCached = service.kernel.register(this)
+ }
+
+ def node(pid: Pid): Node = pid match {
+ case rpid: RemotePid => rpid.node
+ case lpid: LocalPid => null
+ }
+
+ def disconnectNode(node: Node) =
+ kernel.disconnectNode(node);
+ //does not call start def of Actor
+ def register(name: Symbol, pid: RemotePid): unit =
+ kernel.registerName(name, pid);
+
+ //calls start def of Actor
+ def register(name: Symbol, a: RemoteActor): unit =
+ kernel.registerName(name, a);
+
+ def name(node: Node, sym: Symbol): Name =
+ Name(node, sym, kernel)
+
+ def spawn(node: Node, name: String): RemotePid =
+ kernel.spawn(self, node, name);
+
+ def spawn(node: Node, a: RemoteActor): unit =
+ kernel.spawn(self, node, a);
+
+ def spawn(fun: RemoteActor => unit): RemotePid =
+ kernel.spawn(fun);
+
+ def spawn(a: RemoteActor): RemotePid = {
+ val pid = kernel.register(a)
+ a.start
+ pid
+ }
+
+ def spawnLink(fun: RemoteActor => unit): RemotePid =
+ kernel.spawnLink(self, fun);
+
+ def monitorNode(node: Node, cond: boolean) =
+ kernel.monitorNode(self, node, cond);
+
+ // this should be:
+ // self.link(pid)
+ // if self is RemotePid it will invoke NetKernel
+
+ def link(pid: RemotePid): unit =
+ kernel.link(self, pid);
+
+ def unlink(pid: RemotePid): unit =
+ kernel.unlink(self, pid);
+
+ override def exit(reason: Symbol): unit =
+ kernel.exit(self, reason);
+
+ override def processFlag(flag: Symbol, set: boolean) =
+ kernel.processFlag(self, flag, set);
+
+ override def die(reason: Symbol) = {
+ if (isAlive) {
+ isAlive = false
+ Debug.info("" + this + " died.")
+ kernel.exit(self, reason)
+ }
+ }
+
+ override def die() = {
+ if (isAlive) {
+ isAlive = false
+ Debug.info("" + this + " died.")
+ kernel.exit(self, 'normal)
+ }
+ }
+}
diff --git a/src/actors/scala/actors/distributed/RemotePid.scala b/src/actors/scala/actors/distributed/RemotePid.scala
new file mode 100644
index 0000000000..bb70e3f4a5
--- /dev/null
+++ b/src/actors/scala/actors/distributed/RemotePid.scala
@@ -0,0 +1,184 @@
+package scala.actors.distributed
+
+import scala.actors.multi.Pid
+import scala.actors.multi.MailBox
+import scala.actors.multi.ExcHandlerDesc
+
+import java.io._
+
+[serializable]
+abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends Pid {
+ def this() = this(0, null, null) // for serialization
+
+ private var _locId = locId
+
+ override def equals(that: Any) = that match {
+ case rpid: RemotePid =>
+ (this.node == rpid.node && this.localId == rpid.localId)
+ case _ => false
+ }
+
+ //[throws(classOf[IOException])]
+ private def writeObject(out: ObjectOutputStream): Unit = {
+ //Console.println("writing locID"+locId)
+ out.writeInt(locId)
+ }
+
+ //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
+ private def readObject(in: ObjectInputStream): Unit = {
+ _locId = in.readInt()
+ //Console.println("read _locID"+_locId)
+ }
+
+ //[throws(classOf[ObjectStreamException])]
+ private def readResolve(): AnyRef = {
+ Console.println("readResolve")
+ null
+ //build nothing. Subclasses will do...
+ }
+
+ def node: Node;
+
+ def localId: int = locId;
+
+ def kernel = kern;
+
+ def !(msg: MailBox#Message): unit = {
+ //Console.println("! " + msg)
+ if (actor != null)
+ actor send msg
+ else
+ kernel.remoteSend(this, msg)
+ }
+
+ def link(other: Pid): unit =
+ other match {
+ case rpid: RemotePid =>
+ kernel.link(this, rpid)
+ };
+
+ //TODO (dont know if this is local to kernel.node)
+ def linkTo(other: Pid): unit =
+ other match {
+ case rpid: RemotePid =>
+ // do nothing
+ };
+
+ def unlink(other: Pid): unit =
+ other match {
+ case rpid: RemotePid =>
+ kernel.unlink(this, rpid)
+ };
+
+ //TODO (dont know if this is local to kernel.node)
+ def unlinkFrom(other: Pid): unit =
+ other match {
+ case rpid: RemotePid =>
+ // do nothing
+ };
+
+ def exit(reason: Symbol): unit = kernel.exit(this, reason);
+ def exit(from: Pid, reason: Symbol): unit = {
+ from match {
+ case rpid: RemotePid =>
+ kernel.exit(rpid, reason);
+ }
+ }
+
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {}
+}
+
+[serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) {
+ def node: TcpNode = n;
+
+ private var _locId = locId
+ private var _node = n
+
+ override def equals(that: Any) =
+ super.equals(that)
+
+ //[throws(classOf[IOException])]
+ private def writeObject(out: ObjectOutputStream): Unit = {
+ out.writeInt(locId)
+ out.writeObject(n)
+ }
+
+ //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
+ private def readObject(in: ObjectInputStream): Unit = {
+ _locId = in.readInt()
+ _node = in.readObject().asInstanceOf[TcpNode]
+ }
+
+ //[throws(classOf[ObjectStreamException])]
+ private def readResolve(): AnyRef = {
+ val kernel = NetKernel.kernel;
+ //TODO val actor = kernel.getLocalRef(_locId)
+ TcpPid(_node, _locId, kernel, actor)
+ }
+}
+
+[serializable] case class JXTAPid(n: JXTANode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) {
+ def node: JXTANode = n;
+
+ private var _locId = locId
+ private var _node = n
+
+ override def equals(that: Any) =
+ super.equals(that)
+
+ //[throws(classOf[IOException])]
+ private def writeObject(out: ObjectOutputStream): Unit = {
+ out.writeInt(locId)
+ out.writeObject(n)
+ }
+
+ //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
+ private def readObject(in: ObjectInputStream): Unit = {
+ _locId = in.readInt()
+ _node = in.readObject().asInstanceOf[JXTANode]
+ }
+
+ //[throws(classOf[ObjectStreamException])]
+ private def readResolve(): AnyRef = {
+ val kernel = NetKernel.kernel;
+ //TODO val actor = kernel.getLocalRef(_locId)
+ JXTAPid(_node, _locId, kernel, actor)
+ }
+}
+
+//================================================================================
+
+object CaseTest {
+
+ def getBytes(obj: AnyRef): Array[byte] = {
+ val bos = new ByteArrayOutputStream()
+ val out = new ObjectOutputStream(bos)
+ out.writeObject(obj)
+ out.flush()
+ bos.toByteArray()
+ }
+
+ def getObject(a: Array[byte]): AnyRef = {
+ val bis = new ByteArrayInputStream(a)
+ val in = new ObjectInputStream(bis)
+ val obj = in.readObject()
+ obj
+ }
+
+ def main(args: Array[String]): Unit = {
+ val node = JXTANode ("test node");
+ val pid1 = JXTAPid (node, 4, null, null);
+ val pid2 = JXTAPid (node, 4, new NetKernel(null), null);
+
+ Console.println("node Before: " + node)
+ Console.println("node After : " + getObject(getBytes(node)))
+
+ Console.println("pid1 Before: " + pid1)
+ Console.println("pid1 After : " + getObject(getBytes(pid1)))
+
+ Console.println("pid2 Before: " + pid2)
+ Console.println("pid2 After : " + getObject(getBytes(pid2)))
+
+ Console.println("pid2 After : " + getObject((new String (getBytes(pid2))).getBytes))
+ }
+}
diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala
new file mode 100644
index 0000000000..868d996131
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Serializer.scala
@@ -0,0 +1,53 @@
+package scala.actors.distributed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+
+import scala.actors.distributed.picklers.BytePickle.SPU;
+import scala.actors.multi.Pid;
+
+abstract class Serializer(s: Service) {
+ def serialize(o: AnyRef/*, w: Writer*/): Array[byte];
+ def deserialize(a: Array[byte]/*r: Reader*/): AnyRef;
+
+ // throws IOException
+ def readBytes(inputStream: DataInputStream): Array[byte] = {
+ try {
+ val length = inputStream.readInt();
+ val bytes = new Array[byte](length);
+ inputStream.readFully(bytes, 0, length);
+ return bytes;
+ }
+ catch {
+ case npe: NullPointerException => {
+ throw new EOFException("Connection closed.");
+ }
+ }
+ }
+
+ // throws IOException, ClassNotFoundException
+ def readObject(inputStream: DataInputStream): AnyRef = {
+ val bytes = readBytes(inputStream);
+ deserialize(bytes);
+ }
+
+ // throws IOException
+ def writeBytes(outputStream: DataOutputStream, bytes: Array[byte]): unit = {
+ val length = bytes.length;
+ // original length
+ outputStream.writeInt(length);
+ outputStream.write(bytes, 0, length);
+ outputStream.flush();
+ }
+
+ // throws IOException
+ def writeObject(outputStream: DataOutputStream, obj: AnyRef) = {
+ val bytes = serialize(obj);
+ writeBytes(outputStream, bytes);
+ }
+
+ def pid: SPU[Pid];
+ def service = s;
+ def addRep(name: String, repCons: Serializer => AnyRef): unit;
+}
diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala
new file mode 100644
index 0000000000..23a09a09c3
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Service.scala
@@ -0,0 +1,72 @@
+package scala.actors.distributed;
+
+import java.io.StringWriter;
+
+trait Service {
+ val serializer: Serializer;
+ def node: Node;
+ def createPid(actor: RemoteActor): RemotePid;
+ def send(node: Node, data: Array[byte]): unit;
+ def connect(node: Node): unit; // non blocking.
+ def disconnectNode(node: Node): unit;
+ def isConnected(node: Node): boolean;
+
+ //blocking. timeout depends on Implementation.
+ def isReachable(node: Node): boolean;
+
+ def getRoundTripTimeMillis(node:Node): long; //blocking
+
+ def nodes:List[Node]
+
+// implemented parts:
+
+ private val kern = new NetKernel(this);
+ def kernel = kern;
+
+ def spawn(name: String): RemotePid =
+ kern spawn name;
+
+ def spawn(name: String, arg: RemotePid): RemotePid =
+ kern.spawn(name, arg);
+
+
+ //suggested addition by seb
+ def spawn(fun: RemoteActor => unit): RemotePid =
+ kernel.spawn(fun);
+ def spawn(a:RemoteActor):RemotePid = {
+ //Console.println("Service:spawn(RemoteActor)")
+ val pid = kernel.register(a)
+ //Console.println("RemoteActor("+a+") registered in kernel")
+ a.start
+ //Console.println("RemoteActor("+a+") started")
+ pid
+ }
+
+ def send(pid: RemotePid, msg: AnyRef): unit = synchronized {
+ if (pid.node == this.node)
+ kernel.localSend(pid, msg)
+ else
+ kernel.remoteSend(pid, msg)
+ }
+
+ def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized {
+ //Console.println("Service: Sending " + msg + " to " + pid)
+ // lets try to serialize the message
+ //val sw = new StringWriter
+ //serializer.serialize(msg, sw)
+ val bytes = serializer.serialize(msg)
+ //val sendMsg = Send(pid, sw.toString())
+ val sendMsg = Send(pid, bytes)
+ //val sw2 = new StringWriter
+ //serializer.serialize(sendMsg, sw2)
+ //send(pid.node, sw2.toString())
+ val bytes2 = serializer.serialize(sendMsg)
+ send(pid.node, bytes2)
+ }
+
+ private var idCnt = 0;
+ def makeUid = { idCnt = idCnt + 1; idCnt }
+
+
+
+}
diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
new file mode 100644
index 0000000000..9d806c8c9c
--- /dev/null
+++ b/src/actors/scala/actors/distributed/TcpSerializerComb.scala
@@ -0,0 +1,126 @@
+package scala.actors.distributed;
+
+import java.io._;
+import scala.collection.mutable._;
+
+import scala.actors.distributed.picklers.BytePickle._;
+
+import scala.actors.distributed.MessagesComb._;
+import scala.actors.distributed.NodeComb._;
+import scala.actors.multi._;
+
+//import scala.actors.distributed.examples.CounterMessagesComb._;
+
+//TODO: change Service to NetKernel in Serializer interface
+class TcpSerializerComb(serv: Service) extends Serializer(serv) {
+
+ private def lookup(typename: String): PU[AnyRef] = {
+ val op = table.get(typename)
+ op match {
+ case None =>
+ error("No type representation found.")
+ null
+ case Some(rep) =>
+ val repr = rep.asInstanceOf[PU[AnyRef]]
+ repr
+ }
+ }
+
+ private def lookup(r: Reader): PU[AnyRef] = {
+ // read length of type name
+ val carr = new Array[char](8)
+ r.read(carr)
+ val len = Util.decode(new String(carr))
+ val content = new Array[char](len)
+ r.read(content)
+ lookup(new String(content))
+ }
+
+ def pid: SPU[Pid] = {
+ val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel,
+ if (p._1 == serv.node) serv.kernel.getLocalRef(p._2)
+ else null),
+ (t: TcpPid) => Pair(t.node, t.localId),
+ pair(tcpNodePU, nat));
+
+ wrap((p:Pid) => p, (pid:Pid) => pid match {
+ case tpid: TcpPid =>
+ tpid
+ case other =>
+ error("no instance of TcpPid!!")
+ }, nodeIntPU)
+ }
+
+ def anyRef: SPU[AnyRef] =
+ wrap((typename: String) => Class.forName(typename).newInstance().asInstanceOf[AnyRef],
+ (obj: AnyRef) => Util.baseName(obj),
+ string);
+
+ def actorPU: SPU[RA] =
+ wrap((typename: String) => RA(Class.forName(typename).newInstance().asInstanceOf[RemoteActor]),
+ (obj: RA) => Util.baseName(obj.a),
+ string);
+
+ val log = new Debug("TcpSerializerComb")
+ log.level = 3
+
+ val table = new HashMap[String, AnyRef]
+
+ initialize
+ def initialize = {
+ table += "int" -> nat
+ table += "Send" -> sendPU(this)
+ table += "Spawn" -> spawnPU(this)
+ table += "TcpNode" -> tcpNodePU
+ table += "TcpPid" -> pid
+ table += "Exit" -> exitPU(this)
+ table += "AnyRef" -> anyRef
+
+ table += "RA" -> actorPU
+ table += "SpawnObject" -> spawnObjectPU(this)
+
+ //table += "Incr" -> incrPU(this)
+ //table += "Value" -> valuePU(this)
+ //table += "Result" -> resultPU(this)
+ }
+
+ def addRep(name: String, repCons: Serializer => AnyRef) =
+ table.update(name, repCons(this));
+
+ def +=(name: String) =
+ new InternalMapTo(name);
+
+ class InternalMapTo(name: String) {
+ def ->(repCons: Serializer => AnyRef): unit =
+ table.update(name, repCons(TcpSerializerComb.this));
+ }
+
+ def serialize(o: AnyRef): Array[byte] = {
+ log.info("pickling value of type " + Util.baseName(o));
+ val op = table.get(Util.baseName(o));
+ op match {
+ case None => error("No type representation for " + Util.baseName(o) + " found. Cannot serialize.");
+ case Some(rep) =>
+ // first write type name
+ val bytes = pickle(string, Util.baseName(o))
+ val repr = rep.asInstanceOf[SPU[AnyRef]];
+ log.info("using type representation " + repr);
+ val res = repr.appP(o, new PicklerState(bytes, new PicklerEnv)).stream
+ res
+ }
+
+ }
+
+ def deserialize(bytes: Array[byte]): AnyRef = {
+ val ups = string.appU(new UnPicklerState(bytes, new UnPicklerEnv))
+ val typename = ups._1
+ table.get(typename) match {
+ case None => error("No type representation for " + typename + " found. Cannot deserialize.")
+ case Some(rep) =>
+ val repr = rep.asInstanceOf[SPU[AnyRef]];
+ val obj = repr.appU(ups._2)._1
+ log.info("unpickling successful")
+ obj
+ }
+ }
+}
diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala
new file mode 100644
index 0000000000..d714eba314
--- /dev/null
+++ b/src/actors/scala/actors/distributed/TcpService.scala
@@ -0,0 +1,215 @@
+package scala.actors.distributed;
+
+import java.net._;
+import java.io._;
+
+import java.util.logging._;
+
+object TcpService {
+ val random = new java.util.Random(0)
+
+ def generatePort: int = {
+ var portnum = 0
+ try {
+ portnum = 8000 + random.nextInt(500)
+ val socket = new ServerSocket(portnum)
+ socket.close()
+ }
+ catch {
+ case ioe: IOException =>
+ // this happens when trying to open a socket twice at the same port
+ // try again
+ generatePort
+ case se: SecurityException =>
+ // do nothing
+ }
+ portnum
+ }
+}
+
+object TestPorts {
+ def main(args: Array[String]): unit = {
+ val random = new java.util.Random(0)
+ val socket = new ServerSocket(8000 + random.nextInt(500))
+ Console.println(TcpService.generatePort)
+ }
+}
+
+class TcpService(port: int) extends Thread with Service {
+ val serializer: JavaSerializer = new JavaSerializer(this);
+
+ private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port);
+ def node: TcpNode = internalNode;
+
+ def createPid(actor: RemoteActor): RemotePid =
+ new TcpPid(internalNode, makeUid, kernel, actor)
+
+ def send(node: Node, data: String): unit = synchronized {
+ // retrieve worker thread (if any) that already has connection
+ node match {
+ case tnode: TcpNode =>
+ getConnection(tnode) match {
+ case None =>
+ // we are not connected, yet
+ Console.println("We are not connected, yet.");
+ val newWorker = connect(tnode); //bad in a sync BLOCK!!!
+ newWorker transmit data
+ case Some(worker) => worker transmit data
+ }
+ case any => error("no TcpNode!");
+ }
+ }
+
+ def send(node: Node, data: Array[byte]): unit = synchronized {
+ // retrieve worker thread (if any) that already has connection
+ node match {
+ case tnode: TcpNode =>
+ getConnection(tnode) match {
+ case None =>
+ // we are not connected, yet
+ Console.println("We are not connected, yet.");
+ val newWorker = connect(tnode); //bad in a sync BLOCK!!!
+ newWorker transmit data
+ case Some(worker) => worker transmit data
+ }
+ case any => error("no TcpNode!");
+ }
+ }
+
+ override def run(): unit = {
+ try {
+ val socket = new ServerSocket(port);
+ Console.println("Tcp Service started: " + node);
+
+ while (true) {
+ val nextClient = socket.accept();
+ Console.println("Received request from " + nextClient.getInetAddress() + ":" + nextClient.getPort());
+
+ // this is bad because client will have other port than actual node
+ // solution: worker should read node from stream
+ // and call main thread to update connection table
+
+ // spawn new worker thread
+ val worker = new TcpServiceWorker(this, nextClient);
+ worker.readNode;
+ // start worker thread
+ worker.start()
+ }
+ }
+ catch {
+ case ioe:IOException => {
+ // do nothing
+ }
+ case sec:SecurityException => {
+ // do nothing
+ }
+ }
+ }
+
+ // connection management
+
+ private val connections = new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker];
+
+ def nodes:List[Node] = throw new Exception ("nodes need to be implemented in TcpService!")
+
+ def addConnection(n: TcpNode, w: TcpServiceWorker) = synchronized {
+ connections += n -> w
+ }
+
+ def getConnection(n: TcpNode) = synchronized {
+ connections.get(n)
+ }
+
+ def isConnected(n: Node): boolean = synchronized {
+ n match {
+ case tnode: TcpNode =>
+ connections.get(tnode) match {
+ case None => false
+ case Some(x) => true
+ }
+ case _ => false
+ }
+ }
+
+ def connect(n: Node): unit = synchronized {
+ n match {
+ case tnode: TcpNode =>
+ connect(tnode)
+ }
+ }
+
+ def connect(n: TcpNode): TcpServiceWorker = synchronized {
+ Console.println("" + node + ": Connecting to node " + n + " ...")
+ val sock = new Socket(n.address, n.port)
+ Console.println("Connected.")
+ // spawn new worker thread
+ val worker = new TcpServiceWorker(this, sock)
+ worker.sendNode;
+ // start worker thread
+ worker.start()
+ // register locally (we want to reuse connections which correspond to connected sockets)
+ // update connection table
+ addConnection(n, worker)
+ worker
+ }
+
+ def disconnectNode(n: Node) = synchronized {
+ n match {
+ case node: TcpNode =>
+ Console.println("Disconnecting from " + node + " ...")
+ connections.get(node) match {
+ case None => Console.println("Cannot disconnect from " + node + ". Not connected.")
+ case Some(worker) =>
+ //TODO: sending disconnect message
+ //worker.sendDisconnect;
+ // update table
+ connections -= node
+ Console.println("Halting worker...")
+ worker.halt
+ }
+ case any => error("No TcpNode!!");
+ }
+ }
+
+ def isReachable(node: Node): boolean =
+ if (isConnected(node)) true
+ else try {
+ connect(node)
+ return true
+ }
+ catch {
+ case uhe: UnknownHostException =>
+ false
+ case ioe: IOException =>
+ false
+ case se: SecurityException =>
+ false
+ }
+
+ def getRoundTripTimeMillis(node: Node): long = 0
+
+ def nodeDown(mnode: TcpNode): unit = synchronized {
+ kernel nodeDown mnode
+ connections -= mnode
+ }
+
+ /*def closeConnection(worker: TcpServiceWorker): unit = synchronized {
+ connections.get(worker) match {
+ case None =>
+ System.err.println("Worker " + worker + " not registered.");
+ case Some(socket) => {
+ try {
+ socket.close();
+ connections -= worker
+ }
+ catch {
+ case ioe:IOException =>
+ System.err.println("Couldn't close connection.");
+ connections -= worker
+ }
+ }
+ };
+ System.out.println("OK. Connection closed.")
+ }*/
+
+}
diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
new file mode 100644
index 0000000000..4346b09d61
--- /dev/null
+++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala
@@ -0,0 +1,86 @@
+package scala.actors.distributed;
+
+import java.net._;
+import java.io._;
+
+class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
+ val in = so.getInputStream();
+ val out = so.getOutputStream();
+
+ val datain = new DataInputStream(in);
+ val dataout = new DataOutputStream(out);
+
+ val reader = new BufferedReader(new InputStreamReader(in));
+ val writer = new PrintWriter(new OutputStreamWriter(out));
+
+ val log = new Debug("TcpServiceWorker")
+ log.level = 2
+
+ def transmit(msg: Send): unit = synchronized {
+ val data = parent.serializer.serialize(msg);
+ transmit(data);
+ }
+
+ def transmit(data: String): unit = synchronized {
+ log.info("Transmitting " + data)
+ writer.write(data)
+ writer.flush()
+ }
+
+ def transmit(data: Array[byte]): unit = synchronized {
+ log.info("Transmitting " + data)
+ dataout.writeInt(data.length);
+ dataout.write(data)
+ dataout.flush()
+ }
+
+ def sendNode = {
+ Console.println("Sending our name " + parent.node);
+ parent.serializer.writeObject(dataout, parent.node);
+ }
+
+ var connectedNode: TcpNode = _;
+
+ def readNode = {
+ Console.println("" + parent.node + ": Reading node name...");
+ //val node = parent.serializer.deserialize(reader);
+ val node = parent.serializer.readObject(datain);
+ Console.println("Connection from " + node);
+ node match {
+ case n: TcpNode => {
+ connectedNode = n
+ Console.println("Adding connection to " + node + " to table.");
+ parent.addConnection(n, this)
+ }
+ }
+ }
+
+ var running = true;
+ def halt = synchronized {
+ so.close(); // close socket
+ running = false; // stop
+ }
+
+ override def run(): unit = {
+ try {
+ while (running) {
+ if (in.available() > 0) {
+ log.info("deserializing...");
+ //val msg = parent.serializer.deserialize(reader);
+ val msg = parent.serializer.readObject(datain);
+ log.info("Received object: " + msg);
+ parent.kernel.processMsg(msg)
+ }
+ }
+ }
+ catch {
+ case ioe:IOException =>
+ Console.println("" + ioe + " while reading from socket.");
+ parent nodeDown connectedNode
+ case e:Exception =>
+ // catch-all
+ Console.println("" + e + " while reading from socket.");
+ parent nodeDown connectedNode
+ }
+ }
+}
diff --git a/src/actors/scala/actors/distributed/Util.scala b/src/actors/scala/actors/distributed/Util.scala
new file mode 100644
index 0000000000..a54e455edf
--- /dev/null
+++ b/src/actors/scala/actors/distributed/Util.scala
@@ -0,0 +1,58 @@
+package scala.actors.distributed;
+
+import java.io._;
+import scala.collection.mutable._;
+
+object Util {
+ def pad(s: String, req: int): String = {
+ val buf = new StringBuffer;
+ val add: int = req - s.length();
+ for (val i <- List.range(1, add+1))
+ buf append "0";
+ buf append s;
+ buf.toString()
+ }
+
+ def encode(i: int) = pad(Integer.toHexString(i), 8);
+ def encode(l: long) = pad(java.lang.Long.toHexString(l), 16);
+ def decode(s: String): int = Integer.decode("0x" + s).intValue();
+ def decodeLong(s: String): long = java.lang.Long.decode("0x" + s).longValue();
+
+ def baseName(o: Any) = {
+ val s = o.toString();
+
+ def baseName(s: String): String = {
+ if (s.indexOf('$') != -1)
+ baseName(s.substring(0,s.indexOf('$')))
+ else if (s.indexOf('(') != -1)
+ baseName(s.substring(0,s.indexOf('(')))
+ else if (s.indexOf('@') != -1)
+ baseName(s.substring(0,s.indexOf('@')))
+ else s
+ }
+
+ baseName(s)
+ }
+
+ def extractArgs(s: String): Buffer[String] = {
+ // extract strings between first-level commas
+ var level: int = 0;
+ val carr: Array[char] = s.toCharArray();
+ var buf = new StringBuffer; // current string
+ val args = new ArrayBuffer[String];
+
+ for (val i <- List.range(0,carr.length)) {
+ if ((level == 0) && (carr(i) == ',')) {
+ // argument finished
+ args += buf.toString();
+ buf = new StringBuffer
+ } else {
+ if (carr(i) == '(') level = level + 1;
+ if (carr(i) == ')') level = level - 1;
+ buf append carr(i)
+ }
+ }
+ args += buf.toString();
+ args
+ }
+}
diff --git a/src/actors/scala/actors/distributed/picklers/BytePickle.scala b/src/actors/scala/actors/distributed/picklers/BytePickle.scala
new file mode 100644
index 0000000000..796fc9a353
--- /dev/null
+++ b/src/actors/scala/actors/distributed/picklers/BytePickle.scala
@@ -0,0 +1,436 @@
+package scala.actors.distributed.picklers
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.ArrayBuffer
+
+/**
+ Pickler combinators.
+
+ Author: Philipp Haller <philipp.haller@epfl.ch>
+ */
+
+object BytePickle {
+ class PicklerState(val stream: Array[byte], val dict: PicklerEnv) {}
+ class UnPicklerState(val stream: Array[byte], val dict: UnPicklerEnv) {}
+
+ abstract class PU[t] {
+ def appP(a: t, state: Array[byte]): Array[byte];
+ def appU(state: Array[byte]): Pair[t, Array[byte]];
+ }
+
+ abstract class SPU[t] {
+ def appP(a: t, state: PicklerState): PicklerState;
+ def appU(state: UnPicklerState): Pair[t, UnPicklerState];
+ }
+
+ class PicklerEnv extends HashMap[Any, int] {
+ private var cnt: int = 64;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+ }
+
+ class UnPicklerEnv extends HashMap[int, Any] {
+ private var cnt: int = 64;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+ }
+
+ abstract class RefDef;
+ case class Ref() extends RefDef;
+ case class Def() extends RefDef;
+
+ def refDef: PU[RefDef] = new PU[RefDef] {
+ def appP(b: RefDef, s: Array[byte]): Array[byte] =
+ b match {
+ case Ref() => Array.concat(s, (List[byte](0)).toArray)
+ case Def() => Array.concat(s, (List[byte](1)).toArray)
+ };
+ def appU(s: Array[byte]): Pair[RefDef, Array[byte]] =
+ if (s(0) == 0) Pair(Ref(), s.subArray(1, s.length))
+ else Pair(Def(), s.subArray(1, s.length));
+ }
+
+ val REF = 0
+ val DEF = 1
+
+ def unat: PU[int] = new PU[int] {
+ def appP(n: int, s: Array[byte]): Array[byte] =
+ Array.concat(s, nat2Bytes(n));
+ def appU(s: Array[byte]): Pair[int, Array[byte]] = {
+ var num = 0
+ def readNat: int = {
+ var b = 0;
+ var x = 0;
+ do {
+ b = s(num)
+ num = num + 1
+ x = (x << 7) + (b & 0x7f);
+ } while ((b & 0x80) != 0);
+ x
+ }
+ Pair(readNat, s.subArray(num, s.length))
+ }
+ }
+
+ def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
+ def appP(v: a, state: PicklerState): PicklerState = {
+ /*
+ - is there some value equal to v associated with a location l in the pickle environment?
+ - yes: write REF-tag to outstream together with l
+ - no:
+ write DEF-tag to outstream
+ record current location l of outstream
+ --> serialize value
+ add entry to pickle environment, mapping v onto l
+ */
+ val pe = state.dict
+ pe.get(v) match {
+ case None =>
+ //Console.println("" + v + " is new")
+ //Console.println("writing DEF...")
+ val sPrime = refDef.appP(Def(), state.stream)
+ val l = pe.nextLoc()
+
+ //Console.println("applying pickler to state " + sPrime)
+ val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
+
+ //Console.println("updating dict (" + l + ") for " + v)
+ pe.update(v, l)
+
+ return sPrimePrime
+ case Some(l) =>
+ //Console.println("writing REF...")
+ val sPrime = refDef.appP(Ref(), state.stream)
+
+ //Console.println("writing location to state " + sPrime)
+ return new PicklerState(unat.appP(l, sPrime), pe)
+ }
+ }
+ def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
+ /*
+ - first, read tag (i.e. DEF or REF)
+ - if REF:
+ read location l
+ look up resulting value in unpickler environment
+ - if DEF:
+ record location l of input stream
+ --> deserialize value v with argument deserializer
+ add entry to unpickler environment, mapping l onto v
+ */
+ val upe = state.dict
+ val res = refDef.appU(state.stream)
+ res._1 match {
+ case Def() =>
+ val l = upe.nextLoc
+ val res2 = pa.appU(new UnPicklerState(res._2, upe))
+ upe.update(l, res2._1)
+ return res2
+ case Ref() =>
+ val res2 = unat.appU(res._2) // read location
+ upe.get(res2._1) match { // lookup value in unpickler env
+ case None => error("invalid unpickler environment"); return null
+ case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
+ }
+ }
+ }
+ }
+
+ def upickle[t](p: PU[t], a: t): Array[byte] =
+ p.appP(a, new Array[byte](0));
+
+ def uunpickle[t](p: PU[t], stream: Array[byte]): t =
+ p.appU(stream)._1;
+
+ def pickle[t](p: SPU[t], a: t): Array[byte] =
+ p.appP(a, new PicklerState(new Array[byte](0), new PicklerEnv)).stream;
+
+ def unpickle[t](p: SPU[t], stream: Array[byte]): t =
+ p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
+
+ def ulift[t](x: t): PU[t] = new PU[t] {
+ def appP(a: t, state: Array[byte]): Array[byte] =
+ if (x != a) { error("value to be pickled (" + a + ") != " + x); state }
+ else state;
+ def appU(state: Array[byte]) = Pair(x, state);
+ }
+
+ def lift[t](x: t): SPU[t] = new SPU[t] {
+ def appP(a: t, state: PicklerState): PicklerState =
+ if (x != a) { /*error("value to be pickled (" + a + ") != " + x);*/ state }
+ else state;
+ def appU(state: UnPicklerState) = Pair(x, state);
+ }
+
+ def usequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
+ def appP(b: u, s: Array[byte]): Array[byte] = {
+ val a = f(b)
+ val sPrime = pa.appP(a, s)
+ val pb = k(a)
+ val sPrimePrime = pb.appP(b, sPrime)
+ sPrimePrime
+ }
+ def appU(s: Array[byte]): Pair[u, Array[byte]] = {
+ val resPa = pa.appU(s)
+ val a = resPa._1
+ val sPrime = resPa._2
+ val pb = k(a)
+ pb.appU(sPrime)
+ }
+ }
+
+ def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
+ def appP(b: u, s: PicklerState): PicklerState = {
+ val a = f(b)
+ //Console.println("pickling " + a + ", s: " + s.stream)
+ val sPrime = pa.appP(a, s)
+ val pb = k(a)
+ //Console.println("pickling " + b + ", s: " + s.stream)
+ pb.appP(b, sPrime)
+ }
+ def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
+ val resPa = pa.appU(s)
+ val a = resPa._1
+ val sPrime = resPa._2
+ val pb = k(a)
+ pb.appU(sPrime)
+ }
+ }
+
+ def upair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
+ def fst(p: Pair[a,b]): a = p._1;
+ def snd(p: Pair[a,b]): b = p._2;
+ usequ(fst, pa, (x: a) => usequ(snd, pb, (y: b) => ulift(Pair(x, y))))
+ }
+
+ def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
+ def fst(p: Pair[a,b]): a = p._1;
+ def snd(p: Pair[a,b]): b = p._2;
+ sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
+ }
+
+ def triple[a,b,c](pa: SPU[a], pb: SPU[b], pc: SPU[c]): SPU[Triple[a,b,c]] = {
+ def fst(p: Triple[a,b,c]): a = p._1;
+ def snd(p: Triple[a,b,c]): b = p._2;
+ def trd(p: Triple[a,b,c]): c = p._3;
+
+ sequ(fst, pa,
+ (x: a) => sequ(snd, pb,
+ (y: b) => sequ(trd, pc,
+ (z: c) => lift(Triple(x, y, z)))))
+ }
+
+ def uwrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
+ usequ(j, pa, (x: a) => ulift(i(x)));
+
+ def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
+ sequ(j, pa, (x: a) => lift(i(x)));
+
+ def appendByte(a: Array[byte], b: int): Array[byte] = {
+ Array.concat(a, (List[byte](b.asInstanceOf[byte])).toArray)
+ }
+
+ def nat2Bytes(x: int): Array[byte] = {
+ val buf = new ArrayBuffer[byte]
+ def writeNatPrefix(x: int): unit = {
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf += ((x & 0x7f) | 0x80).asInstanceOf[byte];
+ }
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf += (x & 0x7f).asInstanceOf[byte];
+ buf.toArray
+ }
+
+ def nat: SPU[int] = new SPU[int] {
+ def appP(n: int, s: PicklerState): PicklerState = {
+ new PicklerState(Array.concat(s.stream, nat2Bytes(n)), s.dict);
+ }
+ def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
+ var num = 0
+ def readNat: int = {
+ var b = 0;
+ var x = 0;
+ do {
+ b = s.stream(num)
+ num = num + 1
+ x = (x << 7) + (b & 0x7f);
+ } while ((b & 0x80) != 0);
+ x
+ }
+ Pair(readNat, new UnPicklerState(s.stream.subArray(num, s.stream.length), s.dict))
+ }
+ }
+
+ def byte: SPU[byte] = new SPU[byte] {
+ def appP(b: byte, s: PicklerState): PicklerState =
+ new PicklerState(Array.concat(s.stream, (List[byte](b)).toArray), s.dict);
+ def appU(s: UnPicklerState): Pair[byte, UnPicklerState] =
+ Pair(s.stream(0), new UnPicklerState(s.stream.subArray(1, s.stream.length), s.dict));
+ }
+
+ def string: SPU[String] =
+ share(wrap((a:Array[byte]) => UTF8Codec.decode(a, 0, a.length), (s:String) => UTF8Codec.encode(s), bytearray));
+
+ def bytearray: SPU[Array[byte]] = {
+ wrap((l:List[byte]) => l.toArray, .toList, list(byte))
+ }
+
+ def bool: SPU[boolean] = {
+ def toEnum(b: boolean) = if (b) 1 else 0;
+ def fromEnum(n: int) = if (n == 0) false else true;
+ wrap(fromEnum, toEnum, nat)
+ }
+
+ def ufixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
+ def pairToList(p: Pair[a,List[a]]): List[a] =
+ p._1 :: p._2;
+ def listToPair(l: List[a]): Pair[a,List[a]] =
+ l match { case x :: xs => Pair(x, xs) }
+
+ if (n == 0) ulift(Nil)
+ else
+ uwrap(pairToList, listToPair, upair(pa, ufixedList(pa)(n-1)))
+ }
+
+ def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
+ def pairToList(p: Pair[a,List[a]]): List[a] =
+ p._1 :: p._2;
+ def listToPair(l: List[a]): Pair[a,List[a]] =
+ l match { case x :: xs => Pair(x, xs) }
+
+ if (n == 0) lift(Nil)
+ else
+ wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
+ }
+
+ def list[a](pa: SPU[a]): SPU[List[a]] =
+ sequ((l: List[a])=>l.length, nat, fixedList(pa));
+
+ def ulist[a](pa: PU[a]): PU[List[a]] =
+ usequ((l:List[a]) => l.length, unat, ufixedList(pa));
+
+ def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
+ sequ(tag, nat, (x: int)=> ps.apply(x)());
+
+ def printByteArray(a: Array[byte]) = {
+ val iter = a.elements
+ while (iter.hasNext) {
+ val el = iter.next
+ Console.print("" + el + ", ")
+ }
+ }
+
+ def main(args: Array[String]) = {
+ // test nat2Bytes
+
+ Console.println(printByteArray(nat2Bytes(1)))
+ Console.println(printByteArray(nat2Bytes(10)))
+ Console.println(printByteArray(nat2Bytes(16)))
+ Console.println(printByteArray(nat2Bytes(256)))
+
+ Console.println(100000)
+ var res = pickle(nat, 100000)
+ Console.println(printByteArray(res))
+ var up = unpickle(nat, res)
+ Console.println(up)
+
+ // -- int list
+ val intList = List(1, 7, 13)
+ Console.println(intList)
+ val res9 = pickle(list(nat), intList)
+ Console.println(printByteArray(res9))
+ val up9 = unpickle(list(nat), res9)
+ Console.println(up9)
+
+ // ---------------
+ // -- boolean list
+ val bList = List(false, true, true)
+ Console.println(bList)
+ val res2 = pickle(list(bool), bList)
+ Console.println(printByteArray(res2))
+ val up2 = unpickle(list(bool), res2)
+ Console.println(up2)
+
+ // -- string
+ val s = "Hello"
+ Console.println(s)
+ val res3 = pickle(string, s)
+ Console.println(printByteArray(res3))
+ val up3 = unpickle(string, res3)
+ Console.println(up3)
+
+ val personPU = wrap((p:Pair[String,int]) => Person(p._1, p._2), (p:Person) => Pair(p.name, p.age), pair(string, nat));
+ val p = Person("Philipp", 25)
+ Console.println(p)
+ val res4 = pickle(personPU, p)
+ Console.println(printByteArray(res4))
+ val up4 = unpickle(personPU, res4)
+ Console.println(up4)
+
+ val x = Var("x");
+ val i = Lam("x", x);
+ val k = Lam("x", Lam("y", x));
+ val kki = App(k, App(k, i));
+
+ /*def varPU: PU[Term] = wrap(Var,
+ (t: Term)=> t match {case Var(x)=>x},
+ string);
+ def lamPU: PU[Term] = wrap(p: Pair[String,Term]=>Lam(p._1, p._2),
+ (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
+ pair(string, termPU));
+ def appPU: PU[Term] = wrap(p: Pair[Term,Term]=>App(p._1, p._2),
+ (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
+ pair(termPU, termPU));
+ def termPU: PU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
+ List(()=>varPU, ()=>lamPU, ()=>appPU));
+
+ Console.println("\n" + k);
+ val res5 = pickle(termPU, k);
+ Console.println(res5);
+ val up5 = unpickle(termPU, res5);
+ Console.println(up5);
+
+ Console.println("\n" + kki);
+ val res6 = pickle(termPU, kki);
+ Console.println(res6);
+ Console.println("len: " + res6.length)
+ val up6 = unpickle(termPU, res6);
+ Console.println(up6);*/
+
+ def varSPU: SPU[Term] = wrap(Var,
+ (t: Term)=> t match {case Var(x)=>x},
+ string);
+
+ def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
+ (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
+ pair(string, termSPU));
+
+ def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
+ (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
+ pair(termSPU, termSPU));
+
+ def termSPU: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
+ List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
+
+ Console.println("\n" + k);
+ val res8 = pickle(termSPU, k);
+ Console.println(printByteArray(res8));
+ Console.println("len: " + res8.length)
+ val up8 = unpickle(termSPU, res8);
+ Console.println(up8);
+
+ Console.println("\n" + kki);
+ val res7 = pickle(termSPU, kki);
+ Console.println(printByteArray(res7));
+ Console.println("len: " + res7.length)
+
+ val up7 = unpickle(termSPU, res7);
+ Console.println(up7);
+ }
+
+ case class Person(name: String, age: int);
+
+ abstract class Term;
+ case class Var(s: String) extends Term;
+ case class Lam(s: String, t: Term) extends Term;
+ case class App(t1: Term, t2: Term) extends Term;
+}
diff --git a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala
new file mode 100644
index 0000000000..c678e5acf8
--- /dev/null
+++ b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala
@@ -0,0 +1,519 @@
+package scala.actors.distributed.picklers;
+
+import scala.collection.mutable.HashMap;
+
+import java.io.StringReader;
+import java.io.StringWriter;
+
+/**
+ Pickler combinators.
+
+ Author: Philipp Haller <philipp.haller@epfl.ch>
+ */
+
+object SStreamPickle {
+ abstract class PU[t] {
+ def appP(a: t, state: OutStream): OutStream;
+ def appU(state: InStream): Pair[t,InStream];
+ }
+
+ //def pickle[t](p: PU[t], a: t): OutStream =
+ // p.appP(a, "");
+
+ def unpickle[t](p: PU[t], stream: InStream): t =
+ p.appU(stream)._1;
+
+ def lift[t](x: t): PU[t] = new PU[t] {
+ def appP(a: t, state: OutStream): OutStream = state;
+ def appU(state: InStream) = Pair(x, state);
+ }
+
+ def sequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] {
+ def appP(b: u, s: OutStream): OutStream = {
+ val a = f(b)
+ val sPrime = pa.appP(a, s)
+ val pb = k(a)
+ val sPrimePrime = pb.appP(b, sPrime)
+ sPrimePrime
+ }
+ def appU(s: InStream): Pair[u,InStream] = {
+ val resPa = pa.appU(s)
+ val a = resPa._1
+ val sPrime = resPa._2
+ val pb = k(a)
+ pb.appU(sPrime)
+ }
+ }
+
+ def pair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = {
+ def fst(p: Pair[a,b]): a = p._1;
+ def snd(p: Pair[a,b]): b = p._2;
+ sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
+ }
+
+ def triple[a,b,c](pa: PU[a], pb: PU[b], pc: PU[c]): PU[Triple[a,b,c]] = {
+ def fst(p: Triple[a,b,c]): a = p._1;
+ def snd(p: Triple[a,b,c]): b = p._2;
+ def trd(p: Triple[a,b,c]): c = p._3;
+
+ sequ(fst, pa,
+ (x: a) => sequ(snd, pb,
+ (y: b) => sequ(trd, pc,
+ (z: c) => lift(Triple(x, y, z)))))
+ }
+
+ def wrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] =
+ sequ(j, pa, (x: a) => lift(i(x)));
+
+ def unit: PU[unit] =
+ lift(unit);
+
+ def pad(s: String, req: int): String = {
+ val buf = new StringBuffer
+ for (val i <- List.range(1, req-s.length+1))
+ buf append "0"
+ (buf append s).toString
+ }
+ def encode(i: int): String = pad(Integer.toHexString(i), 8);
+ def decode(s: String): int = Integer.decode("0x" + s).intValue();
+
+ def int: PU[int] = new PU[int] {
+ def appP(n: int, s: OutStream): OutStream = {
+ s.write(encode(n))
+ s
+ }
+ def appU(s: InStream): Pair[int,InStream] = {
+ val substr = s.read(8)
+ //Console.println("unpickling " + substr)
+ Pair(decode(substr), s)
+ }
+ }
+
+ def char: PU[char] = new PU[char] {
+ def appP(b: char, s: OutStream): OutStream = {
+ s.write(b)
+ s
+ }
+ def appU(s: InStream): Pair[char,InStream] = {
+ val carr = new Array[char](1)
+ s.read(carr)
+ //Console.println("unpickling " + carr(0))
+ Pair(carr(0), s)
+ }
+ }
+
+ def bool: PU[boolean] = {
+ def toEnum(b: boolean) = if (b) 1 else 0;
+ def fromEnum(n: int) = if (n == 0) false else true;
+ wrap(fromEnum, toEnum, nat)
+ }
+
+ def fixedList[a](pa: PU[a])(n: int): PU[List[a]] = {
+ def pairToList(p: Pair[a,List[a]]): List[a] =
+ p._1 :: p._2;
+ def listToPair(l: List[a]): Pair[a,List[a]] =
+ l match { case x :: xs => Pair(x, xs) }
+
+ if (n == 0) lift(Nil)
+ else
+ wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
+ }
+
+ def list[a](pa: PU[a]): PU[List[a]] =
+ sequ((l: List[a])=>l.length, nat, fixedList(pa));
+
+ def string: PU[String] =
+ wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
+
+ def alt[a](tag: a => int, ps: List[PU[a]]): PU[a] =
+ sequ(tag, int, ps.apply);
+
+ def data[a](tag: a => int, ps: List[()=>PU[a]]): PU[a] =
+ sequ(tag, nat, (x: int)=> ps.apply(x)());
+
+ def option[a](pa: PU[a]): PU[Option[a]] = {
+ def tag(x: Option[a]) = x match {
+ case None => 0
+ case Some(y) => 1
+ }
+ def fromSome(x: Option[a]) = x match {
+ case Some(y) => y
+ case None => null
+ }
+ def toSome(x: a): Option[a] = Some(x);
+ val pnone: PU[Option[a]] = lift(None)
+ alt(tag, List(pnone, wrap(toSome, fromSome, pa)))
+ }
+
+ def byteString(b: int) =
+ pad(Integer.toHexString(b), 2);
+
+ def natString(x: int): String = {
+ val buf = new StringBuffer
+
+ def writeNatPrefix(x: int): unit = {
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf.append(byteString((x & 0x7f) | 0x80));
+ }
+
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf.append(byteString(x & 0x7f));
+ buf.toString()
+ }
+
+ def nat: PU[int] = new PU[int] {
+ def appP(n: int, s: OutStream): OutStream = {
+ s.write(natString(n))
+ s
+ }
+ def appU(s: InStream): Pair[int,InStream] = {
+ def readNat: int = {
+ var b = 0;
+ var x = 0;
+ do {
+ b = decode(s.read(2));
+ x = (x << 7) + (b & 0x7f);
+ } while ((b & 0x80) != 0);
+ x
+ }
+ Pair(readNat, s)
+ }
+ }
+
+ def main(args: Array[String]) = {
+ def testBase128(x: int) = {
+ Console.println(x)
+
+ val sw = new StringWriter
+ val os = new OutStream(sw)
+ val res = nat.appP(x, os)
+ os.flush()
+ Console.println(sw.toString())
+
+ val up = nat.appU(new InStream(new StringReader(sw.toString())))
+ Console.println(up._1)
+ }
+
+ testBase128(0)
+ testBase128(1)
+ testBase128(64)
+ testBase128(127)
+ testBase128(128)
+ testBase128(8192)
+
+ def pickleTest[a](x: a, pa: PU[a]) = {
+ Console.println(x)
+
+ val sw = new StringWriter
+ val os = new OutStream(sw)
+ val res = pa.appP(x, os)
+ os.flush()
+ Console.println(sw.toString())
+
+ val up = pa.appU(new InStream(new StringReader(sw.toString())))
+ Console.println(up._1)
+ }
+
+ pickleTest(List(1, 7, 13), list(nat))
+
+ pickleTest(List(false, true, true), list(bool))
+
+ pickleTest("Hello", string)
+
+
+ val personPU = wrap((p: Pair[String,int]) => Person(p._1, p._2), (p: Person) => Pair(p.name, p.age), pair(string, nat));
+ val p = Person("Philipp", 25)
+ pickleTest(p, personPU)
+
+
+ val x = Var("x");
+ val i = Lam("x", x);
+ val k = Lam("x", Lam("y", x));
+ val kki = App(k, App(k, i));
+
+ def varPU: PU[Term] =
+ wrap(Var,
+ (t: Term)=> t match {case Var(x)=>x},
+ string);
+ def lamPU: PU[Term] =
+ wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
+ (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
+ pair(string, termPU));
+ def appPU: PU[Term] =
+ wrap((p: Pair[Term,Term])=>App(p._1, p._2),
+ (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
+ pair(termPU, termPU));
+ def termPU: PU[Term] =
+ data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
+ List(()=>varPU, ()=>lamPU, ()=>appPU));
+
+ pickleTest(k, termPU)
+ pickleTest(kki, termPU)
+ }
+
+ case class Person(name: String, age: int);
+}
+
+abstract class Term;
+case class Var(s: String) extends Term;
+case class Lam(s: String, t: Term) extends Term;
+case class App(t1: Term, t2: Term) extends Term;
+
+
+object ShareStreamPickle {
+ abstract class SPU[t] {
+ def appP(a: t, state: PicklerState): PicklerState;
+ def appU(state: UnPicklerState): Pair[t, UnPicklerState];
+ }
+
+ //def pickle[t](p: SPU[t], a: t): String =
+ // p.appP(a, new PicklerState("", new PicklerEnv)).stream;
+
+ //def unpickle[t](p: SPU[t], stream: String): t =
+ // p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1;
+
+ class PicklerEnv extends HashMap[Any, int] {
+ private var cnt: int = 64;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+ }
+
+ class UnPicklerEnv extends HashMap[int, Any] {
+ private var cnt: int = 64;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+ }
+
+ class PicklerState(val stream: OutStream, val dict: PicklerEnv) {}
+ class UnPicklerState(val stream: InStream, val dict: UnPicklerEnv) {}
+
+ abstract class RefDef;
+ case class Ref() extends RefDef;
+ case class Def() extends RefDef;
+
+ def refDef: SStreamPickle.PU[RefDef] = new SStreamPickle.PU[RefDef] {
+ def appP(b: RefDef, s: OutStream): OutStream =
+ b match {
+ case Ref() => s.write("0"); s
+ case Def() => s.write("1"); s
+ };
+ def appU(s: InStream): Pair[RefDef, InStream] =
+ if (s.readChar == '0') Pair(Ref(), s)
+ else Pair(Def(), s);
+ }
+
+ def share[a](pa: SPU[a]): SPU[a] = new SPU[a] {
+ def appP(v: a, state: PicklerState): PicklerState = {
+ /*
+ - is there some value equal to v associated with a location l in the pickle environment?
+ - yes: write REF-tag to outstream together with l
+ - no:
+ write DEF-tag to outstream
+ record current location l of outstream
+ --> serialize value
+ add entry to pickle environment, mapping v onto l
+ */
+ val pe = state.dict
+ pe.get(v) match {
+ case None =>
+ //Console.println("" + v + " is new")
+ //Console.println("writing DEF...")
+ val sPrime = refDef.appP(Def(), state.stream)
+ val l = pe.nextLoc()
+
+ //Console.println("applying pickler to state " + sPrime)
+ val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe))
+
+ //Console.println("updating dict (" + l + ") for " + v)
+ pe.update(v, l)
+
+ return sPrimePrime
+ case Some(l) =>
+ //Console.println("writing REF...")
+ val sPrime = refDef.appP(Ref(), state.stream)
+ //Console.println("writing location to state " + sPrime)
+ return new PicklerState(SStreamPickle.nat.appP(l, sPrime), pe)
+ }
+ }
+ def appU(state: UnPicklerState): Pair[a, UnPicklerState] = {
+ /*
+ - first, read tag (i.e. DEF or REF)
+ - if REF:
+ read location l
+ look up resulting value in unpickler environment
+ - if DEF:
+ record location l of input stream
+ --> deserialize value v with argument deserializer
+ add entry to unpickler environment, mapping l onto v
+ */
+ val upe = state.dict
+ val res = refDef.appU(state.stream)
+ res._1 match {
+ case Def() =>
+ val l = upe.nextLoc
+ val res2 = pa.appU(new UnPicklerState(res._2, upe))
+ upe.update(l, res2._1)
+ return res2
+ case Ref() =>
+ val res2 = SStreamPickle.nat.appU(res._2) // read location
+ upe.get(res2._1) match { // lookup value in unpickler env
+ case None => error("invalid unpickler environment"); return null
+ case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe))
+ }
+ }
+ }
+ }
+
+ def lift[t](x: t): SPU[t] = new SPU[t] {
+ def appP(a: t, state: PicklerState): PicklerState = state;
+ def appU(state: UnPicklerState) = Pair(x, state);
+ }
+
+ def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] {
+ def appP(b: u, s: PicklerState): PicklerState = {
+ val a = f(b)
+ //Console.println("pickling " + a + ", s: " + s.stream)
+ val sPrime = pa.appP(a, s)
+ val pb = k(a)
+ //Console.println("pickling " + b + ", s: " + s.stream)
+ pb.appP(b, sPrime)
+ }
+ def appU(s: UnPicklerState): Pair[u, UnPicklerState] = {
+ val resPa = pa.appU(s)
+ val a = resPa._1
+ val sPrime = resPa._2
+ val pb = k(a)
+ pb.appU(sPrime)
+ }
+ }
+
+ def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = {
+ def fst(p: Pair[a,b]): a = p._1;
+ def snd(p: Pair[a,b]): b = p._2;
+ sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y))))
+ }
+
+ def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] =
+ sequ(j, pa, (x: a) => lift(i(x)));
+
+ def char: SPU[char] = new SPU[char] {
+ def appP(b: char, s: PicklerState): PicklerState = {
+ s.stream.write(b)
+ new PicklerState(s.stream, s.dict)
+ }
+ def appU(s: UnPicklerState): Pair[char, UnPicklerState] =
+ Pair(s.stream.readChar, new UnPicklerState(s.stream, s.dict));
+ }
+
+ def pad(s: String, req: int): String = {
+ val buf = new StringBuffer
+ for (val i <- List.range(1, req-s.length+1))
+ buf append "0"
+ (buf append s).toString
+ }
+ def encode(i: int): String = pad(Integer.toHexString(i), 8);
+ def decode(s: String): int = Integer.decode("0x" + s).intValue();
+
+ def byteString(b: int) =
+ pad(Integer.toHexString(b), 2);
+
+ def natString(x: int): String = {
+ val buf = new StringBuffer
+
+ def writeNatPrefix(x: int): unit = {
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf.append(byteString((x & 0x7f) | 0x80));
+ }
+
+ val y = x >>> 7;
+ if (y != 0) writeNatPrefix(y);
+ buf.append(byteString(x & 0x7f));
+ buf.toString()
+ }
+
+ def nat: SPU[int] = new SPU[int] {
+ def appP(n: int, s: PicklerState): PicklerState = {
+ s.stream.write(natString(n))
+ new PicklerState(s.stream, s.dict)
+ }
+ def appU(s: UnPicklerState): Pair[int,UnPicklerState] = {
+ def readNat: int = {
+ var b = 0;
+ var x = 0;
+ do {
+ b = decode(s.stream.read(2));
+ x = (x << 7) + (b & 0x7f);
+ } while ((b & 0x80) != 0);
+ x
+ }
+ Pair(readNat, new UnPicklerState(s.stream, s.dict))
+ }
+ }
+
+ def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = {
+ def pairToList(p: Pair[a,List[a]]): List[a] =
+ p._1 :: p._2;
+ def listToPair(l: List[a]): Pair[a,List[a]] =
+ l match { case x :: xs => Pair(x, xs) }
+
+ if (n == 0) lift(Nil)
+ else
+ wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1)))
+ }
+
+ def list[a](pa: SPU[a]): SPU[List[a]] =
+ sequ((l: List[a])=>l.length, nat, fixedList(pa));
+
+ def string: SPU[String] =
+ wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char));
+
+ def alt[a](tag: a => int, ps: List[SPU[a]]): SPU[a] =
+ sequ(tag, nat, ps.apply);
+
+ def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] =
+ sequ(tag, nat, (x: int)=> ps.apply(x)());
+
+ def main(args: Array[String]) = {
+ def pickleTest[a](x: a, pa: SPU[a]) = {
+ Console.println(x)
+
+ val sw = new StringWriter
+ val os = new OutStream(sw)
+ val res = pa.appP(x, new PicklerState(os, new PicklerEnv))
+ os.flush()
+ Console.println(sw.toString())
+
+ val up = pa.appU(new UnPicklerState(new InStream(new StringReader(sw.toString())), new UnPicklerEnv))
+ Console.println(up._1)
+ }
+
+ val x = Var("x");
+ val i = Lam("x", x);
+ val k = Lam("x", Lam("y", x));
+ val kki = App(k, App(k, i));
+
+ def varSPU: SPU[Term] = wrap(Var,
+ (t: Term)=> t match {case Var(x)=>x},
+ string);
+
+ def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2),
+ (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)},
+ pair(string, termSPU));
+
+ def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2),
+ (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)},
+ pair(termSPU, termSPU));
+
+ def termSPU: SPU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
+ List(()=>varSPU, ()=>lamSPU, ()=>appSPU));
+
+ def termSPUshared: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2},
+ List(()=>varSPU, ()=>lamSPU, ()=>appSPU)));
+
+ pickleTest(k, termSPU)
+ pickleTest(k, termSPUshared)
+ pickleTest(kki, termSPU)
+ pickleTest(kki, termSPUshared)
+ }
+}
diff --git a/src/actors/scala/actors/distributed/picklers/Streams.scala b/src/actors/scala/actors/distributed/picklers/Streams.scala
new file mode 100644
index 0000000000..fa294785ab
--- /dev/null
+++ b/src/actors/scala/actors/distributed/picklers/Streams.scala
@@ -0,0 +1,87 @@
+package scala.actors.distributed.picklers;
+
+import java.io.Reader;
+import java.io.Writer;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+
+import scala.collection.mutable._;
+
+class OutStream(writer: Writer) {
+ val picklerEnv = new PicklerEnv;
+
+ private var loc: int = 0;
+
+ def getLocation = loc;
+
+ def write(s: String): unit = {
+ loc = loc + s.length()
+ writer.write(s)
+ //Console.println("new loc: " + loc)
+ }
+
+ def write(c: char): unit = {
+ loc = loc + 1
+ writer.write(c)
+ //Console.println("new loc: " + loc)
+ }
+
+ def flush(): unit =
+ writer.flush();
+}
+
+class InStream(reader: Reader) {
+ val unpicklerEnv = new UnpicklerEnv;
+
+ private var loc: int = 0;
+
+ def getLocation = loc;
+
+ def read(num: int): String = {
+ val carr = new Array[char](num)
+ val cnt = reader.read(carr)
+ loc = loc + num
+ //Console.println("new loc: " + loc)
+ new String(carr)
+
+ /*val buf = new StringBuffer
+ var ch = r.read()
+ loc = loc + 1
+ if (num > 1) {
+ var cnt = 1
+ while (cnt < num && ch != -1) {
+ buf.append(ch)
+ ch = r.read()
+ loc = loc + 1
+ cnt = cnt + 1
+ }
+ if (cnt == num)
+ buf.toString()
+ else
+ buf.toString() // error
+ }
+ else
+ new String(ch.asInstanceOf[char])*/
+ }
+
+ def read(cbuf: Array[char]): int = {
+ loc = loc + cbuf.length
+ reader.read(cbuf)
+ }
+
+ def readChar: char = {
+ val carr = new Array[char](1)
+ read(carr)
+ carr(0)
+ }
+}
+
+class PicklerEnv[a] extends HashMap[a, int] {
+ private var cnt: int = 0;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+}
+
+class UnpicklerEnv[a] extends HashMap[int, a] {
+ private var cnt: int = 0;
+ def nextLoc() = { cnt = cnt + 1; cnt };
+}
diff --git a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala
new file mode 100644
index 0000000000..36ad115cdd
--- /dev/null
+++ b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala
@@ -0,0 +1,77 @@
+/* ____ ____ ____ ____ ______ *\
+** / __// __ \/ __// __ \/ ____/ SOcos COmpiles Scala **
+** __\_ \/ /_/ / /__/ /_/ /\_ \ (c) 2002-2006, LAMP/EPFL **
+** /_____/\____/\___/\____/____/ **
+\* */
+
+// $Id: UTF8Codec.scala 7116 2006-04-11 15:36:05Z mihaylov $
+
+
+package scala.actors.distributed.picklers
+
+object UTF8Codec {
+
+ def encode(src: Array[Char], from: Int, dst: Array[Byte], to: Int, len: Int): Int = {
+ var i = from;
+ var j = to;
+ val end = from + len;
+ while (i < end) {
+ val ch = src(i);
+ i = i + 1;
+ if (ch < 128) {
+ dst(j) = ch.toByte;
+ j = j + 1;
+ }
+ else if (ch <= 0x3FF) {
+ dst(j) = (0xC0 | (ch >> 6)).toByte;
+ dst(j+1) = (0x80 | (ch & 0x3F)).toByte;
+ j = j + 2;
+ } else {
+ dst(j) = (0xE0 | (ch >> 12)).toByte;
+ dst(j+1) = (0x80 | ((ch >> 6) & 0x3F)).toByte;
+ dst(j+2) = (0x80 | (ch & 0x3F)).toByte;
+ j = j + 3;
+ }
+ }
+ j
+ }
+
+ def encode(s: String, dst: Array[Byte], to: Int): Int =
+ encode(s.toCharArray(), 0, dst, to, s.length());
+
+
+ def encode(s: String): Array[Byte] = {
+ val dst = new Array[Byte](s.length() * 3);
+ val len = encode(s, dst, 0);
+ dst.subArray(0, len)
+ }
+
+ def decode(src: Array[Byte], from: Int,
+ dst: Array[Char], to: Int, len: Int): Int =
+ {
+ var i = from;
+ var j = to;
+ val end = from + len;
+ while (i < end) {
+ var b = src(i) & 0xFF;
+ i = i + 1;
+ if (b >= 0xE0) {
+ b = ((b & 0x0F) << 12) | (src(i) & 0x3F) << 6;
+ b = b | (src(i+1) & 0x3F);
+ i = i + 2;
+ } else if (b >= 0xC0) {
+ b = ((b & 0x1F) << 6) | (src(i) & 0x3F);
+ i = i + 1;
+ }
+ dst(j) = b.toChar;
+ j = j + 1;
+ }
+ j
+ }
+
+ def decode(src: Array[Byte], from: Int, len: Int): String = {
+ val cs = new Array[Char](len);
+ String.copyValueOf(cs, 0, decode(src, 0, cs, 0, len));
+ }
+
+}
diff --git a/src/actors/scala/actors/gui/Button.scala b/src/actors/scala/actors/gui/Button.scala
new file mode 100644
index 0000000000..99b74d27be
--- /dev/null
+++ b/src/actors/scala/actors/gui/Button.scala
@@ -0,0 +1,20 @@
+package scala.actors.gui
+
+import javax.swing._
+import event._
+
+/** A class for buttons; standard constructor wraps around a swing button */
+class Button(val jbutton: JButton) extends Container(jbutton) with SwingComponent with Publisher {
+ def this(txt: String) = this(new JButton(txt))
+ def this() = this(new JButton())
+ def text: String = jbutton.getText()
+ def text_=(s: String) = jbutton.setText(s)
+ def icon: Icon = jbutton.getIcon()
+ def icon_=(i: Icon) = jbutton.setIcon(i)
+ jbutton.addActionListener {
+ new java.awt.event.ActionListener {
+ def actionPerformed(e: java.awt.event.ActionEvent): unit =
+ publish(ButtonPressed(Button.this))
+ }
+ }
+}
diff --git a/src/actors/scala/actors/gui/Caret.scala b/src/actors/scala/actors/gui/Caret.scala
new file mode 100644
index 0000000000..05cfc2659a
--- /dev/null
+++ b/src/actors/scala/actors/gui/Caret.scala
@@ -0,0 +1,8 @@
+package scala.actors.gui;
+
+import javax.swing
+
+class Caret(val jcaret: swing.text.Caret) {
+ def dot: int = jcaret.getDot()
+ def mark: int = jcaret.getMark()
+}
diff --git a/src/actors/scala/actors/gui/Component.scala b/src/actors/scala/actors/gui/Component.scala
new file mode 100644
index 0000000000..9610b433fa
--- /dev/null
+++ b/src/actors/scala/actors/gui/Component.scala
@@ -0,0 +1,9 @@
+package scala.actors.gui;
+
+import javax.swing._;
+import java.awt._;
+
+class Component(val acomponent: java.awt.Component) extends Subscriber {
+ def show: this.type = { acomponent.setVisible(true); this }
+}
+
diff --git a/src/actors/scala/actors/gui/Container.scala b/src/actors/scala/actors/gui/Container.scala
new file mode 100644
index 0000000000..e30ac78caa
--- /dev/null
+++ b/src/actors/scala/actors/gui/Container.scala
@@ -0,0 +1,18 @@
+package scala.actors.gui;
+
+import javax.swing._
+import scala.collection.mutable.ListBuffer
+
+class Container(val jcontainer: java.awt.Container) extends Component(jcontainer) {
+ def this() = this(new java.awt.Container())
+ val elems = new ListBuffer[Component]
+ def += (c: Component) = {
+ elems += c
+ jcontainer.add(c.acomponent)
+ }
+ def -= (c: Component) = {
+ elems -= c
+ jcontainer.remove(c.acomponent)
+ }
+}
+
diff --git a/src/actors/scala/actors/gui/EmptyBorder.scala b/src/actors/scala/actors/gui/EmptyBorder.scala
new file mode 100644
index 0000000000..01ff7481cb
--- /dev/null
+++ b/src/actors/scala/actors/gui/EmptyBorder.scala
@@ -0,0 +1,8 @@
+package scala.actors.gui;
+
+import javax.swing._
+
+class EmptyBorder(_top: int, _left: int, _bottom: int, _right: int)
+extends border.EmptyBorder(_top, _left, _bottom, _right) {
+ def this() = this(0, 0, 0, 0)
+}
diff --git a/src/actors/scala/actors/gui/FormattedTextField.scala b/src/actors/scala/actors/gui/FormattedTextField.scala
new file mode 100644
index 0000000000..74b0c6e359
--- /dev/null
+++ b/src/actors/scala/actors/gui/FormattedTextField.scala
@@ -0,0 +1,9 @@
+package scala.actors.gui;
+
+import javax.swing._
+import java.awt.event._
+import event._
+
+class FormattedTextField(val jftextfield: JFormattedTextField) extends TextComponent(jftextfield) {
+ def this(format: java.text.Format) = this(new JFormattedTextField(format));
+}
diff --git a/src/actors/scala/actors/gui/Frame.scala b/src/actors/scala/actors/gui/Frame.scala
new file mode 100644
index 0000000000..10b79f54c3
--- /dev/null
+++ b/src/actors/scala/actors/gui/Frame.scala
@@ -0,0 +1,33 @@
+package scala.actors.gui;
+
+import javax.swing._;
+import event._;
+
+class Frame(val jframe: JFrame) extends Container(jframe) with Publisher {
+ def this() = this(new JFrame("Untitled Frame"))
+ def title: String = jframe.getTitle()
+ def title_=(s: String) = jframe.setTitle(s)
+ val contents = new Container(jframe.getContentPane())
+ private var default_button: Button = null
+ def defaultButton = default_button
+ def defaultButton_=(b: Button) = { default_button = b; jframe.getRootPane().setDefaultButton(b.jbutton) }
+ def pack: this.type = { jframe.pack(); this }
+ jframe.addWindowListener {
+ new java.awt.event.WindowListener {
+ def windowActivated(e: java.awt.event.WindowEvent) = publish(WindowActivated(Frame.this))
+ def windowClosed(e: java.awt.event.WindowEvent) = publish(WindowClosed(Frame.this))
+ def windowClosing(e: java.awt.event.WindowEvent) = publish(WindowClosing(Frame.this))
+ def windowDeactivated(e: java.awt.event.WindowEvent) = publish(WindowDeactivated(Frame.this))
+ def windowDeiconified(e: java.awt.event.WindowEvent) = publish(WindowDeiconified(Frame.this))
+ def windowIconified(e: java.awt.event.WindowEvent) = publish(WindowIconified(Frame.this))
+ def windowOpened(e: java.awt.event.WindowEvent) = publish(WindowOpened(Frame.this))
+ }
+ }
+
+ /*jframe.addMouseMotionListener (
+ new java.awt.event.MouseMotionListener {
+ def mouseDragged(e: java.awt.event.MouseEvent) = publish(MouseDragged(e))
+ def mouseMoved(e: java.awt.event.MouseEvent) = publish(MouseMoved(e))
+ }
+ )*/
+}
diff --git a/src/actors/scala/actors/gui/GUIApplication.scala b/src/actors/scala/actors/gui/GUIApplication.scala
new file mode 100644
index 0000000000..7d52e75628
--- /dev/null
+++ b/src/actors/scala/actors/gui/GUIApplication.scala
@@ -0,0 +1,20 @@
+package scala.actors.gui
+
+import javax.swing._
+import event.Event
+
+class GUIApplication {
+ def defaultLookAndFeelDecorated: boolean = true
+
+ def init() = {
+ UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName())
+ JFrame.setDefaultLookAndFeelDecorated(defaultLookAndFeelDecorated)
+ }
+
+ def run(prog: => unit): unit =
+ SwingUtilities.invokeLater {
+ new Runnable() {
+ def run() = { init(); prog }
+ }
+ }
+}
diff --git a/src/actors/scala/actors/gui/Label.scala b/src/actors/scala/actors/gui/Label.scala
new file mode 100644
index 0000000000..5094f9436c
--- /dev/null
+++ b/src/actors/scala/actors/gui/Label.scala
@@ -0,0 +1,14 @@
+package scala.actors.gui;
+
+import javax.swing._;
+
+class Label(val jlabel: JLabel) extends Container(jlabel) with SwingComponent {
+ def this(txt: String) = this(new JLabel(txt))
+ def this() = this("Untitled Label")
+ def text: String = jlabel.getText()
+ def text_=(s: String) = jlabel.setText(s)
+ def halign: Orientation.Value = Orientation(jlabel.getHorizontalAlignment())
+ def halign_=(x: Orientation.Value) = jlabel.setHorizontalAlignment(x.id)
+ def valign: Orientation.Value = Orientation(jlabel.getVerticalAlignment())
+ def valign_=(x: Orientation.Value) = jlabel.setVerticalAlignment(x.id)
+}
diff --git a/src/actors/scala/actors/gui/MainFrame.scala b/src/actors/scala/actors/gui/MainFrame.scala
new file mode 100644
index 0000000000..b76a170e71
--- /dev/null
+++ b/src/actors/scala/actors/gui/MainFrame.scala
@@ -0,0 +1,14 @@
+package scala.actors.gui;
+
+import javax.swing._;
+import scala.actors.gui.event._;
+
+class MainFrame(jframe: JFrame) extends Frame(jframe) {
+ def this() = this(new JFrame("Untitled Frame"))
+
+ addHandler {
+ case WindowClosing(_) => System.exit(1)
+ }
+
+ subscribe(this)
+}
diff --git a/src/actors/scala/actors/gui/Orientation.scala b/src/actors/scala/actors/gui/Orientation.scala
new file mode 100644
index 0000000000..a71157380a
--- /dev/null
+++ b/src/actors/scala/actors/gui/Orientation.scala
@@ -0,0 +1,11 @@
+package scala.actors.gui
+
+import javax.swing.SwingConstants._
+
+object Orientation extends Enumeration {
+ val left = Value(LEFT, "left")
+ val right = Value(RIGHT, "right")
+ val bottom = Value(BOTTOM, "bottom")
+ val top = Value(TOP, "top")
+ val center = Value(CENTER, "center")
+}
diff --git a/src/actors/scala/actors/gui/Panel.scala b/src/actors/scala/actors/gui/Panel.scala
new file mode 100644
index 0000000000..59cbff6784
--- /dev/null
+++ b/src/actors/scala/actors/gui/Panel.scala
@@ -0,0 +1,15 @@
+package scala.actors.gui;
+
+import javax.swing._
+import java.awt.event._
+
+class Panel(val jpanel: JPanel) extends Container(jpanel) with SwingComponent {
+ def this(layout: java.awt.LayoutManager, elements: Component*) = {
+ this(new JPanel(layout));
+ for (val elem <- elements) this += elem
+ }
+ def this(elements: Component*) = this(new java.awt.FlowLayout, elements: _*)
+
+ def layout: java.awt.LayoutManager = jpanel.getLayout()
+ def layout_=(x: java.awt.LayoutManager) = jpanel.setLayout(x)
+}
diff --git a/src/actors/scala/actors/gui/Publisher.scala b/src/actors/scala/actors/gui/Publisher.scala
new file mode 100644
index 0000000000..0b71bc30f6
--- /dev/null
+++ b/src/actors/scala/actors/gui/Publisher.scala
@@ -0,0 +1,126 @@
+package scala.actors.gui
+
+import scala.collection.mutable.ListBuffer
+
+import scala.actors.single.Actor
+import scala.actors.single.Pid
+
+import scala.actors.gui.event.Event
+
+class EventHandlers {
+ type Handler = PartialFunction[AnyRef,unit]
+
+ private val handlers = new ListBuffer[Handler]
+
+ def += (h: Handler) = { handlers += h }
+ def -= (h: Handler) = { handlers -= h }
+
+ def compoundHandler = new Handler {
+ def isDefinedAt(e: AnyRef): boolean = handlers.exists(.isDefinedAt(e))
+
+ def apply(e: AnyRef): unit =
+ handlers.find(.isDefinedAt(e)) match {
+ case Some(h) => h.apply(e)
+ case None => // do nothing
+ }
+ }
+}
+
+trait Responder extends Actor {
+ protected val handlers = new EventHandlers
+
+ final def eventloop(f: PartialFunction[Message,unit]): scala.All =
+ receive(new RecursiveProxyHandler(this, f))
+
+ def eventblock(f: PartialFunction[Message,unit]): unit = {
+ try {
+ receive(new RecursiveProxyHandler(this, f))
+ }
+ catch {
+ case d: Done =>
+ // do nothing
+ }
+ }
+
+ private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
+ def isDefinedAt(m: Message): boolean =
+ true // events should be removed from the mailbox immediately!
+
+ def apply(m: Message): unit = {
+ if (f.isDefinedAt(m)) f(m) // overrides any installed handler
+ else
+ if (handlers.compoundHandler.isDefinedAt(m))
+ handlers.compoundHandler(m)
+ else {
+ // do nothing
+ }
+ a receive this
+ }
+ }
+}
+
+case class Subscribe(s: Subscriber)
+case class Publish(e: Event)
+
+trait Subscriber extends Responder {
+ type Handler = PartialFunction[AnyRef,unit]
+ def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this)
+}
+
+trait Publisher extends Responder {
+ case class HandlerAdded()
+
+ private val subscribers = new ListBuffer[Subscriber]
+
+ handlers += { // installs _permanent_ handler!
+ case Subscribe(s) =>
+ //Console.println("" + this + ": rec subscription from " + s)
+ subscribers += s
+ case Publish(e) => for (val s <- subscribers) s send e
+ }
+
+ //Console.println("" + this + ": exec toplevel eventloop (Publisher)")
+
+ eventblock {
+ case HandlerAdded() =>
+ //Console.println("" + this + " received HandlerAdded()")
+ }
+
+ def addHandler(h: EventHandlers#Handler) = {
+ //Console.println("" + this + ": installing new handler")
+ handlers += h
+ this send HandlerAdded() // causes currently active eventloop to recursively call itself
+ }
+
+ def publish(e: Event) = {
+ //Console.println("Publishing event: " + e)
+ for (val s <- subscribers) s send e
+ }
+
+ // TODO: super.receive might already be overridden!
+ //final override def receive(f: PartialFunction[Message,unit]): scala.All =
+ //super.receive(new ProxyPubSubHandler(f))
+
+ private class ProxyPubSubHandler(f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] {
+ def isDefinedAt(m: Message): boolean =
+ if (f.isDefinedAt(m)) true
+ else m match {
+ case Subscribe(s) => true
+ case Publish(e) => true
+ case other => false
+ }
+
+ def apply(m: Message): unit = {
+ m match {
+ case Subscribe(s) =>
+ //Console.println("Rec subscription: " + s)
+ subscribers += s
+ case Publish(e) =>
+ for (val s <- subscribers) s send e
+ case other =>
+ // do nothing
+ }
+ if (f.isDefinedAt(m)) f(m)
+ }
+ }
+}
diff --git a/src/actors/scala/actors/gui/SimpleGUIApplication.scala b/src/actors/scala/actors/gui/SimpleGUIApplication.scala
new file mode 100644
index 0000000000..30154932cc
--- /dev/null
+++ b/src/actors/scala/actors/gui/SimpleGUIApplication.scala
@@ -0,0 +1,14 @@
+package scala.actors.gui;
+
+import javax.swing._
+
+abstract class SimpleGUIApplication extends GUIApplication {
+
+ def top: Frame;
+
+ def main(args: Array[String]) = {
+ run { top.pack.show }
+ }
+
+ implicit def string2label(s: String): Label = new Label(s)
+}
diff --git a/src/actors/scala/actors/gui/SwingComponent.scala b/src/actors/scala/actors/gui/SwingComponent.scala
new file mode 100644
index 0000000000..cbe6478c32
--- /dev/null
+++ b/src/actors/scala/actors/gui/SwingComponent.scala
@@ -0,0 +1,11 @@
+package scala.actors.gui
+
+import javax.swing._
+import java.awt._
+
+trait SwingComponent extends Component {
+ val jcomponent = acomponent.asInstanceOf[JComponent];
+ def border: javax.swing.border.Border = jcomponent.getBorder()
+ def border_=(x: javax.swing.border.Border): unit = jcomponent.setBorder(x)
+}
+
diff --git a/src/actors/scala/actors/gui/TextComponent.scala b/src/actors/scala/actors/gui/TextComponent.scala
new file mode 100644
index 0000000000..955745045f
--- /dev/null
+++ b/src/actors/scala/actors/gui/TextComponent.scala
@@ -0,0 +1,22 @@
+package scala.actors.gui
+
+import javax.swing._
+import javax.swing.text.JTextComponent
+import javax.swing.event.{CaretEvent,CaretListener}
+import event.CaretUpdate
+
+class TextComponent(val jtextcomponent: JTextComponent)
+extends Container(jtextcomponent) with SwingComponent with Publisher {
+
+ def text: String = jtextcomponent.getText()
+ def text_=(x: String) = jtextcomponent.setText(x)
+
+ val caret = new Caret(jtextcomponent.getCaret())
+
+ jtextcomponent.addCaretListener {
+ new CaretListener {
+ def caretUpdate(e: CaretEvent) =
+ publish(CaretUpdate(TextComponent.this))
+ }
+ }
+}
diff --git a/src/actors/scala/actors/gui/TextField.scala b/src/actors/scala/actors/gui/TextField.scala
new file mode 100644
index 0000000000..0e32fc29f5
--- /dev/null
+++ b/src/actors/scala/actors/gui/TextField.scala
@@ -0,0 +1,22 @@
+package scala.actors.gui;
+
+import javax.swing._
+import java.awt.event._
+import event._
+
+class TextField(val jtextfield: JTextField) extends TextComponent(jtextfield) {
+ def this(text: String, columns: int) = this(new JTextField(text, columns));
+ def this(text: String) = this(new JTextField(text));
+ def this(columns: int) = this(new JTextField(columns));
+ def this() = this(new JTextField());
+
+ def columns: int = jtextfield.getColumns()
+ def columns_=(x: int) = jtextfield.setColumns(x)
+
+ jtextfield.addActionListener {
+ new ActionListener {
+ def actionPerformed(e: ActionEvent) =
+ publish(TextModified(TextField.this))
+ }
+ }
+}
diff --git a/src/actors/scala/actors/gui/event/ButtonPressed.scala b/src/actors/scala/actors/gui/event/ButtonPressed.scala
new file mode 100644
index 0000000000..9d120314fb
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/ButtonPressed.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class ButtonPressed(b: Button) extends Event
diff --git a/src/actors/scala/actors/gui/event/CaretUpdate.scala b/src/actors/scala/actors/gui/event/CaretUpdate.scala
new file mode 100644
index 0000000000..15c48385d9
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/CaretUpdate.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class CaretUpdate(text: TextComponent) extends Event
diff --git a/src/actors/scala/actors/gui/event/Event.scala b/src/actors/scala/actors/gui/event/Event.scala
new file mode 100644
index 0000000000..949cf2cb4d
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/Event.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+abstract class Event
diff --git a/src/actors/scala/actors/gui/event/MouseDragged.scala b/src/actors/scala/actors/gui/event/MouseDragged.scala
new file mode 100644
index 0000000000..37b768b922
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/MouseDragged.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class MouseDragged(override val event: java.awt.event.MouseEvent) extends MouseEvent(event);
diff --git a/src/actors/scala/actors/gui/event/MouseEvent.scala b/src/actors/scala/actors/gui/event/MouseEvent.scala
new file mode 100644
index 0000000000..da43fbd8b8
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/MouseEvent.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+abstract class MouseEvent(val event: java.awt.event.MouseEvent) extends Event;
diff --git a/src/actors/scala/actors/gui/event/MouseMoved.scala b/src/actors/scala/actors/gui/event/MouseMoved.scala
new file mode 100644
index 0000000000..bf2028f159
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/MouseMoved.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class MouseMoved(override val event: java.awt.event.MouseEvent) extends MouseEvent(event);
diff --git a/src/actors/scala/actors/gui/event/TextModified.scala b/src/actors/scala/actors/gui/event/TextModified.scala
new file mode 100644
index 0000000000..0af1b49838
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/TextModified.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class TextModified(text: TextComponent) extends Event
diff --git a/src/actors/scala/actors/gui/event/WindowActivated.scala b/src/actors/scala/actors/gui/event/WindowActivated.scala
new file mode 100644
index 0000000000..797a2d0fc5
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowActivated.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowActivated(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowClosed.scala b/src/actors/scala/actors/gui/event/WindowClosed.scala
new file mode 100644
index 0000000000..7676809299
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowClosed.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowClosed(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowClosing.scala b/src/actors/scala/actors/gui/event/WindowClosing.scala
new file mode 100644
index 0000000000..bd63987c4d
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowClosing.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowClosing(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowDeactivated.scala b/src/actors/scala/actors/gui/event/WindowDeactivated.scala
new file mode 100644
index 0000000000..4452f792d6
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowDeactivated.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowDeactivated(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowDeiconified.scala b/src/actors/scala/actors/gui/event/WindowDeiconified.scala
new file mode 100644
index 0000000000..1c7cbca57c
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowDeiconified.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowDeiconified(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowEvent.scala b/src/actors/scala/actors/gui/event/WindowEvent.scala
new file mode 100644
index 0000000000..25177d8e2a
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowEvent.scala
@@ -0,0 +1,5 @@
+package scala.actors.gui.event
+
+abstract class WindowEvent extends Event {
+ val window: Frame
+}
diff --git a/src/actors/scala/actors/gui/event/WindowIconified.scala b/src/actors/scala/actors/gui/event/WindowIconified.scala
new file mode 100644
index 0000000000..abb6362188
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowIconified.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowIconified(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/event/WindowOpened.scala b/src/actors/scala/actors/gui/event/WindowOpened.scala
new file mode 100644
index 0000000000..f2656497a3
--- /dev/null
+++ b/src/actors/scala/actors/gui/event/WindowOpened.scala
@@ -0,0 +1,3 @@
+package scala.actors.gui.event
+
+case class WindowOpened(window: Frame) extends WindowEvent;
diff --git a/src/actors/scala/actors/gui/layout.scala b/src/actors/scala/actors/gui/layout.scala
new file mode 100644
index 0000000000..6d53d267ae
--- /dev/null
+++ b/src/actors/scala/actors/gui/layout.scala
@@ -0,0 +1,12 @@
+package scala.actors.gui
+
+import java.awt._
+
+object layout {
+
+ val flex = 0
+
+ def grid(rows: int, columns: int) = new GridLayout(rows, columns)
+ def row = new FlowLayout()
+ def column = grid(flex, 1)
+}
diff --git a/src/actors/scala/actors/multi/AbstractPid.scala b/src/actors/scala/actors/multi/AbstractPid.scala
new file mode 100644
index 0000000000..41cb745b57
--- /dev/null
+++ b/src/actors/scala/actors/multi/AbstractPid.scala
@@ -0,0 +1,10 @@
+package scala.actors.multi;
+
+/**
+ * @author Philipp Haller
+ */
+trait AbstractPid {
+ def !(msg: MailBox#Message): unit
+ def become(clos: Actor => Unit): unit
+ def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
+}
diff --git a/src/actors/scala/actors/multi/Actor.scala b/src/actors/scala/actors/multi/Actor.scala
new file mode 100644
index 0000000000..f1aa354880
--- /dev/null
+++ b/src/actors/scala/actors/multi/Actor.scala
@@ -0,0 +1,300 @@
+package scala.actors.multi
+
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.HashSet
+import scala.collection.mutable.Stack
+
+case class ExcHandlerDesc(pid: Pid, eid: int)
+
+class ExcHandler(actions: PartialFunction[Throwable, unit],
+ actor: Actor,
+ parent: ExcHandlerDesc) {
+ def handle(e: Throwable): unit = {
+ if (!actions.isDefinedAt(e)) {
+ if (parent != null) actor.forwardExc(parent, e)
+ }
+ else
+ actions(e)
+ }
+}
+
+/**
+ * @author Philipp Haller
+ */
+abstract class Actor extends MailBox {
+ def run(): Unit = {}
+
+ def start(): Unit = {
+ var finished = true
+ try { run }
+ catch {
+ case d: Done =>
+ finished = false
+ case t: Throwable =>
+ if (!excHandlerDescs.isEmpty)
+ forwardExc(excHandlerDescs.top, t)
+ else
+ exit(new Symbol(t.toString()))
+ }
+ if (finished) die()
+ }
+
+ case class Exit(from: Pid, reason: Symbol) extends Message
+
+ private var pid: Pid = null
+ def self: Pid = {
+ if (pid == null) pid = new LocalPid(this)
+ pid
+ }
+ def self_= (p: Pid) = pid = p
+
+
+ private val links = new HashSet[Pid]
+
+ def link(to: Pid): unit = {
+ // TODO: check if exists (eff: dont need to.linkTo...)
+ links += to
+ to.linkTo(self)
+ }
+ def linkTo(to: Pid): unit = links += to
+
+ def unlink(from: Pid): unit = {
+ // TODO: check if exists (eff)
+ links -= from
+ from.unlinkFrom(self)
+ }
+ def unlinkFrom(from: Pid): unit = links -= from
+
+
+ private var trapExit = false
+
+ def processFlag(flag: Symbol, set: boolean) = {
+ if (flag.name.equals("trapExit")) trapExit = set
+ }
+
+ def exit(reason: Symbol): unit = {
+ exitLinked(reason, new HashSet[Pid])
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ }
+ }
+
+ def exit(from: Pid, reason: Symbol): unit = {
+ if (from == self) {
+ exit(reason)
+ }
+ else {
+ if (trapExit)
+ this send Exit(from, reason)
+ else if (!reason.name.equals("normal"))
+ exit(reason)
+ }
+ }
+
+ def exitLinked(reason: Symbol, exitMarks: HashSet[Pid]): unit = {
+ if (exitMarks contains self) {
+ // we are marked, do nothing
+ }
+ else {
+ exitMarks += self // mark self as exiting
+ //Console.println("" + self + " is exiting (" + reason + ").")
+
+ // exit linked scala.actors
+ val iter = links.elements
+ while (iter.hasNext) {
+ val linkedPid = iter.next
+ unlink(linkedPid)
+ linkedPid.exit(self, reason)
+ }
+ exitMarks -= self
+ }
+ }
+
+ def spawnLink(body: Actor => unit): Pid = {
+ val a = new Actor {
+ override def run = body(this)
+ }
+ if (!excHandlerDescs.isEmpty)
+ a.pushExcHandlerDesc(excHandlerDescs.top)
+ // link new process to self
+ link(a.self)
+ a.start
+ a.self
+ }
+
+ val excHandlerDescs = new Stack[ExcHandlerDesc]
+
+ // exception handler table
+ val excHandlers = new HashMap[int, ExcHandler]
+
+ def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = {
+ // locality check (handler local to this actor?)
+ if (destDesc.pid == self) {
+ handleExc(destDesc, e)
+ }
+ else {
+ // forward to pid of destination descriptor
+ destDesc.pid.handleExc(destDesc, e)
+ }
+ }
+
+ def pushExcHandlerDesc(desc: ExcHandlerDesc) = {
+ excHandlerDescs += desc
+ }
+
+ /** is only called for local handlers
+ (i.e. destDesc.pid == self) */
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable) =
+ (excHandlers get destDesc.eid) match {
+ case Some(handler) =>
+ handler.handle(e)
+ case None =>
+ error("exc desc refers to non-registered handler")
+ };
+
+ var excCnt = 0
+ def freshExcId = { excCnt = excCnt + 1; excCnt }
+
+ def tryAsync(block: => unit,
+ handlerFun: PartialFunction[Throwable, unit]) = {
+ val excHandler =
+ new ExcHandler(handlerFun,
+ this,
+ if (excHandlerDescs.isEmpty) null
+ else excHandlerDescs.top)
+
+ val desc = ExcHandlerDesc(self, freshExcId)
+ // associate desc with handler
+ excHandlers += desc.eid -> excHandler
+ // push desc onto stack
+ excHandlerDescs += desc
+ //Console.println("desc stack height: " + excHandlerDescs.length)
+ // execute code block
+ block
+ }
+
+ def die(reason: Symbol) = {
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ exit(reason)
+ }
+ }
+
+ override def die() = {
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ exit('normal)
+ }
+ }
+
+ def process(f: PartialFunction[Message,unit], msg: Message): unit = {
+ try {
+ f(msg)
+ }
+ catch {
+ case d: Done =>
+ throw new Done
+ case t: Throwable =>
+ throw t
+ if (!excHandlerDescs.isEmpty)
+ forwardExc(excHandlerDescs.top, t)
+ else
+ die(Symbol(t.toString()))
+ }
+ }
+
+ override def receive(f: PartialFunction[Message,unit]): scala.All = {
+ if (isAlive) {
+ Scheduler.tick(this)
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ process(f, msg)
+ die()
+ case None =>
+ continuation = f
+ //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ override def receiveMsg(msg: MailBox#Message) = {
+ //Debug.info("" + Thread.currentThread() + ": Resuming " + this)
+ if (continuation != null) {
+ val f = continuation
+ continuation = null
+ scheduled = false
+ process(f, msg)
+ die()
+ }
+ else {
+ // use more complex receive-and-return continuation
+ val cases = contCases
+ val then = contThen
+ contCases = null
+ contThen = null
+ scheduled = false
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ }
+
+ def spawn(a: Actor): Pid = {
+ // let "a" inherit active exception handler
+ if (!excHandlerDescs.isEmpty)
+ a.pushExcHandlerDesc(excHandlerDescs.top)
+ a.start
+ a.self
+ }
+
+ def spawn(body: Actor => unit): Pid = {
+ val a = new Actor {
+ override def run = body(this)
+ }
+ if (!excHandlerDescs.isEmpty)
+ a.pushExcHandlerDesc(excHandlerDescs.top)
+ a.start
+ a.self
+ }
+
+ def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = {
+ val a = new Actor {
+ override def run = receive(cases)
+ }
+ if (!excHandlerDescs.isEmpty)
+ a.pushExcHandlerDesc(excHandlerDescs.top)
+ a.start
+ a.self
+ }
+
+ def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
+ receive {
+ case Pair(pid1, msg1) => receive {
+ case Pair(pid2, msg2) => cont match {
+ case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
+ }
+ }
+ case Pair(pid2, msg2) => receive {
+ case Pair(pid1, msg1) => cont match {
+ case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
+ }
+ }
+ }
+ }
+
+ def makeRef = Actor.makeRef
+}
+
+object Actor {
+ private var counter = 0
+ type Tag = int
+ def makeRef: Tag = {
+ counter = counter + 1
+ counter
+ }
+}
diff --git a/src/actors/scala/actors/multi/LocalPid.scala b/src/actors/scala/actors/multi/LocalPid.scala
new file mode 100644
index 0000000000..5201abe22e
--- /dev/null
+++ b/src/actors/scala/actors/multi/LocalPid.scala
@@ -0,0 +1,94 @@
+package scala.actors.multi;
+
+import scala.collection.mutable.Queue;
+
+/**
+ * @author Philipp Haller
+ */
+class LocalPid(actor: Actor) extends Pid {
+ var target = actor
+
+ def !(msg: MailBox#Message): unit = target send msg
+
+ def link(other: Pid): unit = target link other
+ def linkTo(other: Pid): unit = target linkTo other
+ def unlink(other: Pid): unit = target unlink other
+ def unlinkFrom(other: Pid): unit = target unlinkFrom other
+ def exit(reason: Symbol): unit = target exit reason
+ def exit(from: Pid, reason: Symbol): unit = target.exit(from, reason)
+
+ def spawn(body: Actor => Unit): Pid = {
+ val a = new Actor {
+ override def run: Unit = body(this)
+ }
+ a.start
+ a.self
+ }
+
+ def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
+ val a = new Actor {
+ override def run: Unit = receive(cases)
+ }
+ a.start
+ a.self
+ }
+
+ override def toString() = "LocalPid(" + target + ")"
+
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit =
+ actor.handleExc(destDesc, e);
+
+ def become(clos: Actor => Unit) = {
+ // old actor should become anonymous (cannot receive any messages any more)
+ // achieved by removing it from target of pid.
+
+ val oldActor = target
+ //Debug.info("old actor: " + oldActor);
+
+ // change our target to point to a newly created actor with the same mailbox.
+
+ val newActor = new Actor {
+ override def run: Unit = clos(this)
+ }
+ newActor.sent = oldActor.sent
+ target = newActor
+ newActor.self = this
+
+ //Debug.info("new actor: " + newActor);
+
+ // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
+ oldActor.sent = new Queue[MailBox#Message]
+
+ //Debug.info("Starting new actor.");
+ newActor.start // important to start after changing pid because actor may send messages to itself.
+ }
+
+ private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
+ def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m)
+ def apply(m: MailBox#Message): unit = {
+ f(m)
+ a receive this
+ }
+ }
+
+ def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = {
+
+ become(a => a receive new ProxyPartialFunction(a, f))
+
+ /*become(
+ a:Actor => {
+ def loop: unit = {
+ def proxyFun(m: MailBox#Message): unit = {
+ if (f.isDefinedAt(m)) {
+ f(m);
+ loop
+ }
+ };
+ //a receive proxyFun
+ }
+ loop
+ }
+ )*/
+ }
+
+}
diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala
new file mode 100644
index 0000000000..cf721403cb
--- /dev/null
+++ b/src/actors/scala/actors/multi/MailBox.scala
@@ -0,0 +1,175 @@
+package scala.actors.multi;
+
+import scala.collection.mutable.Queue;
+
+/**
+ * @author Philipp Haller
+ */
+class MailBox {
+ type Message = AnyRef
+ case class TIMEOUT() extends Message
+
+ /** Unconsumed messages. */
+ var sent = new Queue[Message]
+
+ var continuation: PartialFunction[Message,Unit] = null
+ // more complex continuation
+ var contCases: PartialFunction[Message,Message] = null
+ var contThen: Message => unit = null
+
+ def hasCont =
+ if ((continuation == null) && (contCases == null)) false
+ else true
+
+ def contDefinedAt(msg: Message) =
+ if (((continuation != null) && continuation.isDefinedAt(msg)) ||
+ ((contCases != null) && contCases.isDefinedAt(msg)))
+ true
+ else
+ false
+
+ var isAlive = true
+ var scheduled = false
+
+ private var pendingSignal = false
+
+ def send(msg: Message): unit = synchronized {
+ if (isAlive) {
+ if (!hasCont || scheduled) {
+ //Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
+ msg match {
+ case Signal() =>
+ // do not add to mailbox
+ case _ =>
+ sent += msg
+ }
+ }
+ else
+ msg match {
+ case Signal() =>
+ if (!contDefinedAt(TIMEOUT())) die()
+ else {
+ val task = new ReceiverTask(this, TIMEOUT())
+ //Debug.info("ready to receive. dispatch new task " + task)
+ scheduled = true
+ Scheduler.execute(task)
+ }
+ case _ =>
+ if (!contDefinedAt(msg))
+ sent += msg
+ else {
+ if (pendingSignal) {
+ pendingSignal = false
+ TimerThread.trashRequest(this)
+ }
+ val task = new ReceiverTask(this, msg)
+ //Debug.info("ready to receive. dispatch new task " + task)
+ scheduled = true
+ Scheduler.execute(task)
+ }
+ }
+ }
+ }
+
+ def receiveMsg(msg: MailBox#Message) = {
+ //Debug.info("" + Thread.currentThread() + ": Resuming " + this)
+ if (continuation != null) {
+ val f = continuation
+ continuation = null
+ scheduled = false
+ f(msg)
+ die()
+ }
+ else {
+ // use more complex receive-and-return continuation
+ val cases = contCases
+ val then = contThen
+ contCases = null
+ contThen = null
+ scheduled = false
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ }
+
+ def receive(f: PartialFunction[Message,unit]): scala.All = {
+ if (isAlive) {
+ Scheduler.tick(this)
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ f(msg)
+ die()
+ case None =>
+ continuation = f
+ //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
+ Scheduler.tick(this)
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ f(msg)
+ die()
+ case None =>
+ // if timeout == 0 then execute timeout action if specified (see Erlang book)
+ if (msec == 0) {
+ if (f.isDefinedAt(TIMEOUT()))
+ f(TIMEOUT())
+ die()
+ }
+ else {
+ if (msec > 0) {
+ TimerThread.requestTimeout(this, msec)
+ pendingSignal = true
+ }
+ continuation = f
+ //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ // original wish:
+ // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
+ // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
+
+ def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): unit = {
+ contCases = null
+ contThen = null
+ sent.dequeueFirst(cases.isDefinedAt) match {
+ case Some(msg) => {
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ case None => {
+ contCases = cases
+ contThen = then
+ //Debug.info("No msg found. Saved complex continuation.")
+ }
+ }
+ throw new Done
+ }
+
+ // receiv {...} then (msg => {...msg...})
+
+ class ReceiveAndReturn(cases: PartialFunction[Message,Message]) {
+ def then(body: Message => unit): unit = receiveAndReturn(cases, body)
+ }
+
+ def receiv(cases: PartialFunction[Message,Message]): ReceiveAndReturn =
+ new ReceiveAndReturn(cases)
+
+ def die() = {
+ if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
+ }
+ }
+}
diff --git a/src/actors/scala/actors/multi/Pid.scala b/src/actors/scala/actors/multi/Pid.scala
new file mode 100644
index 0000000000..99e5c59913
--- /dev/null
+++ b/src/actors/scala/actors/multi/Pid.scala
@@ -0,0 +1,20 @@
+package scala.actors.multi
+
+/**
+ * @author Philipp Haller
+ */
+[serializable]abstract class Pid {
+ def !(msg: MailBox#Message): unit;
+
+ def link(other: Pid): unit;
+ def linkTo(other: Pid): unit; // uni-directional
+ def unlink(other: Pid): unit;
+ def unlinkFrom(other: Pid): unit; // uni-directional
+ def exit(reason: Symbol): unit;
+ def exit(from: Pid, reason: Symbol): unit;
+
+ def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit;
+
+ //def become(clos: Actor => Unit): unit
+ //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
+}
diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala
new file mode 100644
index 0000000000..218ab000de
--- /dev/null
+++ b/src/actors/scala/actors/multi/ReceiverTask.scala
@@ -0,0 +1,16 @@
+package scala.actors.multi;
+
+/**
+ * @author Philipp Haller
+ */
+class ReceiverTask(val actor: MailBox, msg: MailBox#Message) extends Runnable {
+ def run(): unit = {
+ try {
+ actor receiveMsg msg
+ }
+ catch {
+ case d: Done =>
+ // do nothing (continuation is already saved)
+ }
+ }
+}
diff --git a/src/actors/scala/actors/single/AbstractPid.scala b/src/actors/scala/actors/single/AbstractPid.scala
new file mode 100644
index 0000000000..28f972148d
--- /dev/null
+++ b/src/actors/scala/actors/single/AbstractPid.scala
@@ -0,0 +1,10 @@
+package scala.actors.single;
+
+/**
+ * @author Philipp Haller
+ */
+trait AbstractPid {
+ def !(msg: MailBox#Message): unit
+ def become(clos: Actor => Unit): unit
+ def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
+}
diff --git a/src/actors/scala/actors/single/Actor.scala b/src/actors/scala/actors/single/Actor.scala
new file mode 100644
index 0000000000..41d1f4a59e
--- /dev/null
+++ b/src/actors/scala/actors/single/Actor.scala
@@ -0,0 +1,72 @@
+package scala.actors.single
+
+/**
+ * @author Philipp Haller
+ */
+abstract class Actor extends MailBox {
+ def run: Unit = {}
+
+ def start: Unit = {
+ try { run }
+ catch {
+ case d:Done =>
+ // do nothing
+ }
+ }
+
+ private var pid: Pid = null
+
+ def self: Pid = {
+ if (pid == null) pid = new LocalPid(this)
+ pid
+ }
+
+ def self_= (p: Pid) = pid = p
+
+ def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = {
+ receive {
+ case Pair(pid1, msg1) => receive {
+ case Pair(pid2, msg2) => cont match {
+ case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
+ }
+ }
+ case Pair(pid2, msg2) => receive {
+ case Pair(pid1, msg1) => cont match {
+ case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs))
+ }
+ }
+ }
+ }
+
+ def spawn(body: Actor => unit): Pid = {
+ val a = new Actor {
+ override def run = body(this)
+ }
+ a.start
+ a.self
+ }
+
+ def spawn(a: Actor): Pid = {
+ a.start
+ a.self
+ }
+
+ def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = {
+ val a = new Actor {
+ override def run = receive(cases)
+ }
+ a.start
+ a.self
+ }
+
+ def makeRef = Actor.makeRef
+}
+
+object Actor {
+ private var counter = 0
+ type Tag = int
+ def makeRef: Tag = {
+ counter = counter + 1
+ counter
+ }
+}
diff --git a/src/actors/scala/actors/single/LocalPid.scala b/src/actors/scala/actors/single/LocalPid.scala
new file mode 100644
index 0000000000..8186dca426
--- /dev/null
+++ b/src/actors/scala/actors/single/LocalPid.scala
@@ -0,0 +1,83 @@
+package scala.actors.single;
+
+import scala.collection.mutable.Queue;
+
+/**
+ * @author Philipp Haller
+ */
+class LocalPid(actor: Actor) extends Pid {
+ var target = actor
+
+ def !(msg: MailBox#Message): unit = target send msg
+
+ def become(clos: Actor => Unit) = {
+ // old actor should become anonymous (cannot receive any messages any more)
+ // achieved by removing it from target of pid.
+
+ val oldActor = target
+ //Debug.info("old actor: " + oldActor);
+
+ // change our target to point to a newly created actor with the same mailbox.
+
+ val newActor = new Actor {
+ override def run: Unit = clos(this)
+ }
+ newActor.sent = oldActor.sent
+ target = newActor
+ newActor.self = this
+
+ //Debug.info("new actor: " + newActor);
+
+ // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor)
+ oldActor.sent = new Queue[MailBox#Message]
+
+ //Debug.info("Starting new actor.");
+ newActor.start // important to start after changing pid because actor may send messages to itself.
+ }
+
+ private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] {
+ def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m)
+ def apply(m: MailBox#Message): unit = {
+ f(m)
+ a receive this
+ }
+ }
+
+ def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = {
+
+ become(a => a receive new ProxyPartialFunction(a, f))
+
+ /*become(
+ a:Actor => {
+ def loop: unit = {
+ def proxyFun(m: MailBox#Message): unit = {
+ if (f.isDefinedAt(m)) {
+ f(m);
+ loop
+ }
+ };
+ //a receive proxyFun
+ }
+ loop
+ }
+ )*/
+ }
+
+ def spawn(body: Actor => Unit): Pid = {
+ val a = new Actor {
+ override def run: Unit = body(this)
+ }
+ a.start
+ a.self
+ }
+
+ def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = {
+ val a = new Actor {
+ override def run: Unit = receive(cases)
+ }
+ a.start
+ a.self
+ }
+
+ override def toString() = "LocalPid(" + target + ")"
+}
diff --git a/src/actors/scala/actors/single/MailBox.scala b/src/actors/scala/actors/single/MailBox.scala
new file mode 100644
index 0000000000..68d6c5e784
--- /dev/null
+++ b/src/actors/scala/actors/single/MailBox.scala
@@ -0,0 +1,155 @@
+package scala.actors.single;
+
+import scala.collection.mutable.Queue;
+
+/**
+ * @author Philipp Haller
+ */
+class MailBox {
+
+ type Message = AnyRef
+ case class TIMEOUT() extends Message
+
+ /** Unconsumed messages. */
+ var sent = new Queue[Message]
+
+ var continuation: PartialFunction[Message,Unit] = null
+ // more complex continuation
+ var contCases: PartialFunction[Message,Message] = null
+ var contThen: Message => unit = null
+
+ def hasCont =
+ if ((continuation == null) && (contCases == null)) false
+ else true
+
+ def contDefinedAt(msg: Message) =
+ if (((continuation != null) && continuation.isDefinedAt(msg)) ||
+ ((contCases != null) && contCases.isDefinedAt(msg)))
+ true
+ else
+ false
+
+ var isAlive = true
+
+ private var duration: long = 0
+ private var timeInitial: long = 0
+ private var timeoutEnabled: boolean = false
+
+ def send(msg: Message): unit = synchronized {
+ if (isAlive)
+ if (!hasCont) {
+ Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
+ sent += msg
+ }
+ else {
+ var message = msg
+ var timeoutOccurred = false
+
+ if (timeoutEnabled && (System.currentTimeMillis() - timeInitial > duration))
+ timeoutOccurred = true
+
+ if (timeoutOccurred && !contDefinedAt(TIMEOUT()))
+ die()
+ else {
+ if (timeoutOccurred) message = TIMEOUT()
+
+ if (contDefinedAt(message)) {
+ // we exit receive, so reset timeoutEnabled
+ timeoutEnabled = false
+
+ try {
+ if (continuation != null) {
+ val f = continuation
+ continuation = null
+ f(msg)
+ die()
+ }
+ else {
+ // use more complex receive-and-return continuation
+ val cases = contCases
+ val then = contThen
+ contCases = null
+ contThen = null
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ }
+ catch {
+ case d: Done =>
+ // do nothing (continuation is already saved)
+ }
+ }
+ else {
+ Debug.info("cont not defined at msg. appending to mailbox.")
+ if (!timeoutOccurred) sent += message
+ }
+ }
+ }
+ }
+
+ def receive(f: PartialFunction[Message,unit]): scala.All = {
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ f(msg)
+ die()
+ case None =>
+ continuation = f
+ Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ throw new Done
+ }
+
+ def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = {
+ timeInitial = System.currentTimeMillis()
+ duration = msec
+
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) =>
+ f(msg)
+ die()
+ case None =>
+ // if timeout == 0 then execute timeout action if specified (see Erlang book)
+ if (duration == 0) {
+ if (f.isDefinedAt(TIMEOUT()))
+ f(TIMEOUT())
+ die()
+ }
+ else {
+ timeoutEnabled = true
+ continuation = f
+ Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
+ throw new Done
+ }
+
+ // original wish:
+ // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B
+ // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit
+
+ def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): scala.All = {
+ contCases = null
+ contThen = null
+ sent.dequeueFirst(cases.isDefinedAt) match {
+ case Some(msg) => {
+ val result = cases(msg)
+ then(result)
+ die()
+ }
+ case None => {
+ contCases = cases
+ contThen = then
+ Debug.info("No msg found. Saved complex continuation.")
+ }
+ }
+ throw new Done
+ }
+
+ def die() = {
+ isAlive = false
+ Debug.info("" + this + " died.")
+ }
+}
diff --git a/src/actors/scala/actors/single/Pid.scala b/src/actors/scala/actors/single/Pid.scala
new file mode 100644
index 0000000000..2c7ab20031
--- /dev/null
+++ b/src/actors/scala/actors/single/Pid.scala
@@ -0,0 +1,10 @@
+package scala.actors.single;
+
+/**
+ * @author Philipp Haller
+ */
+abstract class Pid {
+ def !(msg: MailBox#Message): unit;
+ //def become(clos: Actor => Unit): unit
+ //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit
+}