From 0d8b14c6055e76c0bff3b65d0f428d711abe1f5a Mon Sep 17 00:00:00 2001 From: Philipp Haller Date: Wed, 21 Jun 2006 12:35:21 +0000 Subject: Added actors library. --- src/actors/scala/actors/Debug.scala | 38 ++ src/actors/scala/actors/Done.scala | 19 + src/actors/scala/actors/SPanel.scala | 74 +++ src/actors/scala/actors/Scheduler.scala | 236 ++++++++ src/actors/scala/actors/TimerThread.scala | 156 ++++++ src/actors/scala/actors/WorkerThread.scala | 34 ++ .../scala/actors/distributed/JXTAServiceBase.scala | 9 + .../scala/actors/distributed/JavaSerializer.scala | 34 ++ src/actors/scala/actors/distributed/Messages.scala | 40 ++ .../scala/actors/distributed/MessagesComb.scala | 36 ++ src/actors/scala/actors/distributed/Name.scala | 7 + .../scala/actors/distributed/NetKernel.scala | 591 +++++++++++++++++++++ src/actors/scala/actors/distributed/Node.scala | 6 + src/actors/scala/actors/distributed/NodeComb.scala | 12 + .../scala/actors/distributed/RemoteActor.scala | 168 ++++++ .../scala/actors/distributed/RemotePid.scala | 184 +++++++ .../scala/actors/distributed/Serializer.scala | 53 ++ src/actors/scala/actors/distributed/Service.scala | 72 +++ .../actors/distributed/TcpSerializerComb.scala | 126 +++++ .../scala/actors/distributed/TcpService.scala | 215 ++++++++ .../actors/distributed/TcpServiceWorker.scala | 86 +++ src/actors/scala/actors/distributed/Util.scala | 58 ++ .../actors/distributed/picklers/BytePickle.scala | 436 +++++++++++++++ .../distributed/picklers/SStreamPickle.scala | 519 ++++++++++++++++++ .../actors/distributed/picklers/Streams.scala | 87 +++ .../actors/distributed/picklers/UTF8Codec.scala | 77 +++ src/actors/scala/actors/gui/Button.scala | 20 + src/actors/scala/actors/gui/Caret.scala | 8 + src/actors/scala/actors/gui/Component.scala | 9 + src/actors/scala/actors/gui/Container.scala | 18 + src/actors/scala/actors/gui/EmptyBorder.scala | 8 + .../scala/actors/gui/FormattedTextField.scala | 9 + src/actors/scala/actors/gui/Frame.scala | 33 ++ src/actors/scala/actors/gui/GUIApplication.scala | 20 + src/actors/scala/actors/gui/Label.scala | 14 + src/actors/scala/actors/gui/MainFrame.scala | 14 + src/actors/scala/actors/gui/Orientation.scala | 11 + src/actors/scala/actors/gui/Panel.scala | 15 + src/actors/scala/actors/gui/Publisher.scala | 126 +++++ .../scala/actors/gui/SimpleGUIApplication.scala | 14 + src/actors/scala/actors/gui/SwingComponent.scala | 11 + src/actors/scala/actors/gui/TextComponent.scala | 22 + src/actors/scala/actors/gui/TextField.scala | 22 + .../scala/actors/gui/event/ButtonPressed.scala | 3 + .../scala/actors/gui/event/CaretUpdate.scala | 3 + src/actors/scala/actors/gui/event/Event.scala | 3 + .../scala/actors/gui/event/MouseDragged.scala | 3 + src/actors/scala/actors/gui/event/MouseEvent.scala | 3 + src/actors/scala/actors/gui/event/MouseMoved.scala | 3 + .../scala/actors/gui/event/TextModified.scala | 3 + .../scala/actors/gui/event/WindowActivated.scala | 3 + .../scala/actors/gui/event/WindowClosed.scala | 3 + .../scala/actors/gui/event/WindowClosing.scala | 3 + .../scala/actors/gui/event/WindowDeactivated.scala | 3 + .../scala/actors/gui/event/WindowDeiconified.scala | 3 + .../scala/actors/gui/event/WindowEvent.scala | 5 + .../scala/actors/gui/event/WindowIconified.scala | 3 + .../scala/actors/gui/event/WindowOpened.scala | 3 + src/actors/scala/actors/gui/layout.scala | 12 + src/actors/scala/actors/multi/AbstractPid.scala | 10 + src/actors/scala/actors/multi/Actor.scala | 300 +++++++++++ src/actors/scala/actors/multi/LocalPid.scala | 94 ++++ src/actors/scala/actors/multi/MailBox.scala | 175 ++++++ src/actors/scala/actors/multi/Pid.scala | 20 + src/actors/scala/actors/multi/ReceiverTask.scala | 16 + src/actors/scala/actors/single/AbstractPid.scala | 10 + src/actors/scala/actors/single/Actor.scala | 72 +++ src/actors/scala/actors/single/LocalPid.scala | 83 +++ src/actors/scala/actors/single/MailBox.scala | 155 ++++++ src/actors/scala/actors/single/Pid.scala | 10 + 70 files changed, 4751 insertions(+) create mode 100644 src/actors/scala/actors/Debug.scala create mode 100644 src/actors/scala/actors/Done.scala create mode 100644 src/actors/scala/actors/SPanel.scala create mode 100644 src/actors/scala/actors/Scheduler.scala create mode 100644 src/actors/scala/actors/TimerThread.scala create mode 100644 src/actors/scala/actors/WorkerThread.scala create mode 100644 src/actors/scala/actors/distributed/JXTAServiceBase.scala create mode 100644 src/actors/scala/actors/distributed/JavaSerializer.scala create mode 100644 src/actors/scala/actors/distributed/Messages.scala create mode 100644 src/actors/scala/actors/distributed/MessagesComb.scala create mode 100644 src/actors/scala/actors/distributed/Name.scala create mode 100644 src/actors/scala/actors/distributed/NetKernel.scala create mode 100644 src/actors/scala/actors/distributed/Node.scala create mode 100644 src/actors/scala/actors/distributed/NodeComb.scala create mode 100644 src/actors/scala/actors/distributed/RemoteActor.scala create mode 100644 src/actors/scala/actors/distributed/RemotePid.scala create mode 100644 src/actors/scala/actors/distributed/Serializer.scala create mode 100644 src/actors/scala/actors/distributed/Service.scala create mode 100644 src/actors/scala/actors/distributed/TcpSerializerComb.scala create mode 100644 src/actors/scala/actors/distributed/TcpService.scala create mode 100644 src/actors/scala/actors/distributed/TcpServiceWorker.scala create mode 100644 src/actors/scala/actors/distributed/Util.scala create mode 100644 src/actors/scala/actors/distributed/picklers/BytePickle.scala create mode 100644 src/actors/scala/actors/distributed/picklers/SStreamPickle.scala create mode 100644 src/actors/scala/actors/distributed/picklers/Streams.scala create mode 100644 src/actors/scala/actors/distributed/picklers/UTF8Codec.scala create mode 100644 src/actors/scala/actors/gui/Button.scala create mode 100644 src/actors/scala/actors/gui/Caret.scala create mode 100644 src/actors/scala/actors/gui/Component.scala create mode 100644 src/actors/scala/actors/gui/Container.scala create mode 100644 src/actors/scala/actors/gui/EmptyBorder.scala create mode 100644 src/actors/scala/actors/gui/FormattedTextField.scala create mode 100644 src/actors/scala/actors/gui/Frame.scala create mode 100644 src/actors/scala/actors/gui/GUIApplication.scala create mode 100644 src/actors/scala/actors/gui/Label.scala create mode 100644 src/actors/scala/actors/gui/MainFrame.scala create mode 100644 src/actors/scala/actors/gui/Orientation.scala create mode 100644 src/actors/scala/actors/gui/Panel.scala create mode 100644 src/actors/scala/actors/gui/Publisher.scala create mode 100644 src/actors/scala/actors/gui/SimpleGUIApplication.scala create mode 100644 src/actors/scala/actors/gui/SwingComponent.scala create mode 100644 src/actors/scala/actors/gui/TextComponent.scala create mode 100644 src/actors/scala/actors/gui/TextField.scala create mode 100644 src/actors/scala/actors/gui/event/ButtonPressed.scala create mode 100644 src/actors/scala/actors/gui/event/CaretUpdate.scala create mode 100644 src/actors/scala/actors/gui/event/Event.scala create mode 100644 src/actors/scala/actors/gui/event/MouseDragged.scala create mode 100644 src/actors/scala/actors/gui/event/MouseEvent.scala create mode 100644 src/actors/scala/actors/gui/event/MouseMoved.scala create mode 100644 src/actors/scala/actors/gui/event/TextModified.scala create mode 100644 src/actors/scala/actors/gui/event/WindowActivated.scala create mode 100644 src/actors/scala/actors/gui/event/WindowClosed.scala create mode 100644 src/actors/scala/actors/gui/event/WindowClosing.scala create mode 100644 src/actors/scala/actors/gui/event/WindowDeactivated.scala create mode 100644 src/actors/scala/actors/gui/event/WindowDeiconified.scala create mode 100644 src/actors/scala/actors/gui/event/WindowEvent.scala create mode 100644 src/actors/scala/actors/gui/event/WindowIconified.scala create mode 100644 src/actors/scala/actors/gui/event/WindowOpened.scala create mode 100644 src/actors/scala/actors/gui/layout.scala create mode 100644 src/actors/scala/actors/multi/AbstractPid.scala create mode 100644 src/actors/scala/actors/multi/Actor.scala create mode 100644 src/actors/scala/actors/multi/LocalPid.scala create mode 100644 src/actors/scala/actors/multi/MailBox.scala create mode 100644 src/actors/scala/actors/multi/Pid.scala create mode 100644 src/actors/scala/actors/multi/ReceiverTask.scala create mode 100644 src/actors/scala/actors/single/AbstractPid.scala create mode 100644 src/actors/scala/actors/single/Actor.scala create mode 100644 src/actors/scala/actors/single/LocalPid.scala create mode 100644 src/actors/scala/actors/single/MailBox.scala create mode 100644 src/actors/scala/actors/single/Pid.scala (limited to 'src') diff --git a/src/actors/scala/actors/Debug.scala b/src/actors/scala/actors/Debug.scala new file mode 100644 index 0000000000..1b1e3f0705 --- /dev/null +++ b/src/actors/scala/actors/Debug.scala @@ -0,0 +1,38 @@ +package scala.actors; + +/** + * @author Philipp Haller + */ +object Debug { + var lev = 2 + + def level = lev + def level_= (lev: int) = { + //Console.println("Setting debug level to " + lev) + this.lev = lev + } + + def info(s: String) = + if (lev > 2) System.out.println("Info: " + s) + def warning(s: String) = + if (lev > 1) System.err.println("Warning: " + s) + def error(s: String) = + if (lev > 0) System.err.println("Error: " + s) +} + +class Debug(tag: String) { + var lev = 2 + + def level = lev + def level_= (lev: int) = { + //Console.println("Setting debug level (" + tag + ") to " + lev) + this.lev = lev + } + + def info(s: String) = + if (lev > 2) System.out.println(tag + " (info): " + s) + def warning(s: String) = + if (lev > 1) System.err.println(tag + " (warn): " + s) + def error(s: String) = + if (lev > 0) System.err.println(tag + " (erro): " + s) +} diff --git a/src/actors/scala/actors/Done.scala b/src/actors/scala/actors/Done.scala new file mode 100644 index 0000000000..5bef8d89d8 --- /dev/null +++ b/src/actors/scala/actors/Done.scala @@ -0,0 +1,19 @@ +package scala.actors; + +/** + * @author Philipp Haller + */ +class Done extends Throwable { + override def fillInStackTrace(): Throwable = + this; +} + +class ContinueException extends Throwable { + override def fillInStackTrace(): Throwable = + this; +} + +class AbortException extends Throwable { + override def fillInStackTrace(): Throwable = + this; +} diff --git a/src/actors/scala/actors/SPanel.scala b/src/actors/scala/actors/SPanel.scala new file mode 100644 index 0000000000..7f3b3a7a1b --- /dev/null +++ b/src/actors/scala/actors/SPanel.scala @@ -0,0 +1,74 @@ +package scala.actors; + + +/* + * SPanel.scala + * GUI for simple texts + * + */ +import java.awt._; +import java.awt.event._; +import javax.swing.event._; +import javax.swing._; +import java.io._; + + + +class SPanel (WIDTH:int, HEIGHT:int, title:String) extends JFrame{ + + private var textArea:JTextArea = null; + private var resetButton:JButton = null; + + private var scrollPane:JScrollPane = null; + private var panel:JPanel = null; + private var contentPane:Container = null; + private var levelChoice:JComboBox = null; + private var formatChoice:JComboBox = null; + + //init + setTitle(title); + setSize(WIDTH, HEIGHT); + this.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE); + + contentPane = getContentPane(); + panel = new JPanel(); + contentPane.add(panel, BorderLayout.SOUTH); + + // Add a text area with scroll bars + textArea = new JTextArea(8, 40); + scrollPane = new JScrollPane(textArea); + contentPane.add(scrollPane, BorderLayout.CENTER); + + // Add a "reset textarea" button + resetButton = new JButton("Clear"); + panel.add(resetButton); + resetButton.addActionListener( + new ActionListener() { + def actionPerformed(evt:ActionEvent):Unit= { + textArea.setText(""); + } + } + ); + + this.repaint(); + this.show(); + + + def addText(text:String):Unit = { + textArea.append(text+'\n'); + + if ( textArea.getHeight() > scrollPane.getHeight() ) { + scrollPane.getVerticalScrollBar().setValue(scrollPane.getVerticalScrollBar().getMaximum()); + } + repaint(); + } +/* + def actionPerformed(ActionEvent e):Unit { + } +*/ + + override def paint(g:Graphics ): unit = { + super.paint( g ); + } + +} diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala new file mode 100644 index 0000000000..7caaa1b35c --- /dev/null +++ b/src/actors/scala/actors/Scheduler.scala @@ -0,0 +1,236 @@ +package scala.actors + +import scala.collection.mutable._ + +import scala.actors.multi._ + +/** + * @author Philipp Haller + */ +abstract class IScheduler /*extends java.util.concurrent.Executor*/ { + def execute(item: ReceiverTask): unit; + def getTask(worker: WorkerThread): Runnable; + def tick(a: MailBox): unit; + + val QUIT_TASK = new Runnable() { + def run(): unit = {}; + override def toString() = "QUIT_TASK"; + } +} + + +object Scheduler /*extends java.util.concurrent.Executor*/ { + private var sched: /*java.util.concurrent.Executor*/ IScheduler = + //java.util.concurrent.Executors.newFixedThreadPool(2); + //new FixedWorkersScheduler(2); + new SpareWorkerScheduler2 + //new SpareWorkerScheduler + + def impl = sched + def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = { + Debug.info("Using scheduler " + scheduler) + sched = scheduler + } + + def execute(item: ReceiverTask) = + sched.execute(item) + + def tick(a: MailBox) = + sched.tick(a) +} + + +class SpareWorkerScheduler2 extends IScheduler { + private val tasks = new Queue[ReceiverTask]; + private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; + + val idle = new Queue[WorkerThread]; + val ticks = new HashMap[WorkerThread, long] + val executing = new HashMap[MailBox, WorkerThread] + + var TICKFREQ = 50 + + def init = { + for (val i <- List.range(0, 2)) { + val worker = new WorkerThread(this) + workers += worker + worker.start() + } + } + init; + + var maxWorkers = 0; + var ticksCnt = 0; + + def tick(a: MailBox): unit = synchronized { + ticksCnt = ticksCnt + 1 + executing.get(a) match { + case None => // thread outside of scheduler; error("No worker thread associated with actor " + a) + case Some(wt) => + ticks.update(wt, System.currentTimeMillis) + } + } + + def execute(item: ReceiverTask): unit = synchronized { + if (idle.length > 0) { + val wt = idle.dequeue + executing.update(item.actor, wt) + wt.execute(item) + } + else { + /* only create new worker thread + when all are blocked according to heuristic + + we check time stamps of latest send/receive ops + of ALL workers + + we stop if there is one which is not blocked */ + + val iter = workers.elements + var foundBusy = false + while (iter.hasNext && !foundBusy) { + val wt = iter.next + ticks.get(wt) match { + case None => foundBusy = true // assume not blocked + case Some(ts) => { + val currTime = System.currentTimeMillis + if (currTime - ts < TICKFREQ) + foundBusy = true + } + } + } + + if (!foundBusy) { + val newWorker = new WorkerThread(this) + workers += newWorker + maxWorkers = workers.length // statistics + + executing.update(item.actor, newWorker) + + newWorker.execute(item) + newWorker.start() + } + else { + // wait assuming busy thread will be finished soon + // and ask for more work + tasks += item + } + } + } + + def getTask(worker: WorkerThread) = synchronized { + if (tasks.length > 0) { + val item = tasks.dequeue + executing.update(item.actor, worker) + item + } + else { + idle += worker + null + } + } +} + +/** + * @author Philipp Haller + */ +class SpareWorkerScheduler extends IScheduler { + private var canQuit = false; + private val tasks = new Queue[Runnable]; + private val idle = new Queue[WorkerThread]; + + private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]; + + def init = { + for (val i <- List.range(0, 2)) { + val worker = new WorkerThread(this) + workers += worker + worker.start() + } + } + init; + + var maxWorkers = 0; + + def execute(item: ReceiverTask): unit = synchronized { + if (idle.length == 0) { + tasks += item + // create new worker + val newWorker = new WorkerThread(this) + workers += newWorker + maxWorkers = workers.length + newWorker.start() + //newWorker.execute(item) + } + else { + canQuit = true + idle.dequeue.execute(item) + } + } + + def getTask(worker: WorkerThread) = synchronized { + if (tasks.length > 0) tasks.dequeue + else { + idle += worker + null + //if ((idle.length == workers.length) && canQuit) haltExcept(worker) + //else null + } + } + + def tick(a: MailBox): unit = {} + + def haltExcept(w: WorkerThread) = { + for (val i <- List.range(0, workers.length)) + if (workers(i) != w) workers(i).halt + QUIT_TASK + } +} + + +/** + * @author Philipp Haller + */ +class FixedWorkersScheduler(workercnt: int) extends IScheduler { + private var canQuit = false; + private val tasks = new Queue[Runnable]; + private val idle = new Queue[WorkerThread]; + + //Console.println("Running with " + workercnt + " workers") + private var workers = new Array[WorkerThread](workercnt); + + def init = { + for (val i <- List.range(0, workers.length)) { + workers(i) = new WorkerThread(this) + workers(i).start() + } + } + init; + + def execute(item: ReceiverTask): unit = synchronized { + if (workers.length == 0) item.run + else { + canQuit = true + if (idle.length > 0) idle.dequeue.execute(item) + else tasks += item + } + } + + def getTask(worker: WorkerThread) = synchronized { + if (tasks.length > 0) tasks.dequeue + else { + idle += worker + null + //if ((idle.length == workers.length) && canQuit) haltExcept(worker) + //else null + } + } + + def tick(a: MailBox): unit = {} + + def haltExcept(w: WorkerThread) = { + for (val i <- List.range(0, workers.length)) + if (workers(i) != w) workers(i).halt + QUIT_TASK + } +} diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala new file mode 100644 index 0000000000..a483d7b043 --- /dev/null +++ b/src/actors/scala/actors/TimerThread.scala @@ -0,0 +1,156 @@ +/** + @author Sebastien Noir + + This class allows the (locl) sending of a message to an actor after a timeout. Used by the library to build receiveWithin(time:long). Note that the library deletes non received TIMEOUT() message if a messsage is received before the time-out occurs. + */ + + package scala.actors + +import scala.collection.mutable.PriorityQueue + +import scala.actors.multi.Actor +import scala.actors.multi.MailBox + +case class Signal() + +object TimerThread extends AnyRef with Runnable { + case class WakedActor(actor: MailBox, time: long, reason: String) extends Ordered[WakedActor] { + var valid = true + def compare [b >: WakedActor <% Ordered[b]](that: b): int = that match { + case that2: WakedActor => -(this.time compare that2.time) + case _ => error("not comparable") + } + } + + var queue = new PriorityQueue[WakedActor] + val t = new Thread(this); t.start + + var lateList: List[WakedActor] = Nil + + def trashRequest(a: MailBox) = synchronized { + // keep in mind: killing dead people is a bad idea! + queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match { + case Some(b) => + b.valid = false + case None => + lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match { + case Some(b2) => + b2.valid = false + case None => + } + } + } + + override def run = while (true) { + this.synchronized { + try { + val sleepTime = dequeueLateAndGetSleepTime + if (lateList.isEmpty) { + wait(sleepTime) + } + } catch { + case t: Throwable => { t.printStackTrace(); throw t } + } + } + + // process guys waiting for signal and empty list + for (val wa <- lateList) { + if (wa.valid) { + wa.actor send Signal() + } + } + lateList = Nil + } + + def requestSignal(a: Actor, waitMillis: long, reason: String): unit = this.synchronized { + Console.println("TTTT Actor "+a+" requests Signal in "+waitMillis +" ms for :"+reason) + val wakeTime = now + waitMillis + if (waitMillis < 0) { + a send Signal() + return + } + + if (queue.isEmpty) { // add to queue and restart sleeping + queue += WakedActor(a, wakeTime, reason) + notify() + } else { //queue full + if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping + queue += WakedActor (a, wakeTime, reason) + notify() + } else { // simply add to queue + + queue += WakedActor (a, wakeTime, reason) + } + } + } + + def requestTimeout(a: MailBox, waitMillis: long): unit = synchronized { + val wakeTime = now + waitMillis + if (waitMillis < 0) { + a send Signal() + return + } + + if (queue.isEmpty) { // add to queue and restart sleeping + queue += WakedActor(a, wakeTime, "") + notify() + } else + if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping + queue += WakedActor (a, wakeTime, "") + notify() + } + else // simply add to queue + queue += WakedActor (a, wakeTime, "") + } + + private def dequeueLateAndGetSleepTime: long = { + val FOREVER: long = 0 + var waitingList: List[WakedActor] = Nil + + while (!queue.isEmpty) { + val next = queue.max.time + val amount = next - now + if (amount > 0) { // guy in queue is not late + lateList = waitingList // give back the list of waiting guys for signaling + return amount + } + else // we're late: dequeue and examine next guy + waitingList = queue.dequeue :: waitingList + } + + // empty queue => sleep forever + lateList = waitingList + return FOREVER + } + + def now = new java.util.Date().getTime() +} + +//================================================================================ + +object TimerThreadTest { + def main (args:Array[String]) = { + new Tester (1000, "ONE").start + new Tester (500, "TWO").start + } + + class Tester (duration : int, name:String) extends Actor { + var i = 0 + + def loop:unit = { + receive { + case Signal() => + Console.println(name + i) + i = i+1; + loop + } + } + + override def run = { + for (val i <-List.range(1,10)) { + TimerThread.requestSignal(this, (duration * i).asInstanceOf[long], ""+duration*i) + } + loop + } + } +} diff --git a/src/actors/scala/actors/WorkerThread.scala b/src/actors/scala/actors/WorkerThread.scala new file mode 100644 index 0000000000..ab2ffbeb32 --- /dev/null +++ b/src/actors/scala/actors/WorkerThread.scala @@ -0,0 +1,34 @@ +package scala.actors; + +/** + * @author Philipp Haller + */ +class WorkerThread(sched: IScheduler) extends Thread { + private var task: Runnable = null + private var running = true + + def halt = synchronized { + running = false + notify() + } + + def execute(r: Runnable) = synchronized { + //Debug.info("WORK: " + this + ": Executing task " + r) + task = r + notify() + } + + override def run(): unit = synchronized { + while (running) { + if (task != null) { + task.run() + //Debug.info("WORK: " + this + " has finished.") + } + //Debug.info("WORK: " + this + ": Getting new task...") + task = sched.getTask(this) + //Debug.info("WORK (" + this + "): got task " + task) + if (task == sched.QUIT_TASK) running = false + else if (task == null) wait() + } + } +} diff --git a/src/actors/scala/actors/distributed/JXTAServiceBase.scala b/src/actors/scala/actors/distributed/JXTAServiceBase.scala new file mode 100644 index 0000000000..b3e2603974 --- /dev/null +++ b/src/actors/scala/actors/distributed/JXTAServiceBase.scala @@ -0,0 +1,9 @@ +package scala.actors.distributed; + +abstract class JXTAServiceBase(nodename: String) extends Thread with Service { + val serializer = new JavaSerializer(this); + private val internalNode = new JXTANode(nodename); + def node: Node = internalNode; + def createPid(actor: RemoteActor): RemotePid = + new JXTAPid(internalNode, makeUid, kernel, actor) +} diff --git a/src/actors/scala/actors/distributed/JavaSerializer.scala b/src/actors/scala/actors/distributed/JavaSerializer.scala new file mode 100644 index 0000000000..b25437ff86 --- /dev/null +++ b/src/actors/scala/actors/distributed/JavaSerializer.scala @@ -0,0 +1,34 @@ +package scala.actors.distributed; + +import java.io._; +import scala.collection.mutable._; + +import scala.actors.distributed.picklers.BytePickle.SPU; +import scala.actors.distributed.picklers._; +import scala.actors.multi._; + +[serializable] +class JavaSerializer(serv: Service) extends Serializer(serv) { + val debug = true; + + def log (s:String) = { + if (debug) Console.println("JAVASerializer: "+s) + } + + def serialize(o: AnyRef): Array[byte] = { + val bos = new ByteArrayOutputStream() + val out = new ObjectOutputStream(bos) + out.writeObject(o) + out.flush() + bos.toByteArray() + } + + def deserialize(bytes: Array[byte]): AnyRef = { + val bis = new ByteArrayInputStream(bytes); + val in = new ObjectInputStream(bis); + in.readObject(); + } + + def pid: SPU[Pid] = null; + def addRep(name: String, repCons: Serializer => AnyRef): unit = {}; +} diff --git a/src/actors/scala/actors/distributed/Messages.scala b/src/actors/scala/actors/distributed/Messages.scala new file mode 100644 index 0000000000..d8a663922a --- /dev/null +++ b/src/actors/scala/actors/distributed/Messages.scala @@ -0,0 +1,40 @@ +package scala.actors.distributed + +import scala.actors.multi.Pid +import scala.actors.multi.ExcHandlerDesc + +abstract class MessageTyper { + type DataType=Array[byte]; +} + +abstract class SystemMessage; +case class Send(rec: Pid, data: MessageTyper#DataType) extends SystemMessage; +case class Spawn(replyto: Pid, p: String) extends SystemMessage; +case class PidReply(res: Pid) extends SystemMessage; +case class Disconnect() extends SystemMessage; + +case class NodeDown() extends SystemMessage; + +// ACHTUNG: Tells "from" to create a _uni-directional_ link! +case class Link(from: Pid, to: Pid) extends SystemMessage; +case class UnLink(from: Pid, to: Pid) extends SystemMessage; +case class Exit1(from: Pid, to: Pid, reason: Symbol) extends SystemMessage; + +case class SpawnObject(replyto: Pid, data: MessageTyper#DataType) extends SystemMessage; + +case class NamedSend(sym: Symbol, data: MessageTyper#DataType) extends SystemMessage; + +case class ForwardExc(destDesc: ExcHandlerDesc, e: Throwable) extends SystemMessage; + +/* +case class NamedSendRep (ser:Serializer) extends TypeRep[NamedSend](ser) { + def serialize(content: NamedSend, w: java.io.Writer): unit = { + StringRep(ser).serialize(content.sym.name, w) + StringRep(ser).serialize(content.data, w) + } + def deserialize(r:java.io.Reader): NamedSend = { + NamedSend(Symbol(StringRep(ser).deserialize(r)), + StringRep(ser).deserialize(r)) + } +} +*/ diff --git a/src/actors/scala/actors/distributed/MessagesComb.scala b/src/actors/scala/actors/distributed/MessagesComb.scala new file mode 100644 index 0000000000..af3edc13a2 --- /dev/null +++ b/src/actors/scala/actors/distributed/MessagesComb.scala @@ -0,0 +1,36 @@ +package scala.actors.distributed; + +import scala.actors.distributed.picklers.BytePickle._; +import scala.actors.multi.Pid; + +object MessagesComb { + def sendPU(ser: Serializer): SPU[Send] = + wrap((p: Pair[Pid,Array[byte]]) => Send(p._1, p._2), + (s: Send) => Pair(s.rec, s.data), + pair(ser.pid, bytearray)); + + def spawnPU(ser: Serializer): SPU[Spawn] = + wrap((p: Pair[Pid,String]) => Spawn(p._1, p._2), + (s: Spawn) => Pair(s.replyto, s.p), + pair(ser.pid, string)); + + def symbolPU: SPU[Symbol] = + wrap((s: String) => Symbol(s), + (sym: Symbol) => sym.name, + string); + + def exitPU(ser: Serializer): SPU[Exit1] = + wrap((p: Triple[Pid,Pid,Symbol]) => Exit1(p._1, p._2, p._3), + (e: Exit1) => Triple(e.from, e.to, e.reason), + triple(ser.pid, ser.pid, symbolPU)); + + def spawnObjectPU(ser: Serializer): SPU[SpawnObject] = + wrap((p: Pair[Pid,Array[byte]]) => SpawnObject(p._1, p._2), + (s: SpawnObject) => Pair(s.replyto, s.data), + pair(ser.pid, bytearray)); + + def namedSendPU: SPU[NamedSend] = + wrap((p: Pair[Symbol,Array[byte]]) => NamedSend(p._1, p._2), + (ns: NamedSend) => Pair(ns.sym, ns.data), + pair(symbolPU, bytearray)); +} diff --git a/src/actors/scala/actors/distributed/Name.scala b/src/actors/scala/actors/distributed/Name.scala new file mode 100644 index 0000000000..02842f1501 --- /dev/null +++ b/src/actors/scala/actors/distributed/Name.scala @@ -0,0 +1,7 @@ +package scala.actors.distributed; + +case class Name(node: Node, sym: Symbol, kernel: NetKernel) { + def !(msg: AnyRef): unit = { + kernel.namedSend(this, msg) + } +} diff --git a/src/actors/scala/actors/distributed/NetKernel.scala b/src/actors/scala/actors/distributed/NetKernel.scala new file mode 100644 index 0000000000..28821350d6 --- /dev/null +++ b/src/actors/scala/actors/distributed/NetKernel.scala @@ -0,0 +1,591 @@ +package scala.actors.distributed + +import java.io.StringReader +import java.io.StringWriter +import java.util.logging._ + +import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet + +import java.net.UnknownHostException +import java.io.IOException +import java.lang.SecurityException + +import scala.actors.multi.Actor +import scala.actors.multi.ExcHandlerDesc + +case class RA(a: RemoteActor); + +object NetKernel { + var kernel: NetKernel = null; +} + +class NetKernel(service: Service) { + NetKernel.kernel = this + + // contains constructors + private val ptable = + new HashMap[String, () => RemoteActor]; + + // maps local ids to scala.actors + private val rtable = + new HashMap[int, RemoteActor]; + + // maps scala.actors to their RemotePid + private val pidTable = + new HashMap[RemoteActor, RemotePid]; + + private var running = true; + + val logLevel = Level.FINE + val l = Logger.getLogger("NetKernel") + l.setLevel(logLevel) + val consHand = new ConsoleHandler + consHand.setLevel(logLevel) + l.addHandler(consHand) + + //start // start NetKernel + + /** only called if destDesc is local. */ + def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = { + destDesc.pid match { + case rpid: RemotePid => + (rtable get rpid.localId) match { + case Some(actor) => + actor.handleExc(destDesc, e) + case None => + error("exc desc refers to non-registered actor") + } + } + } + + def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { + // locality check (handler local to this node?) + destDesc.pid match { + case rpid: RemotePid => + if (rpid.node == this.node) + handleExc(destDesc, e) + else + sendToNode(rpid.node, ForwardExc(destDesc, e)) + } + } + + def sendToNode(node: Node, msg: AnyRef) = { + //val sw = new StringWriter + val bytes = service.serializer.serialize(msg /*, sw*/) + service.send(node, bytes) + //service.send(node, sw.toString()) + } + + def addConstructor(key: String, value: () => RemoteActor) = + ptable.update(key, value); + + def node: Node = service.node; + + def nodes: List[Node] = service.nodes; + + def pidOf(actor: RemoteActor): RemotePid = synchronized { + pidTable.get(actor) match { + case None => error("malformed pid table in " + this) + case Some(pid) => pid + } + } + + def disconnectNode(n: Node) = synchronized { + service.disconnectNode(n) + } + + def getLocalRef(locId: int): RemoteActor = + rtable.get(locId) match { + case None => + error("" + locId + " is not registered at " + this) + case Some(remoteActor: RemoteActor) => + remoteActor + } + + def localSend(localId: int, msg: AnyRef): unit = synchronized { + rtable.get(localId) match { + case None => + error("" + localId + " is not registered at " + this) + case Some(remoteActor: RemoteActor) => + //Console.println("local send to " + remoteActor) + remoteActor send msg + } + } + + def localSend(pid: RemotePid, msg: AnyRef): unit = + localSend(pid.localId, msg); + + def remoteSend(pid: RemotePid, msg: AnyRef) = synchronized { + //Console.println("NetKernel: Remote msg delivery to " + pid) + service.remoteSend(pid, msg) + } + + def namedSend(name: Name, msg: AnyRef): unit = { + if (name.node == this.node) { + // look-up name + nameTable.get(name.sym) match { + case None => + // message is lost + //Console.println("lost message " + msg + " because " + name + " not registered.") + case Some(localId) => localSend(localId, msg) + } + } + else { + // remote send + + // serialize msg + ///val sw = new StringWriter + val bytes = service.serializer.serialize(msg/*, sw*/) + + sendToNode(name.node, NamedSend(name.sym, bytes)) + //sendToNode(name.node, NamedSend(name.sym, sw.toString())) + } + } + + val nameTable = new HashMap[Symbol, int] + + def registerName(name: Symbol, pid: RemotePid): unit = synchronized { + nameTable += name -> pid.localId + } + + def registerName(name: Symbol, a: RemoteActor): unit = synchronized { + val pid = register(a) + registerName(name, pid) + a.start + } + + /*override def run: unit = receive { + case ForwardExc(destDesc, e) => + // TODO + case Spawn(reply: RemotePid, pname) => + val newPid = spawn(pname) + // need to send back the Pid + remoteSend(reply, newPid) + run + + case SpawnObject(reply: RemotePid, data: Array[byte]) => + //val sr = new StringReader(data) + //service.serializer.deserialize(sr) match { + + service.serializer.deserialize(data) match { + case RA(actor) => + val newPid = register(actor) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + actor.start + // need to send back the Pid + remoteSend(reply, newPid) + } + run + + case NamedSend(sym: Symbol, data) => + // look-up name + nameTable.get(sym) match { + case None => + // message is lost + Console.println("lost message " + data + " because " + sym + " not registered.") + case Some(localId) => + // deserialize data + //val sr = new StringReader(data) + //val msg = service.serializer.deserialize(sr) + + val msg = service.serializer.deserialize(data) + localSend(localId, msg) + } + run + + case Send(pid: RemotePid, data) => + // deserialize data + //val sr = new StringReader(data) + //val msg = service.serializer.deserialize(sr) + val msg = service.serializer.deserialize(data) + + Console.println("locally send " + msg + " to " + pid) + localSend(pid, msg) + run + + case Link(from:RemotePid, to:RemotePid) => + // assume from is local + linkFromLocal(from, to) + run + + case UnLink(from:RemotePid, to:RemotePid) => + // assume from is local + unlinkFromLocal(from, to) + run + + case Exit1(from:RemotePid, to:RemotePid, reason) => + // check if "to" traps exit signals + // if so send a message + if (trapExits.contains(to.localId)) + // convert signal into message + localSend(to, Exit1(from, to, reason)) + else + if (reason.name.equals("normal")) { + // ignore signal + } + else + exit(from, to, reason) + run + }*/ + + // TODO + /*def isReachable(remoteNode: Node): boolean = { + val pingMsg = new Ping(node) + val sw = new StringWriter + service.serializer.serialize(pingMsg, sw) + service.send(remoteNode, sw.toString()) + }*/ + + def processMsg(msg: AnyRef): unit = synchronized { + msg match { + case Spawn(reply: RemotePid, pname) => + val newPid = spawn(pname) + // need to send back the Pid + remoteSend(reply, newPid) + + case SpawnObject(reply: RemotePid, data) => + //val sr = new StringReader(data) + //service.serializer.deserialize(sr) match { + + service.serializer.deserialize(data) match { + case RA(actor) => + val newPid = register(actor) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + actor.start + // need to send back the Pid + remoteSend(reply, newPid) + } + + case NamedSend(sym: Symbol, data) => + // look-up name + nameTable.get(sym) match { + case None => + // message is lost + //Console.println("lost message " + msg + " because " + sym + " not registered.") + case Some(localId) => + // deserialize data + //val sr = new StringReader(data) + //val msg = service.serializer.deserialize(sr) + val msg = service.serializer.deserialize(data) + localSend(localId, msg) + } + + case Send(pid: RemotePid, data) => + // deserialize data + //val sr = new StringReader(data) + //val msg = service.serializer.deserialize(sr) + val msg = service.serializer.deserialize(data) + //Console.println("locally send " + msg + " to " + pid) + localSend(pid, msg) + + case Link(from:RemotePid, to:RemotePid) => + // assume from is local + linkFromLocal(from, to) + + case UnLink(from:RemotePid, to:RemotePid) => + // assume from is local + unlinkFromLocal(from, to) + + case Exit1(from:RemotePid, to:RemotePid, reason) => + // check if "to" traps exit signals + // if so send a message + if (trapExits.contains(to.localId)) { + // convert signal into message + // TODO: simpler message (w/o to) for actor! + localSend(to, Exit1(from, to, reason)) + } + else { + if (reason.name.equals("normal")) { + // ignore signal + } + else + exit(from, to, reason) + } + } + } + + /* Registers an instance of a remote actor inside this NetKernel. + */ + def register(newProc: RemoteActor): RemotePid = synchronized { + newProc.kernel = this + val newPid = service.createPid(newProc) + rtable += newPid.localId -> newProc + pidTable += newProc -> newPid + newPid + } + + // local spawn + def spawn(pname: String): RemotePid = synchronized { + // get constructor out of table + (ptable.get(pname)) match { + case None => + //error("No constructor found. Cannot start process.") + //null + + val newProc = Class.forName(pname).newInstance().asInstanceOf[RemoteActor] + // create Pid for remote communication and register process + val newPid = register(newProc) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + newProc.start + newPid + + case Some(cons) => + val newProc = cons() + // create Pid for remote communication and register process + val newPid = register(newProc) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + newProc.start + newPid + } + } + + // local spawn + def spawn(name: String, arg: RemotePid): RemotePid = synchronized { + val newPid = spawn(name) + localSend(newPid, arg) + newPid + } + + // assume this.node != node + def spawn(replyTo: RemotePid, node: Node, a: RemoteActor): unit = { + val ra = RA(a) + //val rsw = new StringWriter + //service.serializer.serialize(ra, rsw) + val bytes = service.serializer.serialize(ra) + //sendToNode(node, SpawnObject(replyTo, rsw.toString())) + sendToNode(node, SpawnObject(replyTo, bytes)) + } + + + def registerSerializer(index: String, rep: Serializer => AnyRef) = { + service.serializer.addRep(index, rep) + + // send registering requests to remote nodes + } + + // remote spawn + def spawn(replyTo: RemotePid, node: Node, name: String): RemotePid = synchronized { + // check if actor is to be spawned locally + if (node == this.node) { + val newPid = spawn(name) + newPid + } + else { + sendToNode(node, Spawn(replyTo, name)) + null // pid needs to be sent back + } + } + + /* Spawns a new actor (locally), executing "fun". + */ + def spawn(fun: RemoteActor => unit): RemotePid = synchronized { + val newProc = new RemoteActor { + override def run: unit = + fun(this); + } + + // create Pid for remote communication and register process + val newPid = register(newProc) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + newProc.start + newPid + } + + /* Spawns a new actor (locally), executing "fun". + */ + def spawnLink(pid: RemotePid, fun: RemoteActor => unit): RemotePid = synchronized { + val newProc = new RemoteActor { + override def run: unit = + fun(this); + } + + // create Pid for remote communication and register process + val newPid = register(newProc) + //Console.println("Spawned a new " + newProc + " (" + newPid + ")") + + // link new process to pid (assume pid is local) + link(pid, newPid) + + newProc.start + newPid + } + + // maps local ids to their linked pids + private val links = new HashMap[int,HashSet[RemotePid]]; + + // which of the local processes traps exit signals? + private val trapExits = new HashSet[int]; + + def processFlag(pid: RemotePid, flag: Symbol, set: boolean) = synchronized { + if (flag.name.equals("trapExit")) { + if (trapExits.contains(pid.localId) && !set) + trapExits -= pid.localId + else if (!trapExits.contains(pid.localId) && set) + trapExits += pid.localId + } + } + + // assume from.node == this.node + private def unlinkFromLocal(from: RemotePid, to: RemotePid): unit = + links.get(from.localId) match { + case None => + // has no links -> ignore + case Some(set) => + set -= to + if (set.size == 0) links -= from.localId + }; + + /* + unlinks bi-directional link + assume from.node == this.node + */ + def unlink(from: RemotePid, to: RemotePid): unit = synchronized { + unlinkFromLocal(from, to) + if (to.node == this.node) + unlinkFromLocal(to, from) + else + // (2) send message to NetKernel of "to" to unlink a + // uni-directional link from "to" to "from" + sendToNode(to.node, UnLink(to, from)) + } + + // assume from.node == this.node + private def linkFromLocal(from: RemotePid, to: RemotePid): unit = + // TODO: send Exit to from if to is invalid + links.get(from.localId) match { + case None => + // from has no links, yet + val linksTo = new HashSet[RemotePid] + linksTo += to + links += from.localId -> linksTo + case Some(set) => + set += to + }; + + /* + creates bi-directional link + assume from.node == this.node + */ + def link(from: RemotePid, to: RemotePid): unit = synchronized { + // (1) create locally a uni-directional link + linkFromLocal(from, to) + if (to.node == this.node) + linkFromLocal(to, from) + else + // (2) send message to NetKernel of "to" to create a + // uni-directional link from "to" to "from" + sendToNode(to.node, Link(to, from)) + } + + // Assume "to" is local. + def exit(from: RemotePid, to: RemotePid, reason: Symbol): unit = { + // remove link + unlinkFromLocal(to, from) + exit(to, reason) + } + + val exitMarks = new HashSet[RemotePid] + + /* + If reason is unequal to 'normal then + this will cause all linked processes to + (transitively) terminate abnormally. + + Assume pid is local. + */ + def exit(pid: RemotePid, reason: Symbol): unit = synchronized { + if (!(exitMarks contains pid)) { + exitMarks += pid // mark pid as exiting + //Console.println("" + pid + " is exiting (" + reason + ").") + + // first look-up remote actor in rtable + val actor = rtable(pid.localId) + // remove from table of running processes + rtable -= pid.localId + // remove from pid table + pidTable -= actor + + // send exit signals to linked processes + links.get(pid.localId) match { + case None => + //Console.println("no linked processes") + + case Some(set) => // set of remote pids that we want to terminate + //Console.println("sending exit signals to linked processes") + + val iter = set.elements + while (iter.hasNext) { + val linkedPid = iter.next + + unlinkFromLocal(pid, linkedPid) + + if (linkedPid.node == this.node) { + unlinkFromLocal(linkedPid, pid) + + if (trapExits.contains(linkedPid.localId)) + localSend(linkedPid, Exit1(pid, linkedPid, reason)) + else if (!reason.name.equals("normal")) + exit(linkedPid, reason) + } + else + sendToNode(linkedPid.node, + Exit1(pid, linkedPid, reason)) + } + exitMarks -= pid + } + } + } + + private val monNodes = + new HashMap[Node,HashMap[RemotePid,int]]; + + def monitorNode(client: RemotePid, mnode: Node, cond: boolean) = synchronized { + monNodes.get(mnode) match { + case None => + // nobody is monitoring this node + if (cond) { + val map = new HashMap[RemotePid,int] + map += client -> 1 + monNodes += mnode -> map + } + case Some(map) => + map.update(client, map(client) + (if (cond) 1 else -1)) + } + + // if no connection exists: + // try connecting, if it fails deliver nodedown msg + if (cond && !service.isConnected(mnode)) { + try { + service.connect(mnode) + } + catch { + case uhe: UnknownHostException => + nodeDown(mnode) + case ioe: IOException => + nodeDown(mnode) + case se: SecurityException => + nodeDown(mnode) + } + } + } + + def nodeDown(mnode: Node) = { + // send NodeDown msg to registered RemotePids + monNodes.get(mnode) match { + case None => + // nobody is monitoring this node + case Some(map) => + // iterate over keys (RemotePids of interested clients) + val iter = map.keys + while (iter.hasNext) { + val client = iter.next + for (val i <- List.range(0, map(client))) { + // send nodedown msg + client ! Pair(NodeDown(), mnode) + } + } + } + } + +} diff --git a/src/actors/scala/actors/distributed/Node.scala b/src/actors/scala/actors/distributed/Node.scala new file mode 100644 index 0000000000..f407bdfd21 --- /dev/null +++ b/src/actors/scala/actors/distributed/Node.scala @@ -0,0 +1,6 @@ +package scala.actors.distributed; + +[serializable] abstract class Node; + +[serializable] case class TcpNode(address: String, port: int) extends Node; +[serializable] case class JXTANode(name: String) extends Node; diff --git a/src/actors/scala/actors/distributed/NodeComb.scala b/src/actors/scala/actors/distributed/NodeComb.scala new file mode 100644 index 0000000000..c18540c6f9 --- /dev/null +++ b/src/actors/scala/actors/distributed/NodeComb.scala @@ -0,0 +1,12 @@ +package scala.actors.distributed; + +import scala.actors.distributed.picklers.BytePickle._; + +object NodeComb { + def tcpNodePU: SPU[TcpNode] = + wrap((p: Pair[String,int]) => TcpNode(p._1, p._2), + (n: TcpNode) => Pair(n.address, n.port), pair(string, nat)); + def jxtaNodePU: SPU[JXTANode] = + wrap((s: String) => JXTANode(s), + (n: JXTANode) => n.name, string); +} diff --git a/src/actors/scala/actors/distributed/RemoteActor.scala b/src/actors/scala/actors/distributed/RemoteActor.scala new file mode 100644 index 0000000000..ef3f98db70 --- /dev/null +++ b/src/actors/scala/actors/distributed/RemoteActor.scala @@ -0,0 +1,168 @@ +package scala.actors.distributed + +import scala.actors.multi.MailBox +import scala.actors.multi.Actor +import scala.actors.multi.Pid +import scala.actors.multi.LocalPid +import scala.actors.multi.ExcHandlerDesc + +import scala.collection.mutable.HashMap +import scala.collection.mutable.Stack + +abstract class ServiceName +case class JXTA(groupName: String) extends ServiceName +case class TCP() extends ServiceName + +class RemoteActor extends Actor { + override def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { + // locality check (handler local to this actor?) + if (destDesc.pid == self) + handleExc(destDesc, e) + else + kernel.forwardExc(destDesc, e) + } + + override def receive(f: PartialFunction[Message,unit]): scala.All = { + if (isAlive) { + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + try { + f(msg) + } + catch { + case d: Done => + throw new Done + case t: Throwable => + if (!excHandlerDescs.isEmpty) + forwardExc(excHandlerDescs.top, t) + else + die(Symbol(t.toString())) + } + die() + case None => + continuation = f + Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + var kernel: NetKernel = null; + + def node = + self.node + + def nodes = kernel.nodes + + private var selfCached: RemotePid = null + + override def self: RemotePid = { + if (selfCached == null) + selfCached = kernel pidOf this + selfCached + } + + def serialize(index: String, rep: Serializer => AnyRef) = + kernel.registerSerializer(index, rep); + + def alive(s: ServiceName): unit = { + var service: Service = null + s match { + case TCP() => + val serv = new TcpService(TcpService.generatePort) + service = serv + serv.start() + /*case JXTA(groupName) => + val serv = new ch.epfl.lamp.scala.actors.jxta.JXTAService("AliveActor" + + new java.util.Date().getTime() + "-" + + new java.util.Random().nextInt(1000), + java.util.logging.Level.FINEST) { + //millis before we give up group creation and create the group. + override def TIME_BEFORE_AUTO_GROUP_CREATE: long = 30000 + val PIPE_ID:String="1" + //val ADV_LIFETIME:long = 1 * 60 * 60 * 1000 //millis to keep advertisements ... + override def MY_GROUP_NAME:String = groupName + /*val SENDER_MESSAGE = "PalcomDemo"; //used to identify the message element in jxta messages + val PIPE_BASE_ID:String = "PIPE4Pal4"+MY_GROUP_NAME; + val MESSAGE_THRESHOLD = 5;*/ + } + service = serv + serv.start()*/ + case _ => throw new Exception ("Unknown Service in RemoteActor") + } + // create RemotePid + selfCached = service.kernel.register(this) + } + + def node(pid: Pid): Node = pid match { + case rpid: RemotePid => rpid.node + case lpid: LocalPid => null + } + + def disconnectNode(node: Node) = + kernel.disconnectNode(node); + //does not call start def of Actor + def register(name: Symbol, pid: RemotePid): unit = + kernel.registerName(name, pid); + + //calls start def of Actor + def register(name: Symbol, a: RemoteActor): unit = + kernel.registerName(name, a); + + def name(node: Node, sym: Symbol): Name = + Name(node, sym, kernel) + + def spawn(node: Node, name: String): RemotePid = + kernel.spawn(self, node, name); + + def spawn(node: Node, a: RemoteActor): unit = + kernel.spawn(self, node, a); + + def spawn(fun: RemoteActor => unit): RemotePid = + kernel.spawn(fun); + + def spawn(a: RemoteActor): RemotePid = { + val pid = kernel.register(a) + a.start + pid + } + + def spawnLink(fun: RemoteActor => unit): RemotePid = + kernel.spawnLink(self, fun); + + def monitorNode(node: Node, cond: boolean) = + kernel.monitorNode(self, node, cond); + + // this should be: + // self.link(pid) + // if self is RemotePid it will invoke NetKernel + + def link(pid: RemotePid): unit = + kernel.link(self, pid); + + def unlink(pid: RemotePid): unit = + kernel.unlink(self, pid); + + override def exit(reason: Symbol): unit = + kernel.exit(self, reason); + + override def processFlag(flag: Symbol, set: boolean) = + kernel.processFlag(self, flag, set); + + override def die(reason: Symbol) = { + if (isAlive) { + isAlive = false + Debug.info("" + this + " died.") + kernel.exit(self, reason) + } + } + + override def die() = { + if (isAlive) { + isAlive = false + Debug.info("" + this + " died.") + kernel.exit(self, 'normal) + } + } +} diff --git a/src/actors/scala/actors/distributed/RemotePid.scala b/src/actors/scala/actors/distributed/RemotePid.scala new file mode 100644 index 0000000000..bb70e3f4a5 --- /dev/null +++ b/src/actors/scala/actors/distributed/RemotePid.scala @@ -0,0 +1,184 @@ +package scala.actors.distributed + +import scala.actors.multi.Pid +import scala.actors.multi.MailBox +import scala.actors.multi.ExcHandlerDesc + +import java.io._ + +[serializable] +abstract class RemotePid(locId: int, kern: NetKernel, actor: RemoteActor) extends Pid { + def this() = this(0, null, null) // for serialization + + private var _locId = locId + + override def equals(that: Any) = that match { + case rpid: RemotePid => + (this.node == rpid.node && this.localId == rpid.localId) + case _ => false + } + + //[throws(classOf[IOException])] + private def writeObject(out: ObjectOutputStream): Unit = { + //Console.println("writing locID"+locId) + out.writeInt(locId) + } + + //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] + private def readObject(in: ObjectInputStream): Unit = { + _locId = in.readInt() + //Console.println("read _locID"+_locId) + } + + //[throws(classOf[ObjectStreamException])] + private def readResolve(): AnyRef = { + Console.println("readResolve") + null + //build nothing. Subclasses will do... + } + + def node: Node; + + def localId: int = locId; + + def kernel = kern; + + def !(msg: MailBox#Message): unit = { + //Console.println("! " + msg) + if (actor != null) + actor send msg + else + kernel.remoteSend(this, msg) + } + + def link(other: Pid): unit = + other match { + case rpid: RemotePid => + kernel.link(this, rpid) + }; + + //TODO (dont know if this is local to kernel.node) + def linkTo(other: Pid): unit = + other match { + case rpid: RemotePid => + // do nothing + }; + + def unlink(other: Pid): unit = + other match { + case rpid: RemotePid => + kernel.unlink(this, rpid) + }; + + //TODO (dont know if this is local to kernel.node) + def unlinkFrom(other: Pid): unit = + other match { + case rpid: RemotePid => + // do nothing + }; + + def exit(reason: Symbol): unit = kernel.exit(this, reason); + def exit(from: Pid, reason: Symbol): unit = { + from match { + case rpid: RemotePid => + kernel.exit(rpid, reason); + } + } + + def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = {} +} + +[serializable] case class TcpPid(n: TcpNode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) { + def node: TcpNode = n; + + private var _locId = locId + private var _node = n + + override def equals(that: Any) = + super.equals(that) + + //[throws(classOf[IOException])] + private def writeObject(out: ObjectOutputStream): Unit = { + out.writeInt(locId) + out.writeObject(n) + } + + //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] + private def readObject(in: ObjectInputStream): Unit = { + _locId = in.readInt() + _node = in.readObject().asInstanceOf[TcpNode] + } + + //[throws(classOf[ObjectStreamException])] + private def readResolve(): AnyRef = { + val kernel = NetKernel.kernel; + //TODO val actor = kernel.getLocalRef(_locId) + TcpPid(_node, _locId, kernel, actor) + } +} + +[serializable] case class JXTAPid(n: JXTANode, locId: int, kern: NetKernel, actor: RemoteActor) extends RemotePid(locId, kern, actor) { + def node: JXTANode = n; + + private var _locId = locId + private var _node = n + + override def equals(that: Any) = + super.equals(that) + + //[throws(classOf[IOException])] + private def writeObject(out: ObjectOutputStream): Unit = { + out.writeInt(locId) + out.writeObject(n) + } + + //[throws(classOf[IOException]), throws(classOf[ClassNotFoundException])] + private def readObject(in: ObjectInputStream): Unit = { + _locId = in.readInt() + _node = in.readObject().asInstanceOf[JXTANode] + } + + //[throws(classOf[ObjectStreamException])] + private def readResolve(): AnyRef = { + val kernel = NetKernel.kernel; + //TODO val actor = kernel.getLocalRef(_locId) + JXTAPid(_node, _locId, kernel, actor) + } +} + +//================================================================================ + +object CaseTest { + + def getBytes(obj: AnyRef): Array[byte] = { + val bos = new ByteArrayOutputStream() + val out = new ObjectOutputStream(bos) + out.writeObject(obj) + out.flush() + bos.toByteArray() + } + + def getObject(a: Array[byte]): AnyRef = { + val bis = new ByteArrayInputStream(a) + val in = new ObjectInputStream(bis) + val obj = in.readObject() + obj + } + + def main(args: Array[String]): Unit = { + val node = JXTANode ("test node"); + val pid1 = JXTAPid (node, 4, null, null); + val pid2 = JXTAPid (node, 4, new NetKernel(null), null); + + Console.println("node Before: " + node) + Console.println("node After : " + getObject(getBytes(node))) + + Console.println("pid1 Before: " + pid1) + Console.println("pid1 After : " + getObject(getBytes(pid1))) + + Console.println("pid2 Before: " + pid2) + Console.println("pid2 After : " + getObject(getBytes(pid2))) + + Console.println("pid2 After : " + getObject((new String (getBytes(pid2))).getBytes)) + } +} diff --git a/src/actors/scala/actors/distributed/Serializer.scala b/src/actors/scala/actors/distributed/Serializer.scala new file mode 100644 index 0000000000..868d996131 --- /dev/null +++ b/src/actors/scala/actors/distributed/Serializer.scala @@ -0,0 +1,53 @@ +package scala.actors.distributed; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.EOFException; + +import scala.actors.distributed.picklers.BytePickle.SPU; +import scala.actors.multi.Pid; + +abstract class Serializer(s: Service) { + def serialize(o: AnyRef/*, w: Writer*/): Array[byte]; + def deserialize(a: Array[byte]/*r: Reader*/): AnyRef; + + // throws IOException + def readBytes(inputStream: DataInputStream): Array[byte] = { + try { + val length = inputStream.readInt(); + val bytes = new Array[byte](length); + inputStream.readFully(bytes, 0, length); + return bytes; + } + catch { + case npe: NullPointerException => { + throw new EOFException("Connection closed."); + } + } + } + + // throws IOException, ClassNotFoundException + def readObject(inputStream: DataInputStream): AnyRef = { + val bytes = readBytes(inputStream); + deserialize(bytes); + } + + // throws IOException + def writeBytes(outputStream: DataOutputStream, bytes: Array[byte]): unit = { + val length = bytes.length; + // original length + outputStream.writeInt(length); + outputStream.write(bytes, 0, length); + outputStream.flush(); + } + + // throws IOException + def writeObject(outputStream: DataOutputStream, obj: AnyRef) = { + val bytes = serialize(obj); + writeBytes(outputStream, bytes); + } + + def pid: SPU[Pid]; + def service = s; + def addRep(name: String, repCons: Serializer => AnyRef): unit; +} diff --git a/src/actors/scala/actors/distributed/Service.scala b/src/actors/scala/actors/distributed/Service.scala new file mode 100644 index 0000000000..23a09a09c3 --- /dev/null +++ b/src/actors/scala/actors/distributed/Service.scala @@ -0,0 +1,72 @@ +package scala.actors.distributed; + +import java.io.StringWriter; + +trait Service { + val serializer: Serializer; + def node: Node; + def createPid(actor: RemoteActor): RemotePid; + def send(node: Node, data: Array[byte]): unit; + def connect(node: Node): unit; // non blocking. + def disconnectNode(node: Node): unit; + def isConnected(node: Node): boolean; + + //blocking. timeout depends on Implementation. + def isReachable(node: Node): boolean; + + def getRoundTripTimeMillis(node:Node): long; //blocking + + def nodes:List[Node] + +// implemented parts: + + private val kern = new NetKernel(this); + def kernel = kern; + + def spawn(name: String): RemotePid = + kern spawn name; + + def spawn(name: String, arg: RemotePid): RemotePid = + kern.spawn(name, arg); + + + //suggested addition by seb + def spawn(fun: RemoteActor => unit): RemotePid = + kernel.spawn(fun); + def spawn(a:RemoteActor):RemotePid = { + //Console.println("Service:spawn(RemoteActor)") + val pid = kernel.register(a) + //Console.println("RemoteActor("+a+") registered in kernel") + a.start + //Console.println("RemoteActor("+a+") started") + pid + } + + def send(pid: RemotePid, msg: AnyRef): unit = synchronized { + if (pid.node == this.node) + kernel.localSend(pid, msg) + else + kernel.remoteSend(pid, msg) + } + + def remoteSend(pid: RemotePid, msg: AnyRef): unit = synchronized { + //Console.println("Service: Sending " + msg + " to " + pid) + // lets try to serialize the message + //val sw = new StringWriter + //serializer.serialize(msg, sw) + val bytes = serializer.serialize(msg) + //val sendMsg = Send(pid, sw.toString()) + val sendMsg = Send(pid, bytes) + //val sw2 = new StringWriter + //serializer.serialize(sendMsg, sw2) + //send(pid.node, sw2.toString()) + val bytes2 = serializer.serialize(sendMsg) + send(pid.node, bytes2) + } + + private var idCnt = 0; + def makeUid = { idCnt = idCnt + 1; idCnt } + + + +} diff --git a/src/actors/scala/actors/distributed/TcpSerializerComb.scala b/src/actors/scala/actors/distributed/TcpSerializerComb.scala new file mode 100644 index 0000000000..9d806c8c9c --- /dev/null +++ b/src/actors/scala/actors/distributed/TcpSerializerComb.scala @@ -0,0 +1,126 @@ +package scala.actors.distributed; + +import java.io._; +import scala.collection.mutable._; + +import scala.actors.distributed.picklers.BytePickle._; + +import scala.actors.distributed.MessagesComb._; +import scala.actors.distributed.NodeComb._; +import scala.actors.multi._; + +//import scala.actors.distributed.examples.CounterMessagesComb._; + +//TODO: change Service to NetKernel in Serializer interface +class TcpSerializerComb(serv: Service) extends Serializer(serv) { + + private def lookup(typename: String): PU[AnyRef] = { + val op = table.get(typename) + op match { + case None => + error("No type representation found.") + null + case Some(rep) => + val repr = rep.asInstanceOf[PU[AnyRef]] + repr + } + } + + private def lookup(r: Reader): PU[AnyRef] = { + // read length of type name + val carr = new Array[char](8) + r.read(carr) + val len = Util.decode(new String(carr)) + val content = new Array[char](len) + r.read(content) + lookup(new String(content)) + } + + def pid: SPU[Pid] = { + val nodeIntPU = wrap((p: Pair[TcpNode,int]) => TcpPid(p._1, p._2, serv.kernel, + if (p._1 == serv.node) serv.kernel.getLocalRef(p._2) + else null), + (t: TcpPid) => Pair(t.node, t.localId), + pair(tcpNodePU, nat)); + + wrap((p:Pid) => p, (pid:Pid) => pid match { + case tpid: TcpPid => + tpid + case other => + error("no instance of TcpPid!!") + }, nodeIntPU) + } + + def anyRef: SPU[AnyRef] = + wrap((typename: String) => Class.forName(typename).newInstance().asInstanceOf[AnyRef], + (obj: AnyRef) => Util.baseName(obj), + string); + + def actorPU: SPU[RA] = + wrap((typename: String) => RA(Class.forName(typename).newInstance().asInstanceOf[RemoteActor]), + (obj: RA) => Util.baseName(obj.a), + string); + + val log = new Debug("TcpSerializerComb") + log.level = 3 + + val table = new HashMap[String, AnyRef] + + initialize + def initialize = { + table += "int" -> nat + table += "Send" -> sendPU(this) + table += "Spawn" -> spawnPU(this) + table += "TcpNode" -> tcpNodePU + table += "TcpPid" -> pid + table += "Exit" -> exitPU(this) + table += "AnyRef" -> anyRef + + table += "RA" -> actorPU + table += "SpawnObject" -> spawnObjectPU(this) + + //table += "Incr" -> incrPU(this) + //table += "Value" -> valuePU(this) + //table += "Result" -> resultPU(this) + } + + def addRep(name: String, repCons: Serializer => AnyRef) = + table.update(name, repCons(this)); + + def +=(name: String) = + new InternalMapTo(name); + + class InternalMapTo(name: String) { + def ->(repCons: Serializer => AnyRef): unit = + table.update(name, repCons(TcpSerializerComb.this)); + } + + def serialize(o: AnyRef): Array[byte] = { + log.info("pickling value of type " + Util.baseName(o)); + val op = table.get(Util.baseName(o)); + op match { + case None => error("No type representation for " + Util.baseName(o) + " found. Cannot serialize."); + case Some(rep) => + // first write type name + val bytes = pickle(string, Util.baseName(o)) + val repr = rep.asInstanceOf[SPU[AnyRef]]; + log.info("using type representation " + repr); + val res = repr.appP(o, new PicklerState(bytes, new PicklerEnv)).stream + res + } + + } + + def deserialize(bytes: Array[byte]): AnyRef = { + val ups = string.appU(new UnPicklerState(bytes, new UnPicklerEnv)) + val typename = ups._1 + table.get(typename) match { + case None => error("No type representation for " + typename + " found. Cannot deserialize.") + case Some(rep) => + val repr = rep.asInstanceOf[SPU[AnyRef]]; + val obj = repr.appU(ups._2)._1 + log.info("unpickling successful") + obj + } + } +} diff --git a/src/actors/scala/actors/distributed/TcpService.scala b/src/actors/scala/actors/distributed/TcpService.scala new file mode 100644 index 0000000000..d714eba314 --- /dev/null +++ b/src/actors/scala/actors/distributed/TcpService.scala @@ -0,0 +1,215 @@ +package scala.actors.distributed; + +import java.net._; +import java.io._; + +import java.util.logging._; + +object TcpService { + val random = new java.util.Random(0) + + def generatePort: int = { + var portnum = 0 + try { + portnum = 8000 + random.nextInt(500) + val socket = new ServerSocket(portnum) + socket.close() + } + catch { + case ioe: IOException => + // this happens when trying to open a socket twice at the same port + // try again + generatePort + case se: SecurityException => + // do nothing + } + portnum + } +} + +object TestPorts { + def main(args: Array[String]): unit = { + val random = new java.util.Random(0) + val socket = new ServerSocket(8000 + random.nextInt(500)) + Console.println(TcpService.generatePort) + } +} + +class TcpService(port: int) extends Thread with Service { + val serializer: JavaSerializer = new JavaSerializer(this); + + private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port); + def node: TcpNode = internalNode; + + def createPid(actor: RemoteActor): RemotePid = + new TcpPid(internalNode, makeUid, kernel, actor) + + def send(node: Node, data: String): unit = synchronized { + // retrieve worker thread (if any) that already has connection + node match { + case tnode: TcpNode => + getConnection(tnode) match { + case None => + // we are not connected, yet + Console.println("We are not connected, yet."); + val newWorker = connect(tnode); //bad in a sync BLOCK!!! + newWorker transmit data + case Some(worker) => worker transmit data + } + case any => error("no TcpNode!"); + } + } + + def send(node: Node, data: Array[byte]): unit = synchronized { + // retrieve worker thread (if any) that already has connection + node match { + case tnode: TcpNode => + getConnection(tnode) match { + case None => + // we are not connected, yet + Console.println("We are not connected, yet."); + val newWorker = connect(tnode); //bad in a sync BLOCK!!! + newWorker transmit data + case Some(worker) => worker transmit data + } + case any => error("no TcpNode!"); + } + } + + override def run(): unit = { + try { + val socket = new ServerSocket(port); + Console.println("Tcp Service started: " + node); + + while (true) { + val nextClient = socket.accept(); + Console.println("Received request from " + nextClient.getInetAddress() + ":" + nextClient.getPort()); + + // this is bad because client will have other port than actual node + // solution: worker should read node from stream + // and call main thread to update connection table + + // spawn new worker thread + val worker = new TcpServiceWorker(this, nextClient); + worker.readNode; + // start worker thread + worker.start() + } + } + catch { + case ioe:IOException => { + // do nothing + } + case sec:SecurityException => { + // do nothing + } + } + } + + // connection management + + private val connections = new scala.collection.mutable.HashMap[TcpNode,TcpServiceWorker]; + + def nodes:List[Node] = throw new Exception ("nodes need to be implemented in TcpService!") + + def addConnection(n: TcpNode, w: TcpServiceWorker) = synchronized { + connections += n -> w + } + + def getConnection(n: TcpNode) = synchronized { + connections.get(n) + } + + def isConnected(n: Node): boolean = synchronized { + n match { + case tnode: TcpNode => + connections.get(tnode) match { + case None => false + case Some(x) => true + } + case _ => false + } + } + + def connect(n: Node): unit = synchronized { + n match { + case tnode: TcpNode => + connect(tnode) + } + } + + def connect(n: TcpNode): TcpServiceWorker = synchronized { + Console.println("" + node + ": Connecting to node " + n + " ...") + val sock = new Socket(n.address, n.port) + Console.println("Connected.") + // spawn new worker thread + val worker = new TcpServiceWorker(this, sock) + worker.sendNode; + // start worker thread + worker.start() + // register locally (we want to reuse connections which correspond to connected sockets) + // update connection table + addConnection(n, worker) + worker + } + + def disconnectNode(n: Node) = synchronized { + n match { + case node: TcpNode => + Console.println("Disconnecting from " + node + " ...") + connections.get(node) match { + case None => Console.println("Cannot disconnect from " + node + ". Not connected.") + case Some(worker) => + //TODO: sending disconnect message + //worker.sendDisconnect; + // update table + connections -= node + Console.println("Halting worker...") + worker.halt + } + case any => error("No TcpNode!!"); + } + } + + def isReachable(node: Node): boolean = + if (isConnected(node)) true + else try { + connect(node) + return true + } + catch { + case uhe: UnknownHostException => + false + case ioe: IOException => + false + case se: SecurityException => + false + } + + def getRoundTripTimeMillis(node: Node): long = 0 + + def nodeDown(mnode: TcpNode): unit = synchronized { + kernel nodeDown mnode + connections -= mnode + } + + /*def closeConnection(worker: TcpServiceWorker): unit = synchronized { + connections.get(worker) match { + case None => + System.err.println("Worker " + worker + " not registered."); + case Some(socket) => { + try { + socket.close(); + connections -= worker + } + catch { + case ioe:IOException => + System.err.println("Couldn't close connection."); + connections -= worker + } + } + }; + System.out.println("OK. Connection closed.") + }*/ + +} diff --git a/src/actors/scala/actors/distributed/TcpServiceWorker.scala b/src/actors/scala/actors/distributed/TcpServiceWorker.scala new file mode 100644 index 0000000000..4346b09d61 --- /dev/null +++ b/src/actors/scala/actors/distributed/TcpServiceWorker.scala @@ -0,0 +1,86 @@ +package scala.actors.distributed; + +import java.net._; +import java.io._; + +class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread { + val in = so.getInputStream(); + val out = so.getOutputStream(); + + val datain = new DataInputStream(in); + val dataout = new DataOutputStream(out); + + val reader = new BufferedReader(new InputStreamReader(in)); + val writer = new PrintWriter(new OutputStreamWriter(out)); + + val log = new Debug("TcpServiceWorker") + log.level = 2 + + def transmit(msg: Send): unit = synchronized { + val data = parent.serializer.serialize(msg); + transmit(data); + } + + def transmit(data: String): unit = synchronized { + log.info("Transmitting " + data) + writer.write(data) + writer.flush() + } + + def transmit(data: Array[byte]): unit = synchronized { + log.info("Transmitting " + data) + dataout.writeInt(data.length); + dataout.write(data) + dataout.flush() + } + + def sendNode = { + Console.println("Sending our name " + parent.node); + parent.serializer.writeObject(dataout, parent.node); + } + + var connectedNode: TcpNode = _; + + def readNode = { + Console.println("" + parent.node + ": Reading node name..."); + //val node = parent.serializer.deserialize(reader); + val node = parent.serializer.readObject(datain); + Console.println("Connection from " + node); + node match { + case n: TcpNode => { + connectedNode = n + Console.println("Adding connection to " + node + " to table."); + parent.addConnection(n, this) + } + } + } + + var running = true; + def halt = synchronized { + so.close(); // close socket + running = false; // stop + } + + override def run(): unit = { + try { + while (running) { + if (in.available() > 0) { + log.info("deserializing..."); + //val msg = parent.serializer.deserialize(reader); + val msg = parent.serializer.readObject(datain); + log.info("Received object: " + msg); + parent.kernel.processMsg(msg) + } + } + } + catch { + case ioe:IOException => + Console.println("" + ioe + " while reading from socket."); + parent nodeDown connectedNode + case e:Exception => + // catch-all + Console.println("" + e + " while reading from socket."); + parent nodeDown connectedNode + } + } +} diff --git a/src/actors/scala/actors/distributed/Util.scala b/src/actors/scala/actors/distributed/Util.scala new file mode 100644 index 0000000000..a54e455edf --- /dev/null +++ b/src/actors/scala/actors/distributed/Util.scala @@ -0,0 +1,58 @@ +package scala.actors.distributed; + +import java.io._; +import scala.collection.mutable._; + +object Util { + def pad(s: String, req: int): String = { + val buf = new StringBuffer; + val add: int = req - s.length(); + for (val i <- List.range(1, add+1)) + buf append "0"; + buf append s; + buf.toString() + } + + def encode(i: int) = pad(Integer.toHexString(i), 8); + def encode(l: long) = pad(java.lang.Long.toHexString(l), 16); + def decode(s: String): int = Integer.decode("0x" + s).intValue(); + def decodeLong(s: String): long = java.lang.Long.decode("0x" + s).longValue(); + + def baseName(o: Any) = { + val s = o.toString(); + + def baseName(s: String): String = { + if (s.indexOf('$') != -1) + baseName(s.substring(0,s.indexOf('$'))) + else if (s.indexOf('(') != -1) + baseName(s.substring(0,s.indexOf('('))) + else if (s.indexOf('@') != -1) + baseName(s.substring(0,s.indexOf('@'))) + else s + } + + baseName(s) + } + + def extractArgs(s: String): Buffer[String] = { + // extract strings between first-level commas + var level: int = 0; + val carr: Array[char] = s.toCharArray(); + var buf = new StringBuffer; // current string + val args = new ArrayBuffer[String]; + + for (val i <- List.range(0,carr.length)) { + if ((level == 0) && (carr(i) == ',')) { + // argument finished + args += buf.toString(); + buf = new StringBuffer + } else { + if (carr(i) == '(') level = level + 1; + if (carr(i) == ')') level = level - 1; + buf append carr(i) + } + } + args += buf.toString(); + args + } +} diff --git a/src/actors/scala/actors/distributed/picklers/BytePickle.scala b/src/actors/scala/actors/distributed/picklers/BytePickle.scala new file mode 100644 index 0000000000..796fc9a353 --- /dev/null +++ b/src/actors/scala/actors/distributed/picklers/BytePickle.scala @@ -0,0 +1,436 @@ +package scala.actors.distributed.picklers + +import scala.collection.mutable.HashMap +import scala.collection.mutable.ArrayBuffer + +/** + Pickler combinators. + + Author: Philipp Haller + */ + +object BytePickle { + class PicklerState(val stream: Array[byte], val dict: PicklerEnv) {} + class UnPicklerState(val stream: Array[byte], val dict: UnPicklerEnv) {} + + abstract class PU[t] { + def appP(a: t, state: Array[byte]): Array[byte]; + def appU(state: Array[byte]): Pair[t, Array[byte]]; + } + + abstract class SPU[t] { + def appP(a: t, state: PicklerState): PicklerState; + def appU(state: UnPicklerState): Pair[t, UnPicklerState]; + } + + class PicklerEnv extends HashMap[Any, int] { + private var cnt: int = 64; + def nextLoc() = { cnt = cnt + 1; cnt }; + } + + class UnPicklerEnv extends HashMap[int, Any] { + private var cnt: int = 64; + def nextLoc() = { cnt = cnt + 1; cnt }; + } + + abstract class RefDef; + case class Ref() extends RefDef; + case class Def() extends RefDef; + + def refDef: PU[RefDef] = new PU[RefDef] { + def appP(b: RefDef, s: Array[byte]): Array[byte] = + b match { + case Ref() => Array.concat(s, (List[byte](0)).toArray) + case Def() => Array.concat(s, (List[byte](1)).toArray) + }; + def appU(s: Array[byte]): Pair[RefDef, Array[byte]] = + if (s(0) == 0) Pair(Ref(), s.subArray(1, s.length)) + else Pair(Def(), s.subArray(1, s.length)); + } + + val REF = 0 + val DEF = 1 + + def unat: PU[int] = new PU[int] { + def appP(n: int, s: Array[byte]): Array[byte] = + Array.concat(s, nat2Bytes(n)); + def appU(s: Array[byte]): Pair[int, Array[byte]] = { + var num = 0 + def readNat: int = { + var b = 0; + var x = 0; + do { + b = s(num) + num = num + 1 + x = (x << 7) + (b & 0x7f); + } while ((b & 0x80) != 0); + x + } + Pair(readNat, s.subArray(num, s.length)) + } + } + + def share[a](pa: SPU[a]): SPU[a] = new SPU[a] { + def appP(v: a, state: PicklerState): PicklerState = { + /* + - is there some value equal to v associated with a location l in the pickle environment? + - yes: write REF-tag to outstream together with l + - no: + write DEF-tag to outstream + record current location l of outstream + --> serialize value + add entry to pickle environment, mapping v onto l + */ + val pe = state.dict + pe.get(v) match { + case None => + //Console.println("" + v + " is new") + //Console.println("writing DEF...") + val sPrime = refDef.appP(Def(), state.stream) + val l = pe.nextLoc() + + //Console.println("applying pickler to state " + sPrime) + val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe)) + + //Console.println("updating dict (" + l + ") for " + v) + pe.update(v, l) + + return sPrimePrime + case Some(l) => + //Console.println("writing REF...") + val sPrime = refDef.appP(Ref(), state.stream) + + //Console.println("writing location to state " + sPrime) + return new PicklerState(unat.appP(l, sPrime), pe) + } + } + def appU(state: UnPicklerState): Pair[a, UnPicklerState] = { + /* + - first, read tag (i.e. DEF or REF) + - if REF: + read location l + look up resulting value in unpickler environment + - if DEF: + record location l of input stream + --> deserialize value v with argument deserializer + add entry to unpickler environment, mapping l onto v + */ + val upe = state.dict + val res = refDef.appU(state.stream) + res._1 match { + case Def() => + val l = upe.nextLoc + val res2 = pa.appU(new UnPicklerState(res._2, upe)) + upe.update(l, res2._1) + return res2 + case Ref() => + val res2 = unat.appU(res._2) // read location + upe.get(res2._1) match { // lookup value in unpickler env + case None => error("invalid unpickler environment"); return null + case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe)) + } + } + } + } + + def upickle[t](p: PU[t], a: t): Array[byte] = + p.appP(a, new Array[byte](0)); + + def uunpickle[t](p: PU[t], stream: Array[byte]): t = + p.appU(stream)._1; + + def pickle[t](p: SPU[t], a: t): Array[byte] = + p.appP(a, new PicklerState(new Array[byte](0), new PicklerEnv)).stream; + + def unpickle[t](p: SPU[t], stream: Array[byte]): t = + p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1; + + def ulift[t](x: t): PU[t] = new PU[t] { + def appP(a: t, state: Array[byte]): Array[byte] = + if (x != a) { error("value to be pickled (" + a + ") != " + x); state } + else state; + def appU(state: Array[byte]) = Pair(x, state); + } + + def lift[t](x: t): SPU[t] = new SPU[t] { + def appP(a: t, state: PicklerState): PicklerState = + if (x != a) { /*error("value to be pickled (" + a + ") != " + x);*/ state } + else state; + def appU(state: UnPicklerState) = Pair(x, state); + } + + def usequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] { + def appP(b: u, s: Array[byte]): Array[byte] = { + val a = f(b) + val sPrime = pa.appP(a, s) + val pb = k(a) + val sPrimePrime = pb.appP(b, sPrime) + sPrimePrime + } + def appU(s: Array[byte]): Pair[u, Array[byte]] = { + val resPa = pa.appU(s) + val a = resPa._1 + val sPrime = resPa._2 + val pb = k(a) + pb.appU(sPrime) + } + } + + def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] { + def appP(b: u, s: PicklerState): PicklerState = { + val a = f(b) + //Console.println("pickling " + a + ", s: " + s.stream) + val sPrime = pa.appP(a, s) + val pb = k(a) + //Console.println("pickling " + b + ", s: " + s.stream) + pb.appP(b, sPrime) + } + def appU(s: UnPicklerState): Pair[u, UnPicklerState] = { + val resPa = pa.appU(s) + val a = resPa._1 + val sPrime = resPa._2 + val pb = k(a) + pb.appU(sPrime) + } + } + + def upair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = { + def fst(p: Pair[a,b]): a = p._1; + def snd(p: Pair[a,b]): b = p._2; + usequ(fst, pa, (x: a) => usequ(snd, pb, (y: b) => ulift(Pair(x, y)))) + } + + def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = { + def fst(p: Pair[a,b]): a = p._1; + def snd(p: Pair[a,b]): b = p._2; + sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) + } + + def triple[a,b,c](pa: SPU[a], pb: SPU[b], pc: SPU[c]): SPU[Triple[a,b,c]] = { + def fst(p: Triple[a,b,c]): a = p._1; + def snd(p: Triple[a,b,c]): b = p._2; + def trd(p: Triple[a,b,c]): c = p._3; + + sequ(fst, pa, + (x: a) => sequ(snd, pb, + (y: b) => sequ(trd, pc, + (z: c) => lift(Triple(x, y, z))))) + } + + def uwrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] = + usequ(j, pa, (x: a) => ulift(i(x))); + + def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] = + sequ(j, pa, (x: a) => lift(i(x))); + + def appendByte(a: Array[byte], b: int): Array[byte] = { + Array.concat(a, (List[byte](b.asInstanceOf[byte])).toArray) + } + + def nat2Bytes(x: int): Array[byte] = { + val buf = new ArrayBuffer[byte] + def writeNatPrefix(x: int): unit = { + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf += ((x & 0x7f) | 0x80).asInstanceOf[byte]; + } + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf += (x & 0x7f).asInstanceOf[byte]; + buf.toArray + } + + def nat: SPU[int] = new SPU[int] { + def appP(n: int, s: PicklerState): PicklerState = { + new PicklerState(Array.concat(s.stream, nat2Bytes(n)), s.dict); + } + def appU(s: UnPicklerState): Pair[int,UnPicklerState] = { + var num = 0 + def readNat: int = { + var b = 0; + var x = 0; + do { + b = s.stream(num) + num = num + 1 + x = (x << 7) + (b & 0x7f); + } while ((b & 0x80) != 0); + x + } + Pair(readNat, new UnPicklerState(s.stream.subArray(num, s.stream.length), s.dict)) + } + } + + def byte: SPU[byte] = new SPU[byte] { + def appP(b: byte, s: PicklerState): PicklerState = + new PicklerState(Array.concat(s.stream, (List[byte](b)).toArray), s.dict); + def appU(s: UnPicklerState): Pair[byte, UnPicklerState] = + Pair(s.stream(0), new UnPicklerState(s.stream.subArray(1, s.stream.length), s.dict)); + } + + def string: SPU[String] = + share(wrap((a:Array[byte]) => UTF8Codec.decode(a, 0, a.length), (s:String) => UTF8Codec.encode(s), bytearray)); + + def bytearray: SPU[Array[byte]] = { + wrap((l:List[byte]) => l.toArray, .toList, list(byte)) + } + + def bool: SPU[boolean] = { + def toEnum(b: boolean) = if (b) 1 else 0; + def fromEnum(n: int) = if (n == 0) false else true; + wrap(fromEnum, toEnum, nat) + } + + def ufixedList[a](pa: PU[a])(n: int): PU[List[a]] = { + def pairToList(p: Pair[a,List[a]]): List[a] = + p._1 :: p._2; + def listToPair(l: List[a]): Pair[a,List[a]] = + l match { case x :: xs => Pair(x, xs) } + + if (n == 0) ulift(Nil) + else + uwrap(pairToList, listToPair, upair(pa, ufixedList(pa)(n-1))) + } + + def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = { + def pairToList(p: Pair[a,List[a]]): List[a] = + p._1 :: p._2; + def listToPair(l: List[a]): Pair[a,List[a]] = + l match { case x :: xs => Pair(x, xs) } + + if (n == 0) lift(Nil) + else + wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) + } + + def list[a](pa: SPU[a]): SPU[List[a]] = + sequ((l: List[a])=>l.length, nat, fixedList(pa)); + + def ulist[a](pa: PU[a]): PU[List[a]] = + usequ((l:List[a]) => l.length, unat, ufixedList(pa)); + + def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] = + sequ(tag, nat, (x: int)=> ps.apply(x)()); + + def printByteArray(a: Array[byte]) = { + val iter = a.elements + while (iter.hasNext) { + val el = iter.next + Console.print("" + el + ", ") + } + } + + def main(args: Array[String]) = { + // test nat2Bytes + + Console.println(printByteArray(nat2Bytes(1))) + Console.println(printByteArray(nat2Bytes(10))) + Console.println(printByteArray(nat2Bytes(16))) + Console.println(printByteArray(nat2Bytes(256))) + + Console.println(100000) + var res = pickle(nat, 100000) + Console.println(printByteArray(res)) + var up = unpickle(nat, res) + Console.println(up) + + // -- int list + val intList = List(1, 7, 13) + Console.println(intList) + val res9 = pickle(list(nat), intList) + Console.println(printByteArray(res9)) + val up9 = unpickle(list(nat), res9) + Console.println(up9) + + // --------------- + // -- boolean list + val bList = List(false, true, true) + Console.println(bList) + val res2 = pickle(list(bool), bList) + Console.println(printByteArray(res2)) + val up2 = unpickle(list(bool), res2) + Console.println(up2) + + // -- string + val s = "Hello" + Console.println(s) + val res3 = pickle(string, s) + Console.println(printByteArray(res3)) + val up3 = unpickle(string, res3) + Console.println(up3) + + val personPU = wrap((p:Pair[String,int]) => Person(p._1, p._2), (p:Person) => Pair(p.name, p.age), pair(string, nat)); + val p = Person("Philipp", 25) + Console.println(p) + val res4 = pickle(personPU, p) + Console.println(printByteArray(res4)) + val up4 = unpickle(personPU, res4) + Console.println(up4) + + val x = Var("x"); + val i = Lam("x", x); + val k = Lam("x", Lam("y", x)); + val kki = App(k, App(k, i)); + + /*def varPU: PU[Term] = wrap(Var, + (t: Term)=> t match {case Var(x)=>x}, + string); + def lamPU: PU[Term] = wrap(p: Pair[String,Term]=>Lam(p._1, p._2), + (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, + pair(string, termPU)); + def appPU: PU[Term] = wrap(p: Pair[Term,Term]=>App(p._1, p._2), + (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, + pair(termPU, termPU)); + def termPU: PU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, + List(()=>varPU, ()=>lamPU, ()=>appPU)); + + Console.println("\n" + k); + val res5 = pickle(termPU, k); + Console.println(res5); + val up5 = unpickle(termPU, res5); + Console.println(up5); + + Console.println("\n" + kki); + val res6 = pickle(termPU, kki); + Console.println(res6); + Console.println("len: " + res6.length) + val up6 = unpickle(termPU, res6); + Console.println(up6);*/ + + def varSPU: SPU[Term] = wrap(Var, + (t: Term)=> t match {case Var(x)=>x}, + string); + + def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2), + (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, + pair(string, termSPU)); + + def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2), + (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, + pair(termSPU, termSPU)); + + def termSPU: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, + List(()=>varSPU, ()=>lamSPU, ()=>appSPU))); + + Console.println("\n" + k); + val res8 = pickle(termSPU, k); + Console.println(printByteArray(res8)); + Console.println("len: " + res8.length) + val up8 = unpickle(termSPU, res8); + Console.println(up8); + + Console.println("\n" + kki); + val res7 = pickle(termSPU, kki); + Console.println(printByteArray(res7)); + Console.println("len: " + res7.length) + + val up7 = unpickle(termSPU, res7); + Console.println(up7); + } + + case class Person(name: String, age: int); + + abstract class Term; + case class Var(s: String) extends Term; + case class Lam(s: String, t: Term) extends Term; + case class App(t1: Term, t2: Term) extends Term; +} diff --git a/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala new file mode 100644 index 0000000000..c678e5acf8 --- /dev/null +++ b/src/actors/scala/actors/distributed/picklers/SStreamPickle.scala @@ -0,0 +1,519 @@ +package scala.actors.distributed.picklers; + +import scala.collection.mutable.HashMap; + +import java.io.StringReader; +import java.io.StringWriter; + +/** + Pickler combinators. + + Author: Philipp Haller + */ + +object SStreamPickle { + abstract class PU[t] { + def appP(a: t, state: OutStream): OutStream; + def appU(state: InStream): Pair[t,InStream]; + } + + //def pickle[t](p: PU[t], a: t): OutStream = + // p.appP(a, ""); + + def unpickle[t](p: PU[t], stream: InStream): t = + p.appU(stream)._1; + + def lift[t](x: t): PU[t] = new PU[t] { + def appP(a: t, state: OutStream): OutStream = state; + def appU(state: InStream) = Pair(x, state); + } + + def sequ[t,u](f: u => t, pa: PU[t], k: t => PU[u]): PU[u] = new PU[u] { + def appP(b: u, s: OutStream): OutStream = { + val a = f(b) + val sPrime = pa.appP(a, s) + val pb = k(a) + val sPrimePrime = pb.appP(b, sPrime) + sPrimePrime + } + def appU(s: InStream): Pair[u,InStream] = { + val resPa = pa.appU(s) + val a = resPa._1 + val sPrime = resPa._2 + val pb = k(a) + pb.appU(sPrime) + } + } + + def pair[a,b](pa: PU[a], pb: PU[b]): PU[Pair[a,b]] = { + def fst(p: Pair[a,b]): a = p._1; + def snd(p: Pair[a,b]): b = p._2; + sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) + } + + def triple[a,b,c](pa: PU[a], pb: PU[b], pc: PU[c]): PU[Triple[a,b,c]] = { + def fst(p: Triple[a,b,c]): a = p._1; + def snd(p: Triple[a,b,c]): b = p._2; + def trd(p: Triple[a,b,c]): c = p._3; + + sequ(fst, pa, + (x: a) => sequ(snd, pb, + (y: b) => sequ(trd, pc, + (z: c) => lift(Triple(x, y, z))))) + } + + def wrap[a,b](i: a => b, j: b => a, pa: PU[a]): PU[b] = + sequ(j, pa, (x: a) => lift(i(x))); + + def unit: PU[unit] = + lift(unit); + + def pad(s: String, req: int): String = { + val buf = new StringBuffer + for (val i <- List.range(1, req-s.length+1)) + buf append "0" + (buf append s).toString + } + def encode(i: int): String = pad(Integer.toHexString(i), 8); + def decode(s: String): int = Integer.decode("0x" + s).intValue(); + + def int: PU[int] = new PU[int] { + def appP(n: int, s: OutStream): OutStream = { + s.write(encode(n)) + s + } + def appU(s: InStream): Pair[int,InStream] = { + val substr = s.read(8) + //Console.println("unpickling " + substr) + Pair(decode(substr), s) + } + } + + def char: PU[char] = new PU[char] { + def appP(b: char, s: OutStream): OutStream = { + s.write(b) + s + } + def appU(s: InStream): Pair[char,InStream] = { + val carr = new Array[char](1) + s.read(carr) + //Console.println("unpickling " + carr(0)) + Pair(carr(0), s) + } + } + + def bool: PU[boolean] = { + def toEnum(b: boolean) = if (b) 1 else 0; + def fromEnum(n: int) = if (n == 0) false else true; + wrap(fromEnum, toEnum, nat) + } + + def fixedList[a](pa: PU[a])(n: int): PU[List[a]] = { + def pairToList(p: Pair[a,List[a]]): List[a] = + p._1 :: p._2; + def listToPair(l: List[a]): Pair[a,List[a]] = + l match { case x :: xs => Pair(x, xs) } + + if (n == 0) lift(Nil) + else + wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) + } + + def list[a](pa: PU[a]): PU[List[a]] = + sequ((l: List[a])=>l.length, nat, fixedList(pa)); + + def string: PU[String] = + wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char)); + + def alt[a](tag: a => int, ps: List[PU[a]]): PU[a] = + sequ(tag, int, ps.apply); + + def data[a](tag: a => int, ps: List[()=>PU[a]]): PU[a] = + sequ(tag, nat, (x: int)=> ps.apply(x)()); + + def option[a](pa: PU[a]): PU[Option[a]] = { + def tag(x: Option[a]) = x match { + case None => 0 + case Some(y) => 1 + } + def fromSome(x: Option[a]) = x match { + case Some(y) => y + case None => null + } + def toSome(x: a): Option[a] = Some(x); + val pnone: PU[Option[a]] = lift(None) + alt(tag, List(pnone, wrap(toSome, fromSome, pa))) + } + + def byteString(b: int) = + pad(Integer.toHexString(b), 2); + + def natString(x: int): String = { + val buf = new StringBuffer + + def writeNatPrefix(x: int): unit = { + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf.append(byteString((x & 0x7f) | 0x80)); + } + + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf.append(byteString(x & 0x7f)); + buf.toString() + } + + def nat: PU[int] = new PU[int] { + def appP(n: int, s: OutStream): OutStream = { + s.write(natString(n)) + s + } + def appU(s: InStream): Pair[int,InStream] = { + def readNat: int = { + var b = 0; + var x = 0; + do { + b = decode(s.read(2)); + x = (x << 7) + (b & 0x7f); + } while ((b & 0x80) != 0); + x + } + Pair(readNat, s) + } + } + + def main(args: Array[String]) = { + def testBase128(x: int) = { + Console.println(x) + + val sw = new StringWriter + val os = new OutStream(sw) + val res = nat.appP(x, os) + os.flush() + Console.println(sw.toString()) + + val up = nat.appU(new InStream(new StringReader(sw.toString()))) + Console.println(up._1) + } + + testBase128(0) + testBase128(1) + testBase128(64) + testBase128(127) + testBase128(128) + testBase128(8192) + + def pickleTest[a](x: a, pa: PU[a]) = { + Console.println(x) + + val sw = new StringWriter + val os = new OutStream(sw) + val res = pa.appP(x, os) + os.flush() + Console.println(sw.toString()) + + val up = pa.appU(new InStream(new StringReader(sw.toString()))) + Console.println(up._1) + } + + pickleTest(List(1, 7, 13), list(nat)) + + pickleTest(List(false, true, true), list(bool)) + + pickleTest("Hello", string) + + + val personPU = wrap((p: Pair[String,int]) => Person(p._1, p._2), (p: Person) => Pair(p.name, p.age), pair(string, nat)); + val p = Person("Philipp", 25) + pickleTest(p, personPU) + + + val x = Var("x"); + val i = Lam("x", x); + val k = Lam("x", Lam("y", x)); + val kki = App(k, App(k, i)); + + def varPU: PU[Term] = + wrap(Var, + (t: Term)=> t match {case Var(x)=>x}, + string); + def lamPU: PU[Term] = + wrap((p: Pair[String,Term])=>Lam(p._1, p._2), + (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, + pair(string, termPU)); + def appPU: PU[Term] = + wrap((p: Pair[Term,Term])=>App(p._1, p._2), + (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, + pair(termPU, termPU)); + def termPU: PU[Term] = + data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, + List(()=>varPU, ()=>lamPU, ()=>appPU)); + + pickleTest(k, termPU) + pickleTest(kki, termPU) + } + + case class Person(name: String, age: int); +} + +abstract class Term; +case class Var(s: String) extends Term; +case class Lam(s: String, t: Term) extends Term; +case class App(t1: Term, t2: Term) extends Term; + + +object ShareStreamPickle { + abstract class SPU[t] { + def appP(a: t, state: PicklerState): PicklerState; + def appU(state: UnPicklerState): Pair[t, UnPicklerState]; + } + + //def pickle[t](p: SPU[t], a: t): String = + // p.appP(a, new PicklerState("", new PicklerEnv)).stream; + + //def unpickle[t](p: SPU[t], stream: String): t = + // p.appU(new UnPicklerState(stream, new UnPicklerEnv))._1; + + class PicklerEnv extends HashMap[Any, int] { + private var cnt: int = 64; + def nextLoc() = { cnt = cnt + 1; cnt }; + } + + class UnPicklerEnv extends HashMap[int, Any] { + private var cnt: int = 64; + def nextLoc() = { cnt = cnt + 1; cnt }; + } + + class PicklerState(val stream: OutStream, val dict: PicklerEnv) {} + class UnPicklerState(val stream: InStream, val dict: UnPicklerEnv) {} + + abstract class RefDef; + case class Ref() extends RefDef; + case class Def() extends RefDef; + + def refDef: SStreamPickle.PU[RefDef] = new SStreamPickle.PU[RefDef] { + def appP(b: RefDef, s: OutStream): OutStream = + b match { + case Ref() => s.write("0"); s + case Def() => s.write("1"); s + }; + def appU(s: InStream): Pair[RefDef, InStream] = + if (s.readChar == '0') Pair(Ref(), s) + else Pair(Def(), s); + } + + def share[a](pa: SPU[a]): SPU[a] = new SPU[a] { + def appP(v: a, state: PicklerState): PicklerState = { + /* + - is there some value equal to v associated with a location l in the pickle environment? + - yes: write REF-tag to outstream together with l + - no: + write DEF-tag to outstream + record current location l of outstream + --> serialize value + add entry to pickle environment, mapping v onto l + */ + val pe = state.dict + pe.get(v) match { + case None => + //Console.println("" + v + " is new") + //Console.println("writing DEF...") + val sPrime = refDef.appP(Def(), state.stream) + val l = pe.nextLoc() + + //Console.println("applying pickler to state " + sPrime) + val sPrimePrime = pa.appP(v, new PicklerState(sPrime, pe)) + + //Console.println("updating dict (" + l + ") for " + v) + pe.update(v, l) + + return sPrimePrime + case Some(l) => + //Console.println("writing REF...") + val sPrime = refDef.appP(Ref(), state.stream) + //Console.println("writing location to state " + sPrime) + return new PicklerState(SStreamPickle.nat.appP(l, sPrime), pe) + } + } + def appU(state: UnPicklerState): Pair[a, UnPicklerState] = { + /* + - first, read tag (i.e. DEF or REF) + - if REF: + read location l + look up resulting value in unpickler environment + - if DEF: + record location l of input stream + --> deserialize value v with argument deserializer + add entry to unpickler environment, mapping l onto v + */ + val upe = state.dict + val res = refDef.appU(state.stream) + res._1 match { + case Def() => + val l = upe.nextLoc + val res2 = pa.appU(new UnPicklerState(res._2, upe)) + upe.update(l, res2._1) + return res2 + case Ref() => + val res2 = SStreamPickle.nat.appU(res._2) // read location + upe.get(res2._1) match { // lookup value in unpickler env + case None => error("invalid unpickler environment"); return null + case Some(v) => return Pair(v.asInstanceOf[a], new UnPicklerState(res2._2, upe)) + } + } + } + } + + def lift[t](x: t): SPU[t] = new SPU[t] { + def appP(a: t, state: PicklerState): PicklerState = state; + def appU(state: UnPicklerState) = Pair(x, state); + } + + def sequ[t,u](f: u => t, pa: SPU[t], k: t => SPU[u]): SPU[u] = new SPU[u] { + def appP(b: u, s: PicklerState): PicklerState = { + val a = f(b) + //Console.println("pickling " + a + ", s: " + s.stream) + val sPrime = pa.appP(a, s) + val pb = k(a) + //Console.println("pickling " + b + ", s: " + s.stream) + pb.appP(b, sPrime) + } + def appU(s: UnPicklerState): Pair[u, UnPicklerState] = { + val resPa = pa.appU(s) + val a = resPa._1 + val sPrime = resPa._2 + val pb = k(a) + pb.appU(sPrime) + } + } + + def pair[a,b](pa: SPU[a], pb: SPU[b]): SPU[Pair[a,b]] = { + def fst(p: Pair[a,b]): a = p._1; + def snd(p: Pair[a,b]): b = p._2; + sequ(fst, pa, (x: a) => sequ(snd, pb, (y: b) => lift(Pair(x, y)))) + } + + def wrap[a,b](i: a => b, j: b => a, pa: SPU[a]): SPU[b] = + sequ(j, pa, (x: a) => lift(i(x))); + + def char: SPU[char] = new SPU[char] { + def appP(b: char, s: PicklerState): PicklerState = { + s.stream.write(b) + new PicklerState(s.stream, s.dict) + } + def appU(s: UnPicklerState): Pair[char, UnPicklerState] = + Pair(s.stream.readChar, new UnPicklerState(s.stream, s.dict)); + } + + def pad(s: String, req: int): String = { + val buf = new StringBuffer + for (val i <- List.range(1, req-s.length+1)) + buf append "0" + (buf append s).toString + } + def encode(i: int): String = pad(Integer.toHexString(i), 8); + def decode(s: String): int = Integer.decode("0x" + s).intValue(); + + def byteString(b: int) = + pad(Integer.toHexString(b), 2); + + def natString(x: int): String = { + val buf = new StringBuffer + + def writeNatPrefix(x: int): unit = { + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf.append(byteString((x & 0x7f) | 0x80)); + } + + val y = x >>> 7; + if (y != 0) writeNatPrefix(y); + buf.append(byteString(x & 0x7f)); + buf.toString() + } + + def nat: SPU[int] = new SPU[int] { + def appP(n: int, s: PicklerState): PicklerState = { + s.stream.write(natString(n)) + new PicklerState(s.stream, s.dict) + } + def appU(s: UnPicklerState): Pair[int,UnPicklerState] = { + def readNat: int = { + var b = 0; + var x = 0; + do { + b = decode(s.stream.read(2)); + x = (x << 7) + (b & 0x7f); + } while ((b & 0x80) != 0); + x + } + Pair(readNat, new UnPicklerState(s.stream, s.dict)) + } + } + + def fixedList[a](pa: SPU[a])(n: int): SPU[List[a]] = { + def pairToList(p: Pair[a,List[a]]): List[a] = + p._1 :: p._2; + def listToPair(l: List[a]): Pair[a,List[a]] = + l match { case x :: xs => Pair(x, xs) } + + if (n == 0) lift(Nil) + else + wrap(pairToList, listToPair, pair(pa, fixedList(pa)(n-1))) + } + + def list[a](pa: SPU[a]): SPU[List[a]] = + sequ((l: List[a])=>l.length, nat, fixedList(pa)); + + def string: SPU[String] = + wrap(List.toString, (str: String)=>str.toCharArray().toList, list(char)); + + def alt[a](tag: a => int, ps: List[SPU[a]]): SPU[a] = + sequ(tag, nat, ps.apply); + + def data[a](tag: a => int, ps: List[()=>SPU[a]]): SPU[a] = + sequ(tag, nat, (x: int)=> ps.apply(x)()); + + def main(args: Array[String]) = { + def pickleTest[a](x: a, pa: SPU[a]) = { + Console.println(x) + + val sw = new StringWriter + val os = new OutStream(sw) + val res = pa.appP(x, new PicklerState(os, new PicklerEnv)) + os.flush() + Console.println(sw.toString()) + + val up = pa.appU(new UnPicklerState(new InStream(new StringReader(sw.toString())), new UnPicklerEnv)) + Console.println(up._1) + } + + val x = Var("x"); + val i = Lam("x", x); + val k = Lam("x", Lam("y", x)); + val kki = App(k, App(k, i)); + + def varSPU: SPU[Term] = wrap(Var, + (t: Term)=> t match {case Var(x)=>x}, + string); + + def lamSPU: SPU[Term] = wrap((p: Pair[String,Term])=>Lam(p._1, p._2), + (t: Term)=> t match {case Lam(s, t)=>Pair(s, t)}, + pair(string, termSPU)); + + def appSPU: SPU[Term] = wrap((p: Pair[Term,Term])=>App(p._1, p._2), + (t: Term)=> t match {case App(t1, t2)=>Pair(t1, t2)}, + pair(termSPU, termSPU)); + + def termSPU: SPU[Term] = data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, + List(()=>varSPU, ()=>lamSPU, ()=>appSPU)); + + def termSPUshared: SPU[Term] = share(data((t: Term)=> t match {case Var(_)=>0; case Lam(_,_)=>1; case App(_,_)=>2}, + List(()=>varSPU, ()=>lamSPU, ()=>appSPU))); + + pickleTest(k, termSPU) + pickleTest(k, termSPUshared) + pickleTest(kki, termSPU) + pickleTest(kki, termSPUshared) + } +} diff --git a/src/actors/scala/actors/distributed/picklers/Streams.scala b/src/actors/scala/actors/distributed/picklers/Streams.scala new file mode 100644 index 0000000000..fa294785ab --- /dev/null +++ b/src/actors/scala/actors/distributed/picklers/Streams.scala @@ -0,0 +1,87 @@ +package scala.actors.distributed.picklers; + +import java.io.Reader; +import java.io.Writer; +import java.io.BufferedReader; +import java.io.BufferedWriter; + +import scala.collection.mutable._; + +class OutStream(writer: Writer) { + val picklerEnv = new PicklerEnv; + + private var loc: int = 0; + + def getLocation = loc; + + def write(s: String): unit = { + loc = loc + s.length() + writer.write(s) + //Console.println("new loc: " + loc) + } + + def write(c: char): unit = { + loc = loc + 1 + writer.write(c) + //Console.println("new loc: " + loc) + } + + def flush(): unit = + writer.flush(); +} + +class InStream(reader: Reader) { + val unpicklerEnv = new UnpicklerEnv; + + private var loc: int = 0; + + def getLocation = loc; + + def read(num: int): String = { + val carr = new Array[char](num) + val cnt = reader.read(carr) + loc = loc + num + //Console.println("new loc: " + loc) + new String(carr) + + /*val buf = new StringBuffer + var ch = r.read() + loc = loc + 1 + if (num > 1) { + var cnt = 1 + while (cnt < num && ch != -1) { + buf.append(ch) + ch = r.read() + loc = loc + 1 + cnt = cnt + 1 + } + if (cnt == num) + buf.toString() + else + buf.toString() // error + } + else + new String(ch.asInstanceOf[char])*/ + } + + def read(cbuf: Array[char]): int = { + loc = loc + cbuf.length + reader.read(cbuf) + } + + def readChar: char = { + val carr = new Array[char](1) + read(carr) + carr(0) + } +} + +class PicklerEnv[a] extends HashMap[a, int] { + private var cnt: int = 0; + def nextLoc() = { cnt = cnt + 1; cnt }; +} + +class UnpicklerEnv[a] extends HashMap[int, a] { + private var cnt: int = 0; + def nextLoc() = { cnt = cnt + 1; cnt }; +} diff --git a/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala new file mode 100644 index 0000000000..36ad115cdd --- /dev/null +++ b/src/actors/scala/actors/distributed/picklers/UTF8Codec.scala @@ -0,0 +1,77 @@ +/* ____ ____ ____ ____ ______ *\ +** / __// __ \/ __// __ \/ ____/ SOcos COmpiles Scala ** +** __\_ \/ /_/ / /__/ /_/ /\_ \ (c) 2002-2006, LAMP/EPFL ** +** /_____/\____/\___/\____/____/ ** +\* */ + +// $Id: UTF8Codec.scala 7116 2006-04-11 15:36:05Z mihaylov $ + + +package scala.actors.distributed.picklers + +object UTF8Codec { + + def encode(src: Array[Char], from: Int, dst: Array[Byte], to: Int, len: Int): Int = { + var i = from; + var j = to; + val end = from + len; + while (i < end) { + val ch = src(i); + i = i + 1; + if (ch < 128) { + dst(j) = ch.toByte; + j = j + 1; + } + else if (ch <= 0x3FF) { + dst(j) = (0xC0 | (ch >> 6)).toByte; + dst(j+1) = (0x80 | (ch & 0x3F)).toByte; + j = j + 2; + } else { + dst(j) = (0xE0 | (ch >> 12)).toByte; + dst(j+1) = (0x80 | ((ch >> 6) & 0x3F)).toByte; + dst(j+2) = (0x80 | (ch & 0x3F)).toByte; + j = j + 3; + } + } + j + } + + def encode(s: String, dst: Array[Byte], to: Int): Int = + encode(s.toCharArray(), 0, dst, to, s.length()); + + + def encode(s: String): Array[Byte] = { + val dst = new Array[Byte](s.length() * 3); + val len = encode(s, dst, 0); + dst.subArray(0, len) + } + + def decode(src: Array[Byte], from: Int, + dst: Array[Char], to: Int, len: Int): Int = + { + var i = from; + var j = to; + val end = from + len; + while (i < end) { + var b = src(i) & 0xFF; + i = i + 1; + if (b >= 0xE0) { + b = ((b & 0x0F) << 12) | (src(i) & 0x3F) << 6; + b = b | (src(i+1) & 0x3F); + i = i + 2; + } else if (b >= 0xC0) { + b = ((b & 0x1F) << 6) | (src(i) & 0x3F); + i = i + 1; + } + dst(j) = b.toChar; + j = j + 1; + } + j + } + + def decode(src: Array[Byte], from: Int, len: Int): String = { + val cs = new Array[Char](len); + String.copyValueOf(cs, 0, decode(src, 0, cs, 0, len)); + } + +} diff --git a/src/actors/scala/actors/gui/Button.scala b/src/actors/scala/actors/gui/Button.scala new file mode 100644 index 0000000000..99b74d27be --- /dev/null +++ b/src/actors/scala/actors/gui/Button.scala @@ -0,0 +1,20 @@ +package scala.actors.gui + +import javax.swing._ +import event._ + +/** A class for buttons; standard constructor wraps around a swing button */ +class Button(val jbutton: JButton) extends Container(jbutton) with SwingComponent with Publisher { + def this(txt: String) = this(new JButton(txt)) + def this() = this(new JButton()) + def text: String = jbutton.getText() + def text_=(s: String) = jbutton.setText(s) + def icon: Icon = jbutton.getIcon() + def icon_=(i: Icon) = jbutton.setIcon(i) + jbutton.addActionListener { + new java.awt.event.ActionListener { + def actionPerformed(e: java.awt.event.ActionEvent): unit = + publish(ButtonPressed(Button.this)) + } + } +} diff --git a/src/actors/scala/actors/gui/Caret.scala b/src/actors/scala/actors/gui/Caret.scala new file mode 100644 index 0000000000..05cfc2659a --- /dev/null +++ b/src/actors/scala/actors/gui/Caret.scala @@ -0,0 +1,8 @@ +package scala.actors.gui; + +import javax.swing + +class Caret(val jcaret: swing.text.Caret) { + def dot: int = jcaret.getDot() + def mark: int = jcaret.getMark() +} diff --git a/src/actors/scala/actors/gui/Component.scala b/src/actors/scala/actors/gui/Component.scala new file mode 100644 index 0000000000..9610b433fa --- /dev/null +++ b/src/actors/scala/actors/gui/Component.scala @@ -0,0 +1,9 @@ +package scala.actors.gui; + +import javax.swing._; +import java.awt._; + +class Component(val acomponent: java.awt.Component) extends Subscriber { + def show: this.type = { acomponent.setVisible(true); this } +} + diff --git a/src/actors/scala/actors/gui/Container.scala b/src/actors/scala/actors/gui/Container.scala new file mode 100644 index 0000000000..e30ac78caa --- /dev/null +++ b/src/actors/scala/actors/gui/Container.scala @@ -0,0 +1,18 @@ +package scala.actors.gui; + +import javax.swing._ +import scala.collection.mutable.ListBuffer + +class Container(val jcontainer: java.awt.Container) extends Component(jcontainer) { + def this() = this(new java.awt.Container()) + val elems = new ListBuffer[Component] + def += (c: Component) = { + elems += c + jcontainer.add(c.acomponent) + } + def -= (c: Component) = { + elems -= c + jcontainer.remove(c.acomponent) + } +} + diff --git a/src/actors/scala/actors/gui/EmptyBorder.scala b/src/actors/scala/actors/gui/EmptyBorder.scala new file mode 100644 index 0000000000..01ff7481cb --- /dev/null +++ b/src/actors/scala/actors/gui/EmptyBorder.scala @@ -0,0 +1,8 @@ +package scala.actors.gui; + +import javax.swing._ + +class EmptyBorder(_top: int, _left: int, _bottom: int, _right: int) +extends border.EmptyBorder(_top, _left, _bottom, _right) { + def this() = this(0, 0, 0, 0) +} diff --git a/src/actors/scala/actors/gui/FormattedTextField.scala b/src/actors/scala/actors/gui/FormattedTextField.scala new file mode 100644 index 0000000000..74b0c6e359 --- /dev/null +++ b/src/actors/scala/actors/gui/FormattedTextField.scala @@ -0,0 +1,9 @@ +package scala.actors.gui; + +import javax.swing._ +import java.awt.event._ +import event._ + +class FormattedTextField(val jftextfield: JFormattedTextField) extends TextComponent(jftextfield) { + def this(format: java.text.Format) = this(new JFormattedTextField(format)); +} diff --git a/src/actors/scala/actors/gui/Frame.scala b/src/actors/scala/actors/gui/Frame.scala new file mode 100644 index 0000000000..10b79f54c3 --- /dev/null +++ b/src/actors/scala/actors/gui/Frame.scala @@ -0,0 +1,33 @@ +package scala.actors.gui; + +import javax.swing._; +import event._; + +class Frame(val jframe: JFrame) extends Container(jframe) with Publisher { + def this() = this(new JFrame("Untitled Frame")) + def title: String = jframe.getTitle() + def title_=(s: String) = jframe.setTitle(s) + val contents = new Container(jframe.getContentPane()) + private var default_button: Button = null + def defaultButton = default_button + def defaultButton_=(b: Button) = { default_button = b; jframe.getRootPane().setDefaultButton(b.jbutton) } + def pack: this.type = { jframe.pack(); this } + jframe.addWindowListener { + new java.awt.event.WindowListener { + def windowActivated(e: java.awt.event.WindowEvent) = publish(WindowActivated(Frame.this)) + def windowClosed(e: java.awt.event.WindowEvent) = publish(WindowClosed(Frame.this)) + def windowClosing(e: java.awt.event.WindowEvent) = publish(WindowClosing(Frame.this)) + def windowDeactivated(e: java.awt.event.WindowEvent) = publish(WindowDeactivated(Frame.this)) + def windowDeiconified(e: java.awt.event.WindowEvent) = publish(WindowDeiconified(Frame.this)) + def windowIconified(e: java.awt.event.WindowEvent) = publish(WindowIconified(Frame.this)) + def windowOpened(e: java.awt.event.WindowEvent) = publish(WindowOpened(Frame.this)) + } + } + + /*jframe.addMouseMotionListener ( + new java.awt.event.MouseMotionListener { + def mouseDragged(e: java.awt.event.MouseEvent) = publish(MouseDragged(e)) + def mouseMoved(e: java.awt.event.MouseEvent) = publish(MouseMoved(e)) + } + )*/ +} diff --git a/src/actors/scala/actors/gui/GUIApplication.scala b/src/actors/scala/actors/gui/GUIApplication.scala new file mode 100644 index 0000000000..7d52e75628 --- /dev/null +++ b/src/actors/scala/actors/gui/GUIApplication.scala @@ -0,0 +1,20 @@ +package scala.actors.gui + +import javax.swing._ +import event.Event + +class GUIApplication { + def defaultLookAndFeelDecorated: boolean = true + + def init() = { + UIManager.setLookAndFeel(UIManager.getSystemLookAndFeelClassName()) + JFrame.setDefaultLookAndFeelDecorated(defaultLookAndFeelDecorated) + } + + def run(prog: => unit): unit = + SwingUtilities.invokeLater { + new Runnable() { + def run() = { init(); prog } + } + } +} diff --git a/src/actors/scala/actors/gui/Label.scala b/src/actors/scala/actors/gui/Label.scala new file mode 100644 index 0000000000..5094f9436c --- /dev/null +++ b/src/actors/scala/actors/gui/Label.scala @@ -0,0 +1,14 @@ +package scala.actors.gui; + +import javax.swing._; + +class Label(val jlabel: JLabel) extends Container(jlabel) with SwingComponent { + def this(txt: String) = this(new JLabel(txt)) + def this() = this("Untitled Label") + def text: String = jlabel.getText() + def text_=(s: String) = jlabel.setText(s) + def halign: Orientation.Value = Orientation(jlabel.getHorizontalAlignment()) + def halign_=(x: Orientation.Value) = jlabel.setHorizontalAlignment(x.id) + def valign: Orientation.Value = Orientation(jlabel.getVerticalAlignment()) + def valign_=(x: Orientation.Value) = jlabel.setVerticalAlignment(x.id) +} diff --git a/src/actors/scala/actors/gui/MainFrame.scala b/src/actors/scala/actors/gui/MainFrame.scala new file mode 100644 index 0000000000..b76a170e71 --- /dev/null +++ b/src/actors/scala/actors/gui/MainFrame.scala @@ -0,0 +1,14 @@ +package scala.actors.gui; + +import javax.swing._; +import scala.actors.gui.event._; + +class MainFrame(jframe: JFrame) extends Frame(jframe) { + def this() = this(new JFrame("Untitled Frame")) + + addHandler { + case WindowClosing(_) => System.exit(1) + } + + subscribe(this) +} diff --git a/src/actors/scala/actors/gui/Orientation.scala b/src/actors/scala/actors/gui/Orientation.scala new file mode 100644 index 0000000000..a71157380a --- /dev/null +++ b/src/actors/scala/actors/gui/Orientation.scala @@ -0,0 +1,11 @@ +package scala.actors.gui + +import javax.swing.SwingConstants._ + +object Orientation extends Enumeration { + val left = Value(LEFT, "left") + val right = Value(RIGHT, "right") + val bottom = Value(BOTTOM, "bottom") + val top = Value(TOP, "top") + val center = Value(CENTER, "center") +} diff --git a/src/actors/scala/actors/gui/Panel.scala b/src/actors/scala/actors/gui/Panel.scala new file mode 100644 index 0000000000..59cbff6784 --- /dev/null +++ b/src/actors/scala/actors/gui/Panel.scala @@ -0,0 +1,15 @@ +package scala.actors.gui; + +import javax.swing._ +import java.awt.event._ + +class Panel(val jpanel: JPanel) extends Container(jpanel) with SwingComponent { + def this(layout: java.awt.LayoutManager, elements: Component*) = { + this(new JPanel(layout)); + for (val elem <- elements) this += elem + } + def this(elements: Component*) = this(new java.awt.FlowLayout, elements: _*) + + def layout: java.awt.LayoutManager = jpanel.getLayout() + def layout_=(x: java.awt.LayoutManager) = jpanel.setLayout(x) +} diff --git a/src/actors/scala/actors/gui/Publisher.scala b/src/actors/scala/actors/gui/Publisher.scala new file mode 100644 index 0000000000..0b71bc30f6 --- /dev/null +++ b/src/actors/scala/actors/gui/Publisher.scala @@ -0,0 +1,126 @@ +package scala.actors.gui + +import scala.collection.mutable.ListBuffer + +import scala.actors.single.Actor +import scala.actors.single.Pid + +import scala.actors.gui.event.Event + +class EventHandlers { + type Handler = PartialFunction[AnyRef,unit] + + private val handlers = new ListBuffer[Handler] + + def += (h: Handler) = { handlers += h } + def -= (h: Handler) = { handlers -= h } + + def compoundHandler = new Handler { + def isDefinedAt(e: AnyRef): boolean = handlers.exists(.isDefinedAt(e)) + + def apply(e: AnyRef): unit = + handlers.find(.isDefinedAt(e)) match { + case Some(h) => h.apply(e) + case None => // do nothing + } + } +} + +trait Responder extends Actor { + protected val handlers = new EventHandlers + + final def eventloop(f: PartialFunction[Message,unit]): scala.All = + receive(new RecursiveProxyHandler(this, f)) + + def eventblock(f: PartialFunction[Message,unit]): unit = { + try { + receive(new RecursiveProxyHandler(this, f)) + } + catch { + case d: Done => + // do nothing + } + } + + private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] { + def isDefinedAt(m: Message): boolean = + true // events should be removed from the mailbox immediately! + + def apply(m: Message): unit = { + if (f.isDefinedAt(m)) f(m) // overrides any installed handler + else + if (handlers.compoundHandler.isDefinedAt(m)) + handlers.compoundHandler(m) + else { + // do nothing + } + a receive this + } + } +} + +case class Subscribe(s: Subscriber) +case class Publish(e: Event) + +trait Subscriber extends Responder { + type Handler = PartialFunction[AnyRef,unit] + def subscribe(ps: Publisher*) = for (val p <- ps) p send Subscribe(this) +} + +trait Publisher extends Responder { + case class HandlerAdded() + + private val subscribers = new ListBuffer[Subscriber] + + handlers += { // installs _permanent_ handler! + case Subscribe(s) => + //Console.println("" + this + ": rec subscription from " + s) + subscribers += s + case Publish(e) => for (val s <- subscribers) s send e + } + + //Console.println("" + this + ": exec toplevel eventloop (Publisher)") + + eventblock { + case HandlerAdded() => + //Console.println("" + this + " received HandlerAdded()") + } + + def addHandler(h: EventHandlers#Handler) = { + //Console.println("" + this + ": installing new handler") + handlers += h + this send HandlerAdded() // causes currently active eventloop to recursively call itself + } + + def publish(e: Event) = { + //Console.println("Publishing event: " + e) + for (val s <- subscribers) s send e + } + + // TODO: super.receive might already be overridden! + //final override def receive(f: PartialFunction[Message,unit]): scala.All = + //super.receive(new ProxyPubSubHandler(f)) + + private class ProxyPubSubHandler(f: PartialFunction[Message,unit]) extends PartialFunction[Message,unit] { + def isDefinedAt(m: Message): boolean = + if (f.isDefinedAt(m)) true + else m match { + case Subscribe(s) => true + case Publish(e) => true + case other => false + } + + def apply(m: Message): unit = { + m match { + case Subscribe(s) => + //Console.println("Rec subscription: " + s) + subscribers += s + case Publish(e) => + for (val s <- subscribers) s send e + case other => + // do nothing + } + if (f.isDefinedAt(m)) f(m) + } + } +} diff --git a/src/actors/scala/actors/gui/SimpleGUIApplication.scala b/src/actors/scala/actors/gui/SimpleGUIApplication.scala new file mode 100644 index 0000000000..30154932cc --- /dev/null +++ b/src/actors/scala/actors/gui/SimpleGUIApplication.scala @@ -0,0 +1,14 @@ +package scala.actors.gui; + +import javax.swing._ + +abstract class SimpleGUIApplication extends GUIApplication { + + def top: Frame; + + def main(args: Array[String]) = { + run { top.pack.show } + } + + implicit def string2label(s: String): Label = new Label(s) +} diff --git a/src/actors/scala/actors/gui/SwingComponent.scala b/src/actors/scala/actors/gui/SwingComponent.scala new file mode 100644 index 0000000000..cbe6478c32 --- /dev/null +++ b/src/actors/scala/actors/gui/SwingComponent.scala @@ -0,0 +1,11 @@ +package scala.actors.gui + +import javax.swing._ +import java.awt._ + +trait SwingComponent extends Component { + val jcomponent = acomponent.asInstanceOf[JComponent]; + def border: javax.swing.border.Border = jcomponent.getBorder() + def border_=(x: javax.swing.border.Border): unit = jcomponent.setBorder(x) +} + diff --git a/src/actors/scala/actors/gui/TextComponent.scala b/src/actors/scala/actors/gui/TextComponent.scala new file mode 100644 index 0000000000..955745045f --- /dev/null +++ b/src/actors/scala/actors/gui/TextComponent.scala @@ -0,0 +1,22 @@ +package scala.actors.gui + +import javax.swing._ +import javax.swing.text.JTextComponent +import javax.swing.event.{CaretEvent,CaretListener} +import event.CaretUpdate + +class TextComponent(val jtextcomponent: JTextComponent) +extends Container(jtextcomponent) with SwingComponent with Publisher { + + def text: String = jtextcomponent.getText() + def text_=(x: String) = jtextcomponent.setText(x) + + val caret = new Caret(jtextcomponent.getCaret()) + + jtextcomponent.addCaretListener { + new CaretListener { + def caretUpdate(e: CaretEvent) = + publish(CaretUpdate(TextComponent.this)) + } + } +} diff --git a/src/actors/scala/actors/gui/TextField.scala b/src/actors/scala/actors/gui/TextField.scala new file mode 100644 index 0000000000..0e32fc29f5 --- /dev/null +++ b/src/actors/scala/actors/gui/TextField.scala @@ -0,0 +1,22 @@ +package scala.actors.gui; + +import javax.swing._ +import java.awt.event._ +import event._ + +class TextField(val jtextfield: JTextField) extends TextComponent(jtextfield) { + def this(text: String, columns: int) = this(new JTextField(text, columns)); + def this(text: String) = this(new JTextField(text)); + def this(columns: int) = this(new JTextField(columns)); + def this() = this(new JTextField()); + + def columns: int = jtextfield.getColumns() + def columns_=(x: int) = jtextfield.setColumns(x) + + jtextfield.addActionListener { + new ActionListener { + def actionPerformed(e: ActionEvent) = + publish(TextModified(TextField.this)) + } + } +} diff --git a/src/actors/scala/actors/gui/event/ButtonPressed.scala b/src/actors/scala/actors/gui/event/ButtonPressed.scala new file mode 100644 index 0000000000..9d120314fb --- /dev/null +++ b/src/actors/scala/actors/gui/event/ButtonPressed.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class ButtonPressed(b: Button) extends Event diff --git a/src/actors/scala/actors/gui/event/CaretUpdate.scala b/src/actors/scala/actors/gui/event/CaretUpdate.scala new file mode 100644 index 0000000000..15c48385d9 --- /dev/null +++ b/src/actors/scala/actors/gui/event/CaretUpdate.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class CaretUpdate(text: TextComponent) extends Event diff --git a/src/actors/scala/actors/gui/event/Event.scala b/src/actors/scala/actors/gui/event/Event.scala new file mode 100644 index 0000000000..949cf2cb4d --- /dev/null +++ b/src/actors/scala/actors/gui/event/Event.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +abstract class Event diff --git a/src/actors/scala/actors/gui/event/MouseDragged.scala b/src/actors/scala/actors/gui/event/MouseDragged.scala new file mode 100644 index 0000000000..37b768b922 --- /dev/null +++ b/src/actors/scala/actors/gui/event/MouseDragged.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class MouseDragged(override val event: java.awt.event.MouseEvent) extends MouseEvent(event); diff --git a/src/actors/scala/actors/gui/event/MouseEvent.scala b/src/actors/scala/actors/gui/event/MouseEvent.scala new file mode 100644 index 0000000000..da43fbd8b8 --- /dev/null +++ b/src/actors/scala/actors/gui/event/MouseEvent.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +abstract class MouseEvent(val event: java.awt.event.MouseEvent) extends Event; diff --git a/src/actors/scala/actors/gui/event/MouseMoved.scala b/src/actors/scala/actors/gui/event/MouseMoved.scala new file mode 100644 index 0000000000..bf2028f159 --- /dev/null +++ b/src/actors/scala/actors/gui/event/MouseMoved.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class MouseMoved(override val event: java.awt.event.MouseEvent) extends MouseEvent(event); diff --git a/src/actors/scala/actors/gui/event/TextModified.scala b/src/actors/scala/actors/gui/event/TextModified.scala new file mode 100644 index 0000000000..0af1b49838 --- /dev/null +++ b/src/actors/scala/actors/gui/event/TextModified.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class TextModified(text: TextComponent) extends Event diff --git a/src/actors/scala/actors/gui/event/WindowActivated.scala b/src/actors/scala/actors/gui/event/WindowActivated.scala new file mode 100644 index 0000000000..797a2d0fc5 --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowActivated.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowActivated(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowClosed.scala b/src/actors/scala/actors/gui/event/WindowClosed.scala new file mode 100644 index 0000000000..7676809299 --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowClosed.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowClosed(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowClosing.scala b/src/actors/scala/actors/gui/event/WindowClosing.scala new file mode 100644 index 0000000000..bd63987c4d --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowClosing.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowClosing(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowDeactivated.scala b/src/actors/scala/actors/gui/event/WindowDeactivated.scala new file mode 100644 index 0000000000..4452f792d6 --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowDeactivated.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowDeactivated(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowDeiconified.scala b/src/actors/scala/actors/gui/event/WindowDeiconified.scala new file mode 100644 index 0000000000..1c7cbca57c --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowDeiconified.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowDeiconified(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowEvent.scala b/src/actors/scala/actors/gui/event/WindowEvent.scala new file mode 100644 index 0000000000..25177d8e2a --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowEvent.scala @@ -0,0 +1,5 @@ +package scala.actors.gui.event + +abstract class WindowEvent extends Event { + val window: Frame +} diff --git a/src/actors/scala/actors/gui/event/WindowIconified.scala b/src/actors/scala/actors/gui/event/WindowIconified.scala new file mode 100644 index 0000000000..abb6362188 --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowIconified.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowIconified(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/event/WindowOpened.scala b/src/actors/scala/actors/gui/event/WindowOpened.scala new file mode 100644 index 0000000000..f2656497a3 --- /dev/null +++ b/src/actors/scala/actors/gui/event/WindowOpened.scala @@ -0,0 +1,3 @@ +package scala.actors.gui.event + +case class WindowOpened(window: Frame) extends WindowEvent; diff --git a/src/actors/scala/actors/gui/layout.scala b/src/actors/scala/actors/gui/layout.scala new file mode 100644 index 0000000000..6d53d267ae --- /dev/null +++ b/src/actors/scala/actors/gui/layout.scala @@ -0,0 +1,12 @@ +package scala.actors.gui + +import java.awt._ + +object layout { + + val flex = 0 + + def grid(rows: int, columns: int) = new GridLayout(rows, columns) + def row = new FlowLayout() + def column = grid(flex, 1) +} diff --git a/src/actors/scala/actors/multi/AbstractPid.scala b/src/actors/scala/actors/multi/AbstractPid.scala new file mode 100644 index 0000000000..41cb745b57 --- /dev/null +++ b/src/actors/scala/actors/multi/AbstractPid.scala @@ -0,0 +1,10 @@ +package scala.actors.multi; + +/** + * @author Philipp Haller + */ +trait AbstractPid { + def !(msg: MailBox#Message): unit + def become(clos: Actor => Unit): unit + def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit +} diff --git a/src/actors/scala/actors/multi/Actor.scala b/src/actors/scala/actors/multi/Actor.scala new file mode 100644 index 0000000000..f1aa354880 --- /dev/null +++ b/src/actors/scala/actors/multi/Actor.scala @@ -0,0 +1,300 @@ +package scala.actors.multi + +import scala.collection.mutable.HashMap +import scala.collection.mutable.HashSet +import scala.collection.mutable.Stack + +case class ExcHandlerDesc(pid: Pid, eid: int) + +class ExcHandler(actions: PartialFunction[Throwable, unit], + actor: Actor, + parent: ExcHandlerDesc) { + def handle(e: Throwable): unit = { + if (!actions.isDefinedAt(e)) { + if (parent != null) actor.forwardExc(parent, e) + } + else + actions(e) + } +} + +/** + * @author Philipp Haller + */ +abstract class Actor extends MailBox { + def run(): Unit = {} + + def start(): Unit = { + var finished = true + try { run } + catch { + case d: Done => + finished = false + case t: Throwable => + if (!excHandlerDescs.isEmpty) + forwardExc(excHandlerDescs.top, t) + else + exit(new Symbol(t.toString())) + } + if (finished) die() + } + + case class Exit(from: Pid, reason: Symbol) extends Message + + private var pid: Pid = null + def self: Pid = { + if (pid == null) pid = new LocalPid(this) + pid + } + def self_= (p: Pid) = pid = p + + + private val links = new HashSet[Pid] + + def link(to: Pid): unit = { + // TODO: check if exists (eff: dont need to.linkTo...) + links += to + to.linkTo(self) + } + def linkTo(to: Pid): unit = links += to + + def unlink(from: Pid): unit = { + // TODO: check if exists (eff) + links -= from + from.unlinkFrom(self) + } + def unlinkFrom(from: Pid): unit = links -= from + + + private var trapExit = false + + def processFlag(flag: Symbol, set: boolean) = { + if (flag.name.equals("trapExit")) trapExit = set + } + + def exit(reason: Symbol): unit = { + exitLinked(reason, new HashSet[Pid]) + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + } + } + + def exit(from: Pid, reason: Symbol): unit = { + if (from == self) { + exit(reason) + } + else { + if (trapExit) + this send Exit(from, reason) + else if (!reason.name.equals("normal")) + exit(reason) + } + } + + def exitLinked(reason: Symbol, exitMarks: HashSet[Pid]): unit = { + if (exitMarks contains self) { + // we are marked, do nothing + } + else { + exitMarks += self // mark self as exiting + //Console.println("" + self + " is exiting (" + reason + ").") + + // exit linked scala.actors + val iter = links.elements + while (iter.hasNext) { + val linkedPid = iter.next + unlink(linkedPid) + linkedPid.exit(self, reason) + } + exitMarks -= self + } + } + + def spawnLink(body: Actor => unit): Pid = { + val a = new Actor { + override def run = body(this) + } + if (!excHandlerDescs.isEmpty) + a.pushExcHandlerDesc(excHandlerDescs.top) + // link new process to self + link(a.self) + a.start + a.self + } + + val excHandlerDescs = new Stack[ExcHandlerDesc] + + // exception handler table + val excHandlers = new HashMap[int, ExcHandler] + + def forwardExc(destDesc: ExcHandlerDesc, e: Throwable) = { + // locality check (handler local to this actor?) + if (destDesc.pid == self) { + handleExc(destDesc, e) + } + else { + // forward to pid of destination descriptor + destDesc.pid.handleExc(destDesc, e) + } + } + + def pushExcHandlerDesc(desc: ExcHandlerDesc) = { + excHandlerDescs += desc + } + + /** is only called for local handlers + (i.e. destDesc.pid == self) */ + def handleExc(destDesc: ExcHandlerDesc, e: Throwable) = + (excHandlers get destDesc.eid) match { + case Some(handler) => + handler.handle(e) + case None => + error("exc desc refers to non-registered handler") + }; + + var excCnt = 0 + def freshExcId = { excCnt = excCnt + 1; excCnt } + + def tryAsync(block: => unit, + handlerFun: PartialFunction[Throwable, unit]) = { + val excHandler = + new ExcHandler(handlerFun, + this, + if (excHandlerDescs.isEmpty) null + else excHandlerDescs.top) + + val desc = ExcHandlerDesc(self, freshExcId) + // associate desc with handler + excHandlers += desc.eid -> excHandler + // push desc onto stack + excHandlerDescs += desc + //Console.println("desc stack height: " + excHandlerDescs.length) + // execute code block + block + } + + def die(reason: Symbol) = { + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + exit(reason) + } + } + + override def die() = { + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + exit('normal) + } + } + + def process(f: PartialFunction[Message,unit], msg: Message): unit = { + try { + f(msg) + } + catch { + case d: Done => + throw new Done + case t: Throwable => + throw t + if (!excHandlerDescs.isEmpty) + forwardExc(excHandlerDescs.top, t) + else + die(Symbol(t.toString())) + } + } + + override def receive(f: PartialFunction[Message,unit]): scala.All = { + if (isAlive) { + Scheduler.tick(this) + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + process(f, msg) + die() + case None => + continuation = f + //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + override def receiveMsg(msg: MailBox#Message) = { + //Debug.info("" + Thread.currentThread() + ": Resuming " + this) + if (continuation != null) { + val f = continuation + continuation = null + scheduled = false + process(f, msg) + die() + } + else { + // use more complex receive-and-return continuation + val cases = contCases + val then = contThen + contCases = null + contThen = null + scheduled = false + val result = cases(msg) + then(result) + die() + } + } + + def spawn(a: Actor): Pid = { + // let "a" inherit active exception handler + if (!excHandlerDescs.isEmpty) + a.pushExcHandlerDesc(excHandlerDescs.top) + a.start + a.self + } + + def spawn(body: Actor => unit): Pid = { + val a = new Actor { + override def run = body(this) + } + if (!excHandlerDescs.isEmpty) + a.pushExcHandlerDesc(excHandlerDescs.top) + a.start + a.self + } + + def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = { + val a = new Actor { + override def run = receive(cases) + } + if (!excHandlerDescs.isEmpty) + a.pushExcHandlerDesc(excHandlerDescs.top) + a.start + a.self + } + + def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = { + receive { + case Pair(pid1, msg1) => receive { + case Pair(pid2, msg2) => cont match { + case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) + } + } + case Pair(pid2, msg2) => receive { + case Pair(pid1, msg1) => cont match { + case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) + } + } + } + } + + def makeRef = Actor.makeRef +} + +object Actor { + private var counter = 0 + type Tag = int + def makeRef: Tag = { + counter = counter + 1 + counter + } +} diff --git a/src/actors/scala/actors/multi/LocalPid.scala b/src/actors/scala/actors/multi/LocalPid.scala new file mode 100644 index 0000000000..5201abe22e --- /dev/null +++ b/src/actors/scala/actors/multi/LocalPid.scala @@ -0,0 +1,94 @@ +package scala.actors.multi; + +import scala.collection.mutable.Queue; + +/** + * @author Philipp Haller + */ +class LocalPid(actor: Actor) extends Pid { + var target = actor + + def !(msg: MailBox#Message): unit = target send msg + + def link(other: Pid): unit = target link other + def linkTo(other: Pid): unit = target linkTo other + def unlink(other: Pid): unit = target unlink other + def unlinkFrom(other: Pid): unit = target unlinkFrom other + def exit(reason: Symbol): unit = target exit reason + def exit(from: Pid, reason: Symbol): unit = target.exit(from, reason) + + def spawn(body: Actor => Unit): Pid = { + val a = new Actor { + override def run: Unit = body(this) + } + a.start + a.self + } + + def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = { + val a = new Actor { + override def run: Unit = receive(cases) + } + a.start + a.self + } + + override def toString() = "LocalPid(" + target + ")" + + def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit = + actor.handleExc(destDesc, e); + + def become(clos: Actor => Unit) = { + // old actor should become anonymous (cannot receive any messages any more) + // achieved by removing it from target of pid. + + val oldActor = target + //Debug.info("old actor: " + oldActor); + + // change our target to point to a newly created actor with the same mailbox. + + val newActor = new Actor { + override def run: Unit = clos(this) + } + newActor.sent = oldActor.sent + target = newActor + newActor.self = this + + //Debug.info("new actor: " + newActor); + + // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor) + oldActor.sent = new Queue[MailBox#Message] + + //Debug.info("Starting new actor."); + newActor.start // important to start after changing pid because actor may send messages to itself. + } + + private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] { + def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m) + def apply(m: MailBox#Message): unit = { + f(m) + a receive this + } + } + + def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = { + + become(a => a receive new ProxyPartialFunction(a, f)) + + /*become( + a:Actor => { + def loop: unit = { + def proxyFun(m: MailBox#Message): unit = { + if (f.isDefinedAt(m)) { + f(m); + loop + } + }; + //a receive proxyFun + } + loop + } + )*/ + } + +} diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala new file mode 100644 index 0000000000..cf721403cb --- /dev/null +++ b/src/actors/scala/actors/multi/MailBox.scala @@ -0,0 +1,175 @@ +package scala.actors.multi; + +import scala.collection.mutable.Queue; + +/** + * @author Philipp Haller + */ +class MailBox { + type Message = AnyRef + case class TIMEOUT() extends Message + + /** Unconsumed messages. */ + var sent = new Queue[Message] + + var continuation: PartialFunction[Message,Unit] = null + // more complex continuation + var contCases: PartialFunction[Message,Message] = null + var contThen: Message => unit = null + + def hasCont = + if ((continuation == null) && (contCases == null)) false + else true + + def contDefinedAt(msg: Message) = + if (((continuation != null) && continuation.isDefinedAt(msg)) || + ((contCases != null) && contCases.isDefinedAt(msg))) + true + else + false + + var isAlive = true + var scheduled = false + + private var pendingSignal = false + + def send(msg: Message): unit = synchronized { + if (isAlive) { + if (!hasCont || scheduled) { + //Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") + msg match { + case Signal() => + // do not add to mailbox + case _ => + sent += msg + } + } + else + msg match { + case Signal() => + if (!contDefinedAt(TIMEOUT())) die() + else { + val task = new ReceiverTask(this, TIMEOUT()) + //Debug.info("ready to receive. dispatch new task " + task) + scheduled = true + Scheduler.execute(task) + } + case _ => + if (!contDefinedAt(msg)) + sent += msg + else { + if (pendingSignal) { + pendingSignal = false + TimerThread.trashRequest(this) + } + val task = new ReceiverTask(this, msg) + //Debug.info("ready to receive. dispatch new task " + task) + scheduled = true + Scheduler.execute(task) + } + } + } + } + + def receiveMsg(msg: MailBox#Message) = { + //Debug.info("" + Thread.currentThread() + ": Resuming " + this) + if (continuation != null) { + val f = continuation + continuation = null + scheduled = false + f(msg) + die() + } + else { + // use more complex receive-and-return continuation + val cases = contCases + val then = contThen + contCases = null + contThen = null + scheduled = false + val result = cases(msg) + then(result) + die() + } + } + + def receive(f: PartialFunction[Message,unit]): scala.All = { + if (isAlive) { + Scheduler.tick(this) + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + f(msg) + die() + case None => + continuation = f + //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = { + Scheduler.tick(this) + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + f(msg) + die() + case None => + // if timeout == 0 then execute timeout action if specified (see Erlang book) + if (msec == 0) { + if (f.isDefinedAt(TIMEOUT())) + f(TIMEOUT()) + die() + } + else { + if (msec > 0) { + TimerThread.requestTimeout(this, msec) + pendingSignal = true + } + continuation = f + //Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + // original wish: + // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B + // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit + + def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): unit = { + contCases = null + contThen = null + sent.dequeueFirst(cases.isDefinedAt) match { + case Some(msg) => { + val result = cases(msg) + then(result) + die() + } + case None => { + contCases = cases + contThen = then + //Debug.info("No msg found. Saved complex continuation.") + } + } + throw new Done + } + + // receiv {...} then (msg => {...msg...}) + + class ReceiveAndReturn(cases: PartialFunction[Message,Message]) { + def then(body: Message => unit): unit = receiveAndReturn(cases, body) + } + + def receiv(cases: PartialFunction[Message,Message]): ReceiveAndReturn = + new ReceiveAndReturn(cases) + + def die() = { + if (isAlive) { + isAlive = false + //Debug.info("" + this + " died.") + } + } +} diff --git a/src/actors/scala/actors/multi/Pid.scala b/src/actors/scala/actors/multi/Pid.scala new file mode 100644 index 0000000000..99e5c59913 --- /dev/null +++ b/src/actors/scala/actors/multi/Pid.scala @@ -0,0 +1,20 @@ +package scala.actors.multi + +/** + * @author Philipp Haller + */ +[serializable]abstract class Pid { + def !(msg: MailBox#Message): unit; + + def link(other: Pid): unit; + def linkTo(other: Pid): unit; // uni-directional + def unlink(other: Pid): unit; + def unlinkFrom(other: Pid): unit; // uni-directional + def exit(reason: Symbol): unit; + def exit(from: Pid, reason: Symbol): unit; + + def handleExc(destDesc: ExcHandlerDesc, e: Throwable): unit; + + //def become(clos: Actor => Unit): unit + //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit +} diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala new file mode 100644 index 0000000000..218ab000de --- /dev/null +++ b/src/actors/scala/actors/multi/ReceiverTask.scala @@ -0,0 +1,16 @@ +package scala.actors.multi; + +/** + * @author Philipp Haller + */ +class ReceiverTask(val actor: MailBox, msg: MailBox#Message) extends Runnable { + def run(): unit = { + try { + actor receiveMsg msg + } + catch { + case d: Done => + // do nothing (continuation is already saved) + } + } +} diff --git a/src/actors/scala/actors/single/AbstractPid.scala b/src/actors/scala/actors/single/AbstractPid.scala new file mode 100644 index 0000000000..28f972148d --- /dev/null +++ b/src/actors/scala/actors/single/AbstractPid.scala @@ -0,0 +1,10 @@ +package scala.actors.single; + +/** + * @author Philipp Haller + */ +trait AbstractPid { + def !(msg: MailBox#Message): unit + def become(clos: Actor => Unit): unit + def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit +} diff --git a/src/actors/scala/actors/single/Actor.scala b/src/actors/scala/actors/single/Actor.scala new file mode 100644 index 0000000000..41d1f4a59e --- /dev/null +++ b/src/actors/scala/actors/single/Actor.scala @@ -0,0 +1,72 @@ +package scala.actors.single + +/** + * @author Philipp Haller + */ +abstract class Actor extends MailBox { + def run: Unit = {} + + def start: Unit = { + try { run } + catch { + case d:Done => + // do nothing + } + } + + private var pid: Pid = null + + def self: Pid = { + if (pid == null) pid = new LocalPid(this) + pid + } + + def self_= (p: Pid) = pid = p + + def join(pid1: Pid, pid2: Pid, cont: List[Pid]): unit = { + receive { + case Pair(pid1, msg1) => receive { + case Pair(pid2, msg2) => cont match { + case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) + } + } + case Pair(pid2, msg2) => receive { + case Pair(pid1, msg1) => cont match { + case x::xs => x ! Pair(self, Pair(Pair(msg1, msg2), xs)) + } + } + } + } + + def spawn(body: Actor => unit): Pid = { + val a = new Actor { + override def run = body(this) + } + a.start + a.self + } + + def spawn(a: Actor): Pid = { + a.start + a.self + } + + def spawnReceive(cases: PartialFunction[MailBox#Message,unit]) = { + val a = new Actor { + override def run = receive(cases) + } + a.start + a.self + } + + def makeRef = Actor.makeRef +} + +object Actor { + private var counter = 0 + type Tag = int + def makeRef: Tag = { + counter = counter + 1 + counter + } +} diff --git a/src/actors/scala/actors/single/LocalPid.scala b/src/actors/scala/actors/single/LocalPid.scala new file mode 100644 index 0000000000..8186dca426 --- /dev/null +++ b/src/actors/scala/actors/single/LocalPid.scala @@ -0,0 +1,83 @@ +package scala.actors.single; + +import scala.collection.mutable.Queue; + +/** + * @author Philipp Haller + */ +class LocalPid(actor: Actor) extends Pid { + var target = actor + + def !(msg: MailBox#Message): unit = target send msg + + def become(clos: Actor => Unit) = { + // old actor should become anonymous (cannot receive any messages any more) + // achieved by removing it from target of pid. + + val oldActor = target + //Debug.info("old actor: " + oldActor); + + // change our target to point to a newly created actor with the same mailbox. + + val newActor = new Actor { + override def run: Unit = clos(this) + } + newActor.sent = oldActor.sent + target = newActor + newActor.self = this + + //Debug.info("new actor: " + newActor); + + // clean mailbox of now anonymous actor (must not receive any messages any more; pending messages are for new actor) + oldActor.sent = new Queue[MailBox#Message] + + //Debug.info("Starting new actor."); + newActor.start // important to start after changing pid because actor may send messages to itself. + } + + private class ProxyPartialFunction(a: Actor, f: PartialFunction[MailBox#Message,unit]) extends PartialFunction[MailBox#Message, unit] { + def isDefinedAt(m: MailBox#Message): boolean = f.isDefinedAt(m) + def apply(m: MailBox#Message): unit = { + f(m) + a receive this + } + } + + def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]) = { + + become(a => a receive new ProxyPartialFunction(a, f)) + + /*become( + a:Actor => { + def loop: unit = { + def proxyFun(m: MailBox#Message): unit = { + if (f.isDefinedAt(m)) { + f(m); + loop + } + }; + //a receive proxyFun + } + loop + } + )*/ + } + + def spawn(body: Actor => Unit): Pid = { + val a = new Actor { + override def run: Unit = body(this) + } + a.start + a.self + } + + def spawnReceive(cases: PartialFunction[MailBox#Message,Unit]) = { + val a = new Actor { + override def run: Unit = receive(cases) + } + a.start + a.self + } + + override def toString() = "LocalPid(" + target + ")" +} diff --git a/src/actors/scala/actors/single/MailBox.scala b/src/actors/scala/actors/single/MailBox.scala new file mode 100644 index 0000000000..68d6c5e784 --- /dev/null +++ b/src/actors/scala/actors/single/MailBox.scala @@ -0,0 +1,155 @@ +package scala.actors.single; + +import scala.collection.mutable.Queue; + +/** + * @author Philipp Haller + */ +class MailBox { + + type Message = AnyRef + case class TIMEOUT() extends Message + + /** Unconsumed messages. */ + var sent = new Queue[Message] + + var continuation: PartialFunction[Message,Unit] = null + // more complex continuation + var contCases: PartialFunction[Message,Message] = null + var contThen: Message => unit = null + + def hasCont = + if ((continuation == null) && (contCases == null)) false + else true + + def contDefinedAt(msg: Message) = + if (((continuation != null) && continuation.isDefinedAt(msg)) || + ((contCases != null) && contCases.isDefinedAt(msg))) + true + else + false + + var isAlive = true + + private var duration: long = 0 + private var timeInitial: long = 0 + private var timeoutEnabled: boolean = false + + def send(msg: Message): unit = synchronized { + if (isAlive) + if (!hasCont) { + Debug.info("no cont avail/task already scheduled. appending msg to mailbox.") + sent += msg + } + else { + var message = msg + var timeoutOccurred = false + + if (timeoutEnabled && (System.currentTimeMillis() - timeInitial > duration)) + timeoutOccurred = true + + if (timeoutOccurred && !contDefinedAt(TIMEOUT())) + die() + else { + if (timeoutOccurred) message = TIMEOUT() + + if (contDefinedAt(message)) { + // we exit receive, so reset timeoutEnabled + timeoutEnabled = false + + try { + if (continuation != null) { + val f = continuation + continuation = null + f(msg) + die() + } + else { + // use more complex receive-and-return continuation + val cases = contCases + val then = contThen + contCases = null + contThen = null + val result = cases(msg) + then(result) + die() + } + } + catch { + case d: Done => + // do nothing (continuation is already saved) + } + } + else { + Debug.info("cont not defined at msg. appending to mailbox.") + if (!timeoutOccurred) sent += message + } + } + } + } + + def receive(f: PartialFunction[Message,unit]): scala.All = { + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + f(msg) + die() + case None => + continuation = f + Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + throw new Done + } + + def receiveWithin(msec: long)(f: PartialFunction[Message, unit]): scala.All = { + timeInitial = System.currentTimeMillis() + duration = msec + + continuation = null + sent.dequeueFirst(f.isDefinedAt) match { + case Some(msg) => + f(msg) + die() + case None => + // if timeout == 0 then execute timeout action if specified (see Erlang book) + if (duration == 0) { + if (f.isDefinedAt(TIMEOUT())) + f(TIMEOUT()) + die() + } + else { + timeoutEnabled = true + continuation = f + Debug.info("No msg found. " + this + " has continuation " + continuation + ".") + } + } + throw new Done + } + + // original wish: + // receiveAndReturn[A, B](cases: PartialFunction[Message, A], then: A => B): B + // receiveAndReturn[A](cases: PartialFunction[Message, A], then: A => unit): unit + + def receiveAndReturn(cases: PartialFunction[Message,Message], then: Message => unit): scala.All = { + contCases = null + contThen = null + sent.dequeueFirst(cases.isDefinedAt) match { + case Some(msg) => { + val result = cases(msg) + then(result) + die() + } + case None => { + contCases = cases + contThen = then + Debug.info("No msg found. Saved complex continuation.") + } + } + throw new Done + } + + def die() = { + isAlive = false + Debug.info("" + this + " died.") + } +} diff --git a/src/actors/scala/actors/single/Pid.scala b/src/actors/scala/actors/single/Pid.scala new file mode 100644 index 0000000000..2c7ab20031 --- /dev/null +++ b/src/actors/scala/actors/single/Pid.scala @@ -0,0 +1,10 @@ +package scala.actors.single; + +/** + * @author Philipp Haller + */ +abstract class Pid { + def !(msg: MailBox#Message): unit; + //def become(clos: Actor => Unit): unit + //def becomeReceiveLoop(f: PartialFunction[MailBox#Message,unit]): unit +} -- cgit v1.2.3