diff options
author | Philipp Haller <hallerp@gmail.com> | 2006-10-09 21:49:03 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2006-10-09 21:49:03 +0000 |
commit | 7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb (patch) | |
tree | 1ddd2f1afbdb026eaf32695e0ed7af7ae7d16c8c | |
parent | 5f951ae31656ce06fc631ff17bc6df9077e66693 (diff) | |
download | scala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.tar.gz scala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.tar.bz2 scala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.zip |
Fixed eeeeeeeeeevil deadlock in scheduler/Worke...
Fixed eeeeeeeeeevil deadlock in scheduler/WorkerThread + some clean-ups.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/Channel.scala | 29 | ||||
-rw-r--r-- | src/actors/scala/actors/InputChannel.scala | 8 | ||||
-rw-r--r-- | src/actors/scala/actors/OutputChannel.scala | 6 | ||||
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 3 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 183 | ||||
-rw-r--r-- | src/actors/scala/actors/ThreadedActor.scala | 14 |
7 files changed, 209 insertions, 40 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 33e0016ef5..1caa6d6d8e 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -259,7 +259,7 @@ object Actor { * * @author Philipp Haller */ -trait Actor { +trait Actor extends OutputChannel[Any] { private[actors] val in = new Channel[Any] in.receiver = this @@ -291,6 +291,8 @@ trait Actor { */ def !(msg: Any): Unit = in ! msg + def forward(msg: Any): Unit = in forward msg + /** * Sends <code>msg</code> to this actor and awaits reply * (synchronous). @@ -307,7 +309,7 @@ trait Actor { private[actors] var kill: () => Unit = _ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) - + private[actors] def tick(): Unit private[actors] def isThreaded: boolean private[actors] def resetActor(): unit diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala index a8f63750e5..4bb4205614 100644 --- a/src/actors/scala/actors/Channel.scala +++ b/src/actors/scala/actors/Channel.scala @@ -10,8 +10,6 @@ package scala.actors -import Actor._ - case object TIMEOUT class SuspendActorException extends Throwable { @@ -31,7 +29,7 @@ class SuspendActorException extends Throwable { * * @author Philipp Haller */ -class Channel[Msg] { +class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] { private[actors] var receiver: Actor = synchronized { // basically Actor.self, but can be null @@ -39,7 +37,7 @@ class Channel[Msg] { if (t.isInstanceOf[ActorThread]) t.asInstanceOf[ActorThread] else { - val a = selfs.get(t).asInstanceOf[Actor] + val a = Actor.selfs.get(t).asInstanceOf[Actor] a } } @@ -53,6 +51,7 @@ class Channel[Msg] { private val messageQueue = new MessageQueue[Msg] private def send(msg: Msg, sender: Actor) = receiver.synchronized { + receiver.tick() if (waitingFor(msg) && ((waitingForSender == null) || (waitingForSender == sender))) { received = msg @@ -77,16 +76,16 @@ class Channel[Msg] { /** * Sends <code>msg</code> to this <code>Channel</code>. */ - def !(msg: Msg): unit = send(msg, self) + def !(msg: Msg): unit = send(msg, Actor.self) /** * Sends <code>msg</code> to this <code>Channel</code> and * awaits reply. */ def !?(msg: Msg): Any = { - self.freshReply() + Actor.self.freshReply() this ! msg - self.reply.receiveFrom(receiver) { + Actor.self.reply.receiveFrom(receiver) { case x => x } } @@ -101,15 +100,17 @@ class Channel[Msg] { * Receives a message from this <code>Channel</code>. */ def receive[R](f: PartialFunction[Msg, R]): R = { - assert(self == receiver, "receive from channel belonging to other actor") + assert(Actor.self == receiver, "receive from channel belonging to other actor") assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { + receiver.tick() waitingFor = f.isDefinedAt val q = messageQueue.extractFirst(waitingFor) if (q != null) { received = q.msg receiver.pushSender(q.sender) } + // acquire lock because we might call wait() else synchronized { receiver.suspendActor() } @@ -122,9 +123,10 @@ class Channel[Msg] { } private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = { - assert(self == receiver, "receive from channel belonging to other actor") + assert(Actor.self == receiver, "receive from channel belonging to other actor") assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { + receiver.tick() waitingFor = f.isDefinedAt waitingForSender = r var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => { @@ -153,9 +155,10 @@ class Channel[Msg] { * executed if specified. */ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = { - assert(self == receiver, "receive from channel belonging to other actor") + assert(Actor.self == receiver, "receive from channel belonging to other actor") assert(receiver.isThreaded, "receive invoked from reactor") receiver.synchronized { + receiver.tick() waitingFor = f.isDefinedAt val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -185,8 +188,9 @@ class Channel[Msg] { * <code>receive</code> for reactors. */ def react(f: PartialFunction[Any, Unit]): Nothing = { - assert(self == receiver, "react on channel belonging to other actor") + assert(Actor.self == receiver, "react on channel belonging to other actor") receiver.synchronized { + receiver.tick() waitingFor = f.isDefinedAt val q = messageQueue.extractFirst(waitingFor) if (q != null) { @@ -206,8 +210,9 @@ class Channel[Msg] { * <code>receiveWithin</code> for reactors. */ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = { - assert(self == receiver, "react on channel belonging to other actor") + assert(Actor.self == receiver, "react on channel belonging to other actor") receiver.synchronized { + receiver.tick() waitingFor = f.isDefinedAt val q = messageQueue.extractFirst(waitingFor) if (q != null) { diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala new file mode 100644 index 0000000000..ba97703b1a --- /dev/null +++ b/src/actors/scala/actors/InputChannel.scala @@ -0,0 +1,8 @@ +package scala.actors + +trait InputChannel[Msg] { + def receive[R](f: PartialFunction[Msg, R]): R + def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R + def react(f: PartialFunction[Any, Unit]): Nothing + def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing +} diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala new file mode 100644 index 0000000000..e189571378 --- /dev/null +++ b/src/actors/scala/actors/OutputChannel.scala @@ -0,0 +1,6 @@ +package scala.actors + +trait OutputChannel[Msg] { + def !(msg: Msg): Unit + def forward(msg: Msg): Unit +} diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala index 55639910a3..015dbd82f2 100644 --- a/src/actors/scala/actors/Reactor.scala +++ b/src/actors/scala/actors/Reactor.scala @@ -41,6 +41,9 @@ trait Reactor extends Actor { Scheduler.execute(task) } + private[actors] def tick(): Unit = + Scheduler.tick(this) + private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit = (f: PartialFunction[Any, Unit]) => { continuation = f diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index e908108b9e..e595cb73c8 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -21,7 +21,8 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, Queue} */ object Scheduler { private var sched: IScheduler = - new SpareWorkerScheduler + //new SpareWorkerScheduler + new TickedScheduler def impl = sched def impl_= (scheduler: IScheduler) = { @@ -87,7 +88,7 @@ class SpareWorkerScheduler extends IScheduler { private val idle = new Queue[WorkerThread] private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] - private var maxWorkers = 2 + private var terminating = false def init() = { for (val i <- 0 until 2) { @@ -102,44 +103,165 @@ class SpareWorkerScheduler extends IScheduler { if (!terminating) { if (idle.length == 0) { tasks += task - // create new worker - maxWorkers = maxWorkers + 1 val newWorker = new WorkerThread(this) workers += newWorker newWorker.start() } else { - idle.dequeue.execute(task) + val worker = idle.dequeue + worker.execute(task) } } } def getTask(worker: WorkerThread) = synchronized { - if (tasks.length > 0) tasks.dequeue + if (terminating) + QUIT_TASK else { - idle += worker - null + if (tasks.length > 0) tasks.dequeue + else { + idle += worker + null + } } } def tick(a: Reactor): Unit = {} + def shutdown(): Unit = synchronized { + terminating = true + + val idleThreads = idle.elements + while (idleThreads.hasNext) { + val worker = idleThreads.next + worker.running = false + worker.interrupt() + // caused deadlock (tries to acquire lock of worker) + //worker.join() + } + } +} + + +class TickedScheduler extends IScheduler { + private val tasks = new Queue[Reaction] + private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread] + + private val idle = new Queue[WorkerThread] + private val ticks = new scala.collection.mutable.HashMap[WorkerThread, long] + private val executing = new scala.collection.mutable.HashMap[Reactor, WorkerThread] + private var terminating = false + var TICKFREQ = 50 + + def init() = { + for (val i <- List.range(0, 2)) { + val worker = new WorkerThread(this) + workers += worker + worker.start() + } + } + init() + + def execute(item: Reaction): unit = synchronized { + if (!terminating) + if (idle.length > 0) { + val wt = idle.dequeue + executing.update(item.actor, wt) + wt.execute(item) + } + else { + /* + only create new worker thread when all are blocked + according to heuristic + + we check time stamps of latest send/receive ops of ALL + workers + + we stop if there is one which is not blocked + */ + + val iter = workers.elements + var foundBusy = false + while (iter.hasNext && !foundBusy) { + val wt = iter.next + ticks.get(wt) match { + case None => foundBusy = true // assume not blocked + case Some(ts) => { + val currTime = System.currentTimeMillis + if (currTime - ts < TICKFREQ) + foundBusy = true + } + } + } + + if (!foundBusy) { + val newWorker = new WorkerThread(this) + workers += newWorker + executing.update(item.actor, newWorker) + newWorker.execute(item) + newWorker.start() + } + else { + // wait assuming busy thread will be finished soon + // and ask for more work + tasks += item + } + } + } + + def getTask(worker: WorkerThread) = synchronized { + if (terminating) + QUIT_TASK + if (tasks.length > 0) { + val item = tasks.dequeue + executing.update(item.actor, worker) + item + } + else { + idle += worker + null + } + } + + var ticksCnt = 0 + + def tick(a: Reactor): unit = synchronized { + ticksCnt = ticksCnt + 1 + executing.get(a) match { + case None => // thread outside of scheduler; error("No worker thread associated with actor " + a) + case Some(wt) => + ticks.update(wt, System.currentTimeMillis) + } + } + def shutdown(): Unit = synchronized { terminating = true - val numNonIdle = workers.length - idle.length - for (val i <- 0 until numNonIdle) - tasks += QUIT_TASK + val idleThreads = idle.elements while (idleThreads.hasNext) { val worker = idleThreads.next + worker.running = false worker.interrupt() - worker.join() + // caused deadlock (tries to acquire lock of worker) + //worker.join() } } } + +class QuitException extends Throwable { + /* + For efficiency reasons we do not fill in + the execution stack trace. + */ + override def fillInStackTrace(): Throwable = { + this + } +} + + /** * This class is used by schedulers to execute reactor tasks on * multiple threads. @@ -148,24 +270,45 @@ class SpareWorkerScheduler extends IScheduler { */ class WorkerThread(sched: IScheduler) extends Thread { private var task: Runnable = null - private var running = true + private[actors] var running = true def execute(r: Runnable) = synchronized { task = r notify() } - override def run(): Unit = synchronized { + override def run(): Unit = { try { while (running) { - if (task != null) task.run() - task = sched.getTask(this) - if (task == sched.QUIT_TASK) { - running = false - } else if (task == null) wait() + if (task != null) { + try { + task.run() + } catch { + case consumed: InterruptedException => { + if (!running) throw new QuitException + } + } + } + this.synchronized { + task = sched.getTask(this) + + while (task == null) { + try { + wait() + } catch { + case consumed: InterruptedException => { + if (!running) throw new QuitException + } + } + } + + if (task == sched.QUIT_TASK) { + running = false + } + } } } catch { - case consumed: InterruptedException => + case consumed: QuitException => // allow thread to quit } } diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala index d6914d2f72..7059b70f3e 100644 --- a/src/actors/scala/actors/ThreadedActor.scala +++ b/src/actors/scala/actors/ThreadedActor.scala @@ -18,20 +18,22 @@ package scala.actors */ trait ThreadedActor extends Actor { private val lastSenders = new scala.collection.mutable.Stack[Actor] - def sender: Actor = { + private[actors] def sender: Actor = { if (lastSenders.isEmpty) null else lastSenders.top } - def pushSender(sender: Actor) = { lastSenders.push(sender) } - def popSender(): Unit = { lastSenders.pop } + private[actors] def pushSender(sender: Actor) = { lastSenders.push(sender) } + private[actors] def popSender(): Unit = { lastSenders.pop } - def isThreaded = true + private[actors] def isThreaded = true - def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { + private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = { notify() } - def resetActor() = { + private[actors] def tick(): Unit = {} + + private[actors] def resetActor() = { suspendActor = () => wait() suspendActorFor = (msec: long) => wait(msec) detachActor = (f: PartialFunction[Any, Unit]) => wait() |