summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-09-29 16:50:14 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-09-29 16:50:14 +0000
commit499d7f10e23549ef30a61d13fc7f4203145f14f1 (patch)
treea69dde160c1496d07ea2d1d0edc25ab2cee1a26c
parent8abd909119cb9a3011d6c141138f2e2a5af5e304 (diff)
downloadscala-499d7f10e23549ef30a61d13fc7f4203145f14f1.tar.gz
scala-499d7f10e23549ef30a61d13fc7f4203145f14f1.tar.bz2
scala-499d7f10e23549ef30a61d13fc7f4203145f14f1.zip
Checked in new actors lib.
-rw-r--r--src/actors/scala/actors/Actor.scala269
-rw-r--r--src/actors/scala/actors/Channel.scala185
-rw-r--r--src/actors/scala/actors/Reactor.scala97
-rw-r--r--src/actors/scala/actors/Scheduler.scala276
-rw-r--r--src/actors/scala/actors/ThreadedActor.scala26
-rw-r--r--src/actors/scala/actors/TimerThread.scala108
-rw-r--r--src/actors/scala/actors/remote/FreshNameCreator.scala26
-rw-r--r--src/actors/scala/actors/remote/JavaSerializer.scala19
-rw-r--r--src/actors/scala/actors/remote/NetKernel.scala65
-rw-r--r--src/actors/scala/actors/remote/Serializer.scala43
-rw-r--r--src/actors/scala/actors/remote/Service.scala8
-rw-r--r--src/actors/scala/actors/remote/TcpService.scala191
12 files changed, 1313 insertions, 0 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
new file mode 100644
index 0000000000..7159cb890e
--- /dev/null
+++ b/src/actors/scala/actors/Actor.scala
@@ -0,0 +1,269 @@
+package scala.actors
+
+object Actor {
+
+ private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)
+
+ def self: Actor = synchronized {
+ val t = Thread.currentThread()
+ if (t.isInstanceOf[ActorThread])
+ t.asInstanceOf[ActorThread]
+ else {
+ var a = selfs.get(t).asInstanceOf[Actor]
+ if (a == null) {
+ a = new ActorProxy(t)
+ selfs.put(t, a)
+ }
+ a
+ }
+ }
+
+ def actor(body: => Unit): ActorThread = synchronized {
+ val actor = new ActorThread {
+ override def run(): Unit = {
+ body
+ this.kill()
+ }
+ }
+ actor.start()
+ actor
+ }
+
+ def reactor(body: => Unit): Reactor = synchronized {
+ val reactor = new Reactor {
+ override def run(): Unit = body
+ }
+ reactor.start()
+ reactor
+ }
+
+ def receive[a](f: PartialFunction[Any, a]): a =
+ self.in.receive(f)
+
+ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R =
+ self.in.receiveWithin(msec)(f)
+
+ def react(f: PartialFunction[Any, Unit]): Nothing =
+ self.in.react(f)
+
+ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing =
+ self.in.reactWithin(msec)(f)
+
+ def eventloop(f: PartialFunction[Any, Unit]): Nothing =
+ self.in.react(new RecursiveProxyHandler(self, f))
+
+ private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])
+ extends PartialFunction[Any, Unit] {
+ def isDefinedAt(m: Any): boolean =
+ true // events are immediately removed from the mailbox
+ def apply(m: Any): Unit = {
+ if (f.isDefinedAt(m)) f(m)
+ self.in.react(this)
+ }
+ }
+
+ def from(r: Actor): FromReceive =
+ new FromReceive(r)
+
+ private[actors] class FromReceive(r: Actor) {
+ def receive[a](f: PartialFunction[Any, a]): a =
+ self.in.receiveFrom(r)(f)
+ }
+
+ def sender: Actor = self.sender
+
+ def reply(msg: Any): Unit = sender.reply ! msg
+
+ def reply(): Unit = reply(())
+
+ def forward(msg: Any): Unit = self.in.forward(msg)
+
+ private[actors] trait Body[T] {
+ def orElse(other: => T): T
+ def andThen(other: => T): T
+ }
+
+ implicit def mkBody(body: => Unit) = new Body[Unit] {
+ def orElse(other: => Unit): Unit = choose(body, other)
+ def andThen(other: => Unit): Unit = seq(body, other)
+ }
+
+ def choose(alt1: => Unit, alt2: => Unit): Unit = {
+ val s = self
+ // save former custom suspendActor function
+ // (e.g. from further orElse)
+ val suspendNext = s.suspendActor
+ val detachNext = s.detachActor
+
+ // have to get out of the point of suspend in alt1's
+ // receive
+ s.suspendActor = () => { throw new SuspendActorException }
+ s.detachActor = f => { throw new SuspendActorException }
+
+ try { alt1 } catch {
+ case d: SuspendActorException => {
+ s.suspendActor = suspendNext
+ s.detachActor = detachNext
+ alt2
+ }
+ }
+ }
+
+ def loop(b: => Unit): Unit = {
+ val s = self
+ s.kill = () => { b; s.kill() }
+ b
+ }
+
+ def seq(b1: => Unit, b2: => Unit): Unit = {
+ val s = self
+ s.kill = () => { b2 }
+ b1
+ }
+}
+
+trait Actor {
+
+ private[actors] val in = new Channel[Any]
+ in.receiver = this
+
+ private var rc: Channel[Any] = null
+ private[actors] def reply: Channel[Any] = {
+ if (rc == null) {
+ rc = new Channel[Any]
+ rc.receiver = this
+ }
+ rc
+ }
+
+ private[actors] def freshReply(): Unit = {
+ rc = new Channel[Any]
+ rc.receiver = this
+ }
+
+ def !(msg: Any): Unit = in ! msg
+ def !?(msg: Any): Any = in !? msg
+
+ private[actors] def sender: Actor
+ private[actors] def pushSender(sender: Actor): unit
+ private[actors] def popSender(): unit
+
+ private[actors] var kill: () => Unit = _
+ private[actors] var suspendActor: () => unit = _
+ private[actors] var suspendActorFor: long => unit = _
+ private[actors] var detachActor: PartialFunction[Any, unit] => unit = _
+
+ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)
+
+ private[actors] def isThreaded: boolean
+ private[actors] def resetActor(): unit
+
+ resetActor()
+}
+
+class ActorThread extends Thread with ThreadedActor
+
+class ActorProxy(t: Thread) extends ThreadedActor
+
+
+object RemoteActor {
+ import remote.NetKernel
+ import remote.TcpService
+
+ private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel]
+
+ def alive(port: int): Unit = {
+ val serv = new TcpService(port)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ }
+
+ def register(name: Symbol, a: Actor): Unit = {
+ val kernel = kernels.get(Actor.self) match {
+ case None => {
+ val serv = new TcpService(TcpService.generatePort)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ serv.kernel
+ }
+ case Some(k) => k
+ }
+ kernel.register(name, a)
+ }
+
+ def select(node: Node, name: Symbol): Actor =
+ new Reactor {
+ override def !(msg: Any): Unit = msg match {
+ case a: AnyRef => {
+ // establish remotely accessible
+ // return path (sender)
+ val kernel = kernels.get(Actor.self) match {
+ case None => {
+ val serv = new TcpService(TcpService.generatePort)
+ serv.start()
+ kernels += Actor.self -> serv.kernel
+ serv.kernel
+ }
+ case Some(k) => k
+ }
+ kernel.send(node, name, a)
+ }
+ case other =>
+ error("Cannot send non-AnyRef value remotely.")
+ }
+ override def !?(msg: Any): Any =
+ error("!? not implemented for remote actors.")
+ }
+}
+
+abstract class Node
+case class TcpNode(address: String, port: Int) extends Node
+case class JxtaNode(group: String) extends Node
+
+
+private[actors] abstract class MessageQueueResult[Msg] {
+ def msg: Msg
+ def sender: Actor
+}
+
+private[actors] class MessageQueue[Msg] extends MessageQueueResult[Msg] {
+ var msg: Msg = _
+ var sender: Actor = _
+ private var next: MessageQueue[Msg] = this
+
+ def append(msg: Msg, sender: Actor) = {
+ val q = new MessageQueue[Msg]
+ q.msg = msg
+ q.sender = sender
+ q.next = next
+ next = q
+ }
+
+ def extractFirst(p: Msg => boolean): MessageQueueResult[Msg] = {
+ var q = this
+ var qnext = q.next
+ while (qnext != this) {
+ if (p(qnext.msg)) {
+ q.next = qnext.next
+ return qnext
+ }
+ q = qnext
+ qnext = qnext.next
+ }
+ null
+ }
+
+ def dequeueFirst(p: MessageQueueResult[Msg] => boolean): MessageQueueResult[Msg] = {
+ var q = this
+ var qnext = q.next
+ while (qnext != this) {
+ if (p(qnext)) {
+ q.next = qnext.next
+ return qnext
+ }
+ q = qnext
+ qnext = qnext.next
+ }
+ null
+ }
+}
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
new file mode 100644
index 0000000000..445d08b6ba
--- /dev/null
+++ b/src/actors/scala/actors/Channel.scala
@@ -0,0 +1,185 @@
+package scala.actors
+
+import Actor._
+
+case object TIMEOUT
+
+class SuspendActorException extends Throwable {
+ override def fillInStackTrace(): Throwable = {
+ this
+ }
+}
+
+class Channel[Msg] {
+
+ private[actors] var receiver: Actor = synchronized {
+ // basically Actor.self, but can be null
+ val t = Thread.currentThread()
+ if (t.isInstanceOf[ActorThread])
+ t.asInstanceOf[ActorThread]
+ else {
+ val a = selfs.get(t).asInstanceOf[Actor]
+ a
+ }
+ }
+
+ private var received: Msg = _
+
+ private val waitingForNone = (m: Msg) => false
+ private var waitingFor: Msg => boolean = waitingForNone
+ private var waitingForSender: Actor = null
+
+ private val messageQueue = new MessageQueue[Msg]
+
+ private def send(msg: Msg, sender: Actor) = receiver.synchronized {
+ if (waitingFor(msg) && ((waitingForSender == null) ||
+ (waitingForSender == sender))) {
+ received = msg
+ receiver.pushSender(sender)
+ waitingFor = waitingForNone
+ waitingForSender = null
+
+ if (receiver.isInstanceOf[Reactor]) {
+ val myReactor = receiver.asInstanceOf[Reactor]
+ if (myReactor.timeoutPending) {
+ myReactor.timeoutPending = false
+ TimerThread.trashRequest(myReactor)
+ }
+ }
+
+ receiver.scheduleActor(null, msg)
+ } else {
+ messageQueue.append(msg, sender)
+ }
+ }
+
+ def !(msg: Msg): unit = send(msg, self)
+
+ def !?(msg: Msg): Any = {
+ self.freshReply()
+ this ! msg
+ self.reply.receiveFrom(receiver) {
+ case x => x
+ }
+ }
+
+ def forward(msg: Msg): unit = send(msg, receiver.sender)
+
+ def receive[R](f: PartialFunction[Msg, R]): R = {
+ assert(self == receiver, "receive from channel belonging to other actor")
+ assert(receiver.isThreaded, "receive invoked from reactor")
+ receiver.synchronized {
+ waitingFor = f.isDefinedAt
+ val q = messageQueue.extractFirst(waitingFor)
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ }
+ else synchronized {
+ receiver.suspendActor()
+ }
+ waitingFor = waitingForNone
+ }
+ receiver.resetActor()
+ val result = f(received)
+ receiver.popSender()
+ result
+ }
+
+ private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = {
+ assert(self == receiver, "receive from channel belonging to other actor")
+ assert(receiver.isThreaded, "receive invoked from reactor")
+ receiver.synchronized {
+ waitingFor = f.isDefinedAt
+ waitingForSender = r
+ var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => {
+ waitingFor(item.msg) && item.sender == r
+ })
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ }
+ else synchronized {
+ receiver.suspendActor()
+ }
+ waitingFor = waitingForNone
+ waitingForSender = null
+ }
+ receiver.resetActor()
+ val result = f(received)
+ receiver.popSender()
+ result
+ }
+
+ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
+ assert(self == receiver, "receive from channel belonging to other actor")
+ assert(receiver.isThreaded, "receive invoked from reactor")
+ receiver.synchronized {
+ waitingFor = f.isDefinedAt
+ val q = messageQueue.extractFirst(waitingFor)
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ }
+ else synchronized {
+ receiver.suspendActorFor(msec)
+ if (received == null)
+ if (f.isDefinedAt(TIMEOUT)) {
+ receiver.resetActor()
+ val result = f(TIMEOUT)
+ return result
+ }
+ else
+ error("unhandled timeout")
+ }
+ waitingFor = waitingForNone
+ }
+ receiver.resetActor()
+ val result = f(received)
+ receiver.popSender()
+ result
+ }
+
+ def react(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(self == receiver, "react on channel belonging to other actor")
+ receiver.synchronized {
+ waitingFor = f.isDefinedAt
+ val q = messageQueue.extractFirst(waitingFor)
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ waitingFor = waitingForNone
+ receiver.scheduleActor(f, received)
+
+ // would like:
+ // receiver.continuation = f
+ // receiver.message = received
+ // receiver.resume()
+ }
+ else synchronized {
+ receiver.detachActor(f)
+ }
+ throw new SuspendActorException
+ }
+ }
+
+ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
+ assert(self == receiver, "react on channel belonging to other actor")
+ receiver.synchronized {
+ waitingFor = f.isDefinedAt
+ val q = messageQueue.extractFirst(waitingFor)
+ if (q != null) {
+ received = q.msg
+ receiver.pushSender(q.sender)
+ waitingFor = waitingForNone
+ receiver.scheduleActor(f, received)
+ }
+ else synchronized {
+ TimerThread.requestTimeout(receiver.asInstanceOf[Reactor], f, msec)
+ receiver.asInstanceOf[Reactor].timeoutPending = true
+ receiver.detachActor(f)
+ }
+ throw new SuspendActorException
+ }
+ }
+}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
new file mode 100644
index 0000000000..b0cfdf62b4
--- /dev/null
+++ b/src/actors/scala/actors/Reactor.scala
@@ -0,0 +1,97 @@
+package scala.actors
+
+trait Reactor extends Actor {
+ private var lastSender: Actor = null
+ def sender: Actor = lastSender
+ def pushSender(sender: Actor): Unit = lastSender = sender
+ def popSender(): Unit = lastSender = null
+
+ def isThreaded = false
+
+ private[actors] var continuation: PartialFunction[Any, Unit] = null
+ private[actors] var timeoutPending = false
+
+ //def resumeActor = () => Unit = () => {
+ //}
+
+ def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ if (f == null && continuation == null) {
+ // do nothing (timeout is handled instead)
+ }
+ else {
+ val task = new ActorTask(this,
+ if (f == null) continuation else f,
+ msg)
+ Scheduler.execute(task)
+ }
+ }
+
+ def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
+ (f: PartialFunction[Any, Unit]) => {
+ continuation = f
+ throw new SuspendActorException
+ }
+
+ def resetActor(): Unit = {
+ detachActor = defaultDetachActor
+ suspendActor = () => error("suspendActor called on reactor.")
+ suspendActorFor = (msec: long) => error("suspendActorFor called on reactor.")
+ kill = () => {}
+ }
+
+ resetActor()
+
+ def run(): Unit = {}
+
+ def start(): Unit = {
+ Scheduler.execute(new StartTask(this))
+ }
+}
+
+abstract class Reaction extends Runnable {
+ def actor: Reactor
+}
+
+class StartTask(a: Reactor) extends Reaction {
+ def actor = a
+
+ def run(): Unit = {
+ val t = Thread.currentThread()
+ val saved = Actor.selfs.get(t).asInstanceOf[Actor]
+ Actor.selfs.put(t, a)
+ try {
+ a.run()
+ a.kill()
+ }
+ catch {
+ case d: SuspendActorException =>
+ // do nothing (continuation is already saved)
+ }
+ finally {
+ Actor.selfs.put(t, saved)
+ }
+ }
+}
+
+class ActorTask(a: Reactor,
+ f: PartialFunction[Any, Unit],
+ msg: Any) extends Reaction {
+ def actor = a
+
+ def run(): Unit = {
+ val t = Thread.currentThread()
+ val saved = Actor.selfs.get(t).asInstanceOf[Actor]
+ Actor.selfs.put(t, a)
+ try {
+ f(msg)
+ a.kill()
+ }
+ catch {
+ case d: SuspendActorException =>
+ // do nothing (continuation is already saved)
+ }
+ finally {
+ Actor.selfs.put(t, saved)
+ }
+ }
+}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
new file mode 100644
index 0000000000..24875d6090
--- /dev/null
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -0,0 +1,276 @@
+package scala.actors
+
+import scala.collection.mutable._
+
+object Scheduler /*extends java.util.concurrent.Executor*/ {
+ private var sched: /*java.util.concurrent.Executor*/ IScheduler =
+ //java.util.concurrent.Executors.newFixedThreadPool(4);
+ //new FixedWorkersScheduler(2);
+ //new SpareWorkerScheduler2;
+ new SpareWorkerScheduler
+ //new SingleThreadedScheduler
+
+ def impl = sched
+ def impl_= (scheduler: /*java.util.concurrent.Executor*/ IScheduler) = {
+ sched = scheduler
+ }
+
+ def execute(task: Reaction) = synchronized {
+ sched.execute(task)
+ }
+
+ def tick(a: Reactor) = {
+ sched.tick(a)
+ }
+
+ def shutdown(): Unit = sched.shutdown()
+}
+
+abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
+ def execute(task: Reaction): Unit
+ def getTask(worker: WorkerThread): Runnable
+ def tick(a: Reactor): Unit
+
+ def shutdown(): Unit
+
+ val QUIT_TASK = new Reaction() {
+ def actor: Reactor = null
+ def run(): Unit = {}
+ override def toString() = "QUIT_TASK"
+ }
+}
+
+class SingleThreadedScheduler extends IScheduler {
+ def execute(task: Reaction): Unit = {
+ // execute task immediately on same thread
+ task.run()
+ }
+
+ def getTask(worker: WorkerThread): Runnable = { null }
+
+ def tick(a: Reactor): Unit = {}
+
+ def shutdown(): Unit = {}
+}
+
+abstract class FixedWorkersScheduler(workercnt: int) extends IScheduler {
+ private var canQuit = false;
+ private val tasks = new Queue[ActorTask];
+ private val idle = new Queue[WorkerThread];
+
+ private var workers = new Array[WorkerThread](workercnt);
+
+ def init = {
+ for (val i <- List.range(0, workers.length)) {
+ workers(i) = new WorkerThread(this)
+ workers(i).start()
+ }
+ }
+ init;
+
+ def execute(item: ActorTask): unit = synchronized {
+ if (workers.length == 0) item.run
+ else {
+ canQuit = true
+ if (idle.length > 0) idle.dequeue.execute(item)
+ else tasks += item
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ //if ((idle.length == workers.length) && canQuit) haltExcept(worker)
+ //else null
+ }
+ }
+
+ def tick(a: Reactor): unit = {}
+}
+
+abstract class SpareWorkerScheduler2 extends IScheduler {
+ private val tasks = new Queue[ActorTask];
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
+
+ val idle = new Queue[WorkerThread];
+ val ticks = new HashMap[WorkerThread, long]
+ val executing = new HashMap[Reactor, WorkerThread]
+
+ var TICKFREQ = 50
+
+ def init = {
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init;
+
+ var maxWorkers = 0;
+ var ticksCnt = 0;
+
+ def tick(a: Reactor): unit = synchronized {
+ ticksCnt = ticksCnt + 1
+ executing.get(a) match {
+ case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
+ case Some(wt) =>
+ ticks.update(wt, System.currentTimeMillis)
+ }
+ }
+
+ def execute(item: ActorTask): unit = synchronized {
+ if (idle.length > 0) {
+ val wt = idle.dequeue
+ executing.update(item.actor, wt)
+ wt.execute(item)
+ }
+ else {
+ /* only create new worker thread
+ when all are blocked according to heuristic
+
+ we check time stamps of latest send/receive ops
+ of ALL workers
+
+ we stop if there is one which is not blocked */
+
+ val iter = workers.elements
+ var foundBusy = false
+ while (iter.hasNext && !foundBusy) {
+ val wt = iter.next
+ ticks.get(wt) match {
+ case None => foundBusy = true // assume not blocked
+ case Some(ts) => {
+ val currTime = System.currentTimeMillis
+ if (currTime - ts < TICKFREQ)
+ foundBusy = true
+ }
+ }
+ }
+
+ if (!foundBusy) {
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ maxWorkers = workers.length // statistics
+
+ executing.update(item.actor, newWorker)
+
+ newWorker.execute(item)
+ newWorker.start()
+ }
+ else {
+ // wait assuming busy thread will be finished soon
+ // and ask for more work
+ tasks += item
+ }
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) {
+ val item = tasks.dequeue
+ executing.update(item.actor, worker)
+ item
+ }
+ else {
+ idle += worker
+ null
+ }
+ }
+}
+
+class SpareWorkerScheduler extends IScheduler {
+
+ private val tasks = new Queue[Reaction]
+ private val idle = new Queue[WorkerThread]
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
+
+ private var maxWorkers = 2
+
+ def init() = {
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init()
+
+ def execute(task: Reaction): Unit = synchronized {
+ //Console.println("received new task " + task)
+
+ if (!terminating) {
+ if (idle.length == 0) {
+ tasks += task
+ // create new worker
+ //Console.println("create new worker thread")
+ maxWorkers = maxWorkers + 1
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ newWorker.start()
+ }
+ else {
+ idle.dequeue.execute(task)
+ }
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ }
+ }
+
+ def tick(a: Reactor): Unit = {}
+
+ private var terminating = false
+
+ def shutdown(): Unit = synchronized {
+ terminating = true
+
+ val numNonIdle = workers.length - idle.length
+
+ for (val i <- List.range(0, numNonIdle))
+ tasks += QUIT_TASK
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.interrupt()
+ worker.join()
+ }
+ }
+}
+
+
+class WorkerThread(sched: IScheduler) extends Thread {
+
+ private var task: Runnable = null
+
+ def execute(r: Runnable) = synchronized {
+ task = r
+ notify()
+ }
+
+ private var running = true
+
+ override def run(): Unit = synchronized {
+ try {
+ while (running) {
+ if (task != null) {
+ task.run()
+ }
+ task = sched.getTask(this)
+ if (task == sched.QUIT_TASK) {
+ running = false
+ } else if (task == null) wait()
+ }
+ } catch {
+ case consumed: InterruptedException =>
+ // allow thread to quit
+ }
+ }
+}
diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala
new file mode 100644
index 0000000000..5d3deadae4
--- /dev/null
+++ b/src/actors/scala/actors/ThreadedActor.scala
@@ -0,0 +1,26 @@
+package scala.actors
+
+trait ThreadedActor extends Actor {
+ private val lastSenders = new scala.collection.mutable.Stack[Actor]
+ def sender: Actor = {
+ if (lastSenders.isEmpty) null
+ else lastSenders.top
+ }
+ def pushSender(sender: Actor) = { lastSenders.push(sender) }
+ def popSender(): Unit = { lastSenders.pop }
+
+ def isThreaded = true
+
+ def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ notify()
+ }
+
+ def resetActor() = {
+ suspendActor = () => wait()
+ suspendActorFor = (msec: long) => wait(msec)
+ detachActor = (f: PartialFunction[Any, Unit]) => wait()
+ kill = () => {}
+ }
+
+ resetActor()
+}
diff --git a/src/actors/scala/actors/TimerThread.scala b/src/actors/scala/actors/TimerThread.scala
new file mode 100644
index 0000000000..205789415e
--- /dev/null
+++ b/src/actors/scala/actors/TimerThread.scala
@@ -0,0 +1,108 @@
+package scala.actors
+
+/**
+ * This class allows the (local) sending of a message to an actor after
+ * a timeout. Used by the library to build receiveWithin(time: long).
+ * Note that the library deletes non-received TIMEOUT message if a
+ * message is received before the time-out occurs.
+ *
+ * @author Sebastien Noir
+ * @author Philipp Haller
+ */
+
+object TimerThread extends AnyRef with Runnable {
+
+ case class WakedActor(actor: Reactor, f: PartialFunction[Any, Unit], time: long)
+ extends Ordered[WakedActor] {
+ var valid = true
+ def compare(that: WakedActor): int = -(this.time compare that.time)
+ }
+
+ var queue = new scala.collection.mutable.PriorityQueue[WakedActor]
+ val t = new Thread(this); t.start
+
+ var lateList: List[WakedActor] = Nil
+
+ def trashRequest(a: Reactor) = synchronized {
+ // keep in mind: killing dead people is a bad idea!
+ queue.elements.find((wa: WakedActor) => wa.actor == a && wa.valid) match {
+ case Some(b) =>
+ b.valid = false
+ case None =>
+ lateList.find((wa2: WakedActor) => wa2.actor == a && wa2.valid) match {
+ case Some(b2) =>
+ b2.valid = false
+ case None =>
+ }
+ }
+ }
+
+ override def run = {
+ try {
+ while(true) {
+ this.synchronized {
+ try {
+ val sleepTime = dequeueLateAndGetSleepTime
+ if (lateList.isEmpty) wait(sleepTime)
+ } catch {
+ case t: Throwable => { t.printStackTrace(); throw t }
+ }
+ }
+
+ // process guys waiting for signal and empty list
+ for (val wa <- lateList) {
+ if (wa.valid) {
+ wa.actor.continuation = null
+ wa.actor.scheduleActor(wa.f, TIMEOUT)
+ }
+ }
+ lateList = Nil
+ }
+ } catch {
+ case consumed: InterruptedException =>
+ // allow thread to quit
+ }
+ }
+
+ def requestTimeout(a: Reactor, f: PartialFunction[Any, Unit], waitMillis: long): unit = synchronized {
+ val wakeTime = now + waitMillis
+ if (waitMillis <= 0) {
+ a.continuation = null
+ a.scheduleActor(f, TIMEOUT)
+ return
+ }
+
+ if (queue.isEmpty) { // add to queue and restart sleeping
+ queue += WakedActor(a, f, wakeTime)
+ notify()
+ } else
+ if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
+ queue += WakedActor (a, f, wakeTime)
+ notify()
+ }
+ else // simply add to queue
+ queue += WakedActor (a, f, wakeTime)
+ }
+
+ private def dequeueLateAndGetSleepTime: long = {
+ val FOREVER: long = 0
+ var waitingList: List[WakedActor] = Nil
+
+ while (!queue.isEmpty) {
+ val next = queue.max.time
+ val amount = next - now
+ if (amount > 0) { // guy in queue is not late
+ lateList = waitingList // give back the list of waiting guys for signaling
+ return amount
+ }
+ else // we're late: dequeue and examine next guy
+ waitingList = queue.dequeue :: waitingList
+ }
+
+ // empty queue => sleep forever
+ lateList = waitingList
+ return FOREVER
+ }
+
+ def now = new java.util.Date().getTime()
+}
diff --git a/src/actors/scala/actors/remote/FreshNameCreator.scala b/src/actors/scala/actors/remote/FreshNameCreator.scala
new file mode 100644
index 0000000000..2e30c69f1f
--- /dev/null
+++ b/src/actors/scala/actors/remote/FreshNameCreator.scala
@@ -0,0 +1,26 @@
+package scala.actors.remote
+
+object FreshNameCreator {
+
+ protected var counter = 0
+ protected val counters = new scala.collection.mutable.HashMap[String,int]
+
+ /**
+ * Create a fresh name with the given prefix. It is guaranteed
+ * that the returned name has never been returned by a previous
+ * call to this function (provided the prefix does not end in a digit).
+ */
+ def newName(prefix: String): Symbol = {
+ val count = counters.get(prefix) match {
+ case Some(last) => last + 1
+ case None => 0
+ }
+ counters.update(prefix, count)
+ new Symbol(prefix + count)
+ }
+
+ def newName(): Symbol = {
+ counter = counter + 1
+ new Symbol("$" + counter + "$")
+ }
+}
diff --git a/src/actors/scala/actors/remote/JavaSerializer.scala b/src/actors/scala/actors/remote/JavaSerializer.scala
new file mode 100644
index 0000000000..94dabc3b47
--- /dev/null
+++ b/src/actors/scala/actors/remote/JavaSerializer.scala
@@ -0,0 +1,19 @@
+package scala.actors.remote
+
+import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
+
+class JavaSerializer(serv: Service) extends Serializer(serv) {
+ def serialize(o: AnyRef): Array[Byte] = {
+ val bos = new ByteArrayOutputStream()
+ val out = new ObjectOutputStream(bos)
+ out.writeObject(o)
+ out.flush()
+ bos.toByteArray()
+ }
+
+ def deserialize(bytes: Array[Byte]): AnyRef = {
+ val bis = new ByteArrayInputStream(bytes)
+ val in = new ObjectInputStream(bis)
+ in.readObject()
+ }
+}
diff --git a/src/actors/scala/actors/remote/NetKernel.scala b/src/actors/scala/actors/remote/NetKernel.scala
new file mode 100644
index 0000000000..12e9e5a304
--- /dev/null
+++ b/src/actors/scala/actors/remote/NetKernel.scala
@@ -0,0 +1,65 @@
+package scala.actors.remote
+
+import java.io.{IOException,StringReader,StringWriter}
+import java.net.UnknownHostException
+import scala.collection.mutable.{HashMap,HashSet}
+
+case class NamedSend(senderName: Symbol, receiver: Symbol, data: Array[Byte])
+
+class NetKernel(service: Service) {
+
+ def sendToNode(node: Node, msg: AnyRef) = {
+ val bytes = service.serializer.serialize(msg)
+ service.send(node, bytes)
+ }
+
+ def namedSend(node: Node, senderName: Symbol, receiver: Symbol, msg: AnyRef): Unit = {
+ val bytes = service.serializer.serialize(msg)
+ sendToNode(node, NamedSend(senderName, receiver, bytes))
+ }
+
+ def send(node: Node, name: Symbol, msg: AnyRef): Unit = {
+ val senderName = names.get(Actor.self) match {
+ case None => {
+ val freshName = FreshNameCreator.newName("remotesender")
+ register(freshName, Actor.self)
+ freshName
+ }
+ case Some(name) => name
+ }
+ namedSend(node, senderName, name, msg)
+ }
+
+ def processMsg(senderNode: Node, msg: AnyRef): Unit = synchronized {
+ msg match {
+ case NamedSend(senderName, receiver, data) =>
+ actors.get(receiver) match {
+ case Some(a) => {
+ val msg = service.serializer.deserialize(data)
+ val senderProxy = new Reactor {
+ override def run() = { a ! msg }
+ override def !(msg: Any): Unit = {
+ msg match {
+ case refmsg: AnyRef => {
+ namedSend(senderNode, receiver, senderName, refmsg)
+ }
+ }
+ }
+ override def !?(msg: Any): Any =
+ error("!? not implemented for remote actors.")
+ }
+ senderProxy.start()
+ }
+ case None => // message is lost
+ }
+ }
+ }
+
+ private val actors = new HashMap[Symbol, Actor]
+ private val names = new HashMap[Actor, Symbol]
+
+ private[actors] def register(name: Symbol, a: Actor): Unit = synchronized {
+ actors += name -> a
+ names += a -> name
+ }
+}
diff --git a/src/actors/scala/actors/remote/Serializer.scala b/src/actors/scala/actors/remote/Serializer.scala
new file mode 100644
index 0000000000..f68fdce263
--- /dev/null
+++ b/src/actors/scala/actors/remote/Serializer.scala
@@ -0,0 +1,43 @@
+package scala.actors.remote
+
+import java.io.{DataInputStream,DataOutputStream,EOFException,IOException}
+
+abstract class Serializer(val service: Service) {
+ def serialize(o: AnyRef): Array[byte]
+ def deserialize(a: Array[byte]): AnyRef
+
+ [throws(classOf[IOException])]
+ def readBytes(inputStream: DataInputStream): Array[byte] = {
+ try {
+ val length = inputStream.readInt()
+ val bytes = new Array[byte](length)
+ inputStream.readFully(bytes, 0, length)
+ return bytes
+ }
+ catch {
+ case npe: NullPointerException =>
+ throw new EOFException("Connection closed.")
+ }
+ }
+
+ [throws(classOf[IOException]), throws(classOf[ClassNotFoundException])]
+ def readObject(inputStream: DataInputStream): AnyRef = {
+ val bytes = readBytes(inputStream)
+ deserialize(bytes)
+ }
+
+ [throws(classOf[IOException])]
+ def writeBytes(outputStream: DataOutputStream, bytes: Array[byte]): unit = {
+ val length = bytes.length;
+ // original length
+ outputStream.writeInt(length)
+ outputStream.write(bytes, 0, length)
+ outputStream.flush()
+ }
+
+ [throws(classOf[IOException])]
+ def writeObject(outputStream: DataOutputStream, obj: AnyRef) = {
+ val bytes = serialize(obj)
+ writeBytes(outputStream, bytes)
+ }
+}
diff --git a/src/actors/scala/actors/remote/Service.scala b/src/actors/scala/actors/remote/Service.scala
new file mode 100644
index 0000000000..a9ccb3240c
--- /dev/null
+++ b/src/actors/scala/actors/remote/Service.scala
@@ -0,0 +1,8 @@
+package scala.actors.remote
+
+trait Service {
+ val kernel = new NetKernel(this)
+ val serializer: Serializer
+ def node: Node
+ def send(node: Node, data: Array[Byte]): Unit
+}
diff --git a/src/actors/scala/actors/remote/TcpService.scala b/src/actors/scala/actors/remote/TcpService.scala
new file mode 100644
index 0000000000..c8c987e75a
--- /dev/null
+++ b/src/actors/scala/actors/remote/TcpService.scala
@@ -0,0 +1,191 @@
+package scala.actors.remote
+
+import java.io.{DataInputStream,DataOutputStream,BufferedReader,PrintWriter,
+ IOException,InputStreamReader,OutputStreamWriter}
+import java.net.{InetAddress,ServerSocket,Socket,UnknownHostException}
+
+object TcpService {
+ val random = new java.util.Random(System.currentTimeMillis())
+
+ def generatePort: int = {
+ var portnum = 0
+ try {
+ portnum = 8000 + random.nextInt(500)
+ val socket = new ServerSocket(portnum)
+ socket.close()
+ }
+ catch {
+ case ioe: IOException =>
+ // this happens when trying to open a socket twice at the same port
+ // try again
+ generatePort
+ case se: SecurityException =>
+ // do nothing
+ }
+ portnum
+ }
+}
+
+class TcpService(port: Int) extends Thread with Service {
+ val serializer: JavaSerializer = new JavaSerializer(this)
+
+ private val internalNode = new TcpNode(InetAddress.getLocalHost().getHostAddress(), port)
+ def node: TcpNode = internalNode
+
+ def send(node: Node, data: Array[byte]): unit = synchronized {
+ // retrieve worker thread (if any) that already has connection
+ node match {
+ case tnode: TcpNode =>
+ getConnection(tnode) match {
+ case None =>
+ // we are not connected, yet
+ val newWorker = connect(tnode)
+ newWorker transmit data
+ case Some(worker) => worker transmit data
+ }
+ case any => error("no TcpNode!")
+ }
+ }
+
+ override def run(): Unit =
+ try {
+ val socket = new ServerSocket(port)
+
+ while (true) {
+ val nextClient = socket.accept()
+ val worker = new TcpServiceWorker(this, nextClient)
+ worker.readNode
+ worker.start()
+ }
+ } catch {
+ case ioe: IOException => // do nothing
+ case sec: SecurityException => // do nothing
+ }
+
+ // connection management
+
+ private val connections =
+ new scala.collection.mutable.HashMap[TcpNode, TcpServiceWorker]
+
+ private[actors] def addConnection(node: TcpNode, worker: TcpServiceWorker) = synchronized {
+ connections += node -> worker
+ }
+
+ def getConnection(n: TcpNode) = synchronized {
+ connections.get(n)
+ }
+
+ def isConnected(n: Node): Boolean = synchronized {
+ n match {
+ case tnode: TcpNode => !connections.get(tnode).isEmpty
+ case _ => false
+ }
+ }
+
+ def connect(n: Node): Unit = synchronized {
+ n match {
+ case tnode: TcpNode =>
+ connect(tnode)
+ }
+ }
+
+ def connect(n: TcpNode): TcpServiceWorker = synchronized {
+ val sock = new Socket(n.address, n.port)
+ val worker = new TcpServiceWorker(this, sock)
+ worker.sendNode(n)
+ worker.start()
+ addConnection(n, worker)
+ worker
+ }
+
+ def disconnectNode(n: Node) = synchronized {
+ n match {
+ case node: TcpNode =>
+ connections.get(node) match {
+ case None => // do nothing
+ case Some(worker) => {
+ connections -= node
+ worker.halt
+ }
+ }
+ case any => error("no TcpNode.")
+ }
+ }
+
+ def isReachable(node: Node): boolean =
+ if (isConnected(node)) true
+ else try {
+ connect(node)
+ return true
+ } catch {
+ case uhe: UnknownHostException => false
+ case ioe: IOException => false
+ case se: SecurityException => false
+ }
+
+ def nodeDown(mnode: TcpNode): Unit = synchronized {
+ connections -= mnode
+ }
+}
+
+
+class TcpServiceWorker(parent: TcpService, so: Socket) extends Thread {
+ val in = so.getInputStream()
+ val out = so.getOutputStream()
+
+ val datain = new DataInputStream(in)
+ val dataout = new DataOutputStream(out)
+
+ val reader = new BufferedReader(new InputStreamReader(in))
+ val writer = new PrintWriter(new OutputStreamWriter(out))
+
+ var connectedNode: TcpNode = _
+
+ def sendNode(n: TcpNode) = {
+ connectedNode = n
+ parent.serializer.writeObject(dataout, parent.node)
+ }
+
+ def readNode = {
+ //val node = parent.serializer.deserialize(reader)
+ val node = parent.serializer.readObject(datain)
+ node match {
+ case n: TcpNode => {
+ connectedNode = n
+ parent.addConnection(n, this)
+ }
+ }
+ }
+
+ def transmit(data: Array[byte]): Unit = synchronized {
+ dataout.writeInt(data.length)
+ dataout.write(data)
+ dataout.flush()
+ }
+
+ var running = true
+
+ def halt = synchronized {
+ so.close()
+ running = false
+ }
+
+ override def run(): Unit = {
+ try {
+ while (running) {
+ if (in.available() > 0) {
+ //val msg = parent.serializer.deserialize(reader);
+ val msg = parent.serializer.readObject(datain);
+ parent.kernel.processMsg(connectedNode, msg)
+ }
+ }
+ }
+ catch {
+ case ioe: IOException =>
+ parent nodeDown connectedNode
+ case e: Exception =>
+ // catch-all
+ parent nodeDown connectedNode
+ }
+ }
+}