summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-10-09 21:49:03 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-10-09 21:49:03 +0000
commit7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb (patch)
tree1ddd2f1afbdb026eaf32695e0ed7af7ae7d16c8c /src
parent5f951ae31656ce06fc631ff17bc6df9077e66693 (diff)
downloadscala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.tar.gz
scala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.tar.bz2
scala-7096ee3e73cb2b9fb4603ce2b9af0b997eca60fb.zip
Fixed eeeeeeeeeevil deadlock in scheduler/Worke...
Fixed eeeeeeeeeevil deadlock in scheduler/WorkerThread + some clean-ups.
Diffstat (limited to 'src')
-rw-r--r--src/actors/scala/actors/Actor.scala6
-rw-r--r--src/actors/scala/actors/Channel.scala29
-rw-r--r--src/actors/scala/actors/InputChannel.scala8
-rw-r--r--src/actors/scala/actors/OutputChannel.scala6
-rw-r--r--src/actors/scala/actors/Reactor.scala3
-rw-r--r--src/actors/scala/actors/Scheduler.scala183
-rw-r--r--src/actors/scala/actors/ThreadedActor.scala14
7 files changed, 209 insertions, 40 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 33e0016ef5..1caa6d6d8e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -259,7 +259,7 @@ object Actor {
*
* @author Philipp Haller
*/
-trait Actor {
+trait Actor extends OutputChannel[Any] {
private[actors] val in = new Channel[Any]
in.receiver = this
@@ -291,6 +291,8 @@ trait Actor {
*/
def !(msg: Any): Unit = in ! msg
+ def forward(msg: Any): Unit = in forward msg
+
/**
* Sends <code>msg</code> to this actor and awaits reply
* (synchronous).
@@ -307,7 +309,7 @@ trait Actor {
private[actors] var kill: () => Unit = _
private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)
-
+ private[actors] def tick(): Unit
private[actors] def isThreaded: boolean
private[actors] def resetActor(): unit
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index a8f63750e5..4bb4205614 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -10,8 +10,6 @@
package scala.actors
-import Actor._
-
case object TIMEOUT
class SuspendActorException extends Throwable {
@@ -31,7 +29,7 @@ class SuspendActorException extends Throwable {
*
* @author Philipp Haller
*/
-class Channel[Msg] {
+class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private[actors] var receiver: Actor = synchronized {
// basically Actor.self, but can be null
@@ -39,7 +37,7 @@ class Channel[Msg] {
if (t.isInstanceOf[ActorThread])
t.asInstanceOf[ActorThread]
else {
- val a = selfs.get(t).asInstanceOf[Actor]
+ val a = Actor.selfs.get(t).asInstanceOf[Actor]
a
}
}
@@ -53,6 +51,7 @@ class Channel[Msg] {
private val messageQueue = new MessageQueue[Msg]
private def send(msg: Msg, sender: Actor) = receiver.synchronized {
+ receiver.tick()
if (waitingFor(msg) && ((waitingForSender == null) ||
(waitingForSender == sender))) {
received = msg
@@ -77,16 +76,16 @@ class Channel[Msg] {
/**
* Sends <code>msg</code> to this <code>Channel</code>.
*/
- def !(msg: Msg): unit = send(msg, self)
+ def !(msg: Msg): unit = send(msg, Actor.self)
/**
* Sends <code>msg</code> to this <code>Channel</code> and
* awaits reply.
*/
def !?(msg: Msg): Any = {
- self.freshReply()
+ Actor.self.freshReply()
this ! msg
- self.reply.receiveFrom(receiver) {
+ Actor.self.reply.receiveFrom(receiver) {
case x => x
}
}
@@ -101,15 +100,17 @@ class Channel[Msg] {
* Receives a message from this <code>Channel</code>.
*/
def receive[R](f: PartialFunction[Msg, R]): R = {
- assert(self == receiver, "receive from channel belonging to other actor")
+ assert(Actor.self == receiver, "receive from channel belonging to other actor")
assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
+ receiver.tick()
waitingFor = f.isDefinedAt
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
received = q.msg
receiver.pushSender(q.sender)
}
+ // acquire lock because we might call wait()
else synchronized {
receiver.suspendActor()
}
@@ -122,9 +123,10 @@ class Channel[Msg] {
}
private[actors] def receiveFrom[R](r: Actor)(f: PartialFunction[Msg, R]): R = {
- assert(self == receiver, "receive from channel belonging to other actor")
+ assert(Actor.self == receiver, "receive from channel belonging to other actor")
assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
+ receiver.tick()
waitingFor = f.isDefinedAt
waitingForSender = r
var q = messageQueue.dequeueFirst((item: MessageQueueResult[Msg]) => {
@@ -153,9 +155,10 @@ class Channel[Msg] {
* executed if specified.
*/
def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R = {
- assert(self == receiver, "receive from channel belonging to other actor")
+ assert(Actor.self == receiver, "receive from channel belonging to other actor")
assert(receiver.isThreaded, "receive invoked from reactor")
receiver.synchronized {
+ receiver.tick()
waitingFor = f.isDefinedAt
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -185,8 +188,9 @@ class Channel[Msg] {
* <code>receive</code> for reactors.
*/
def react(f: PartialFunction[Any, Unit]): Nothing = {
- assert(self == receiver, "react on channel belonging to other actor")
+ assert(Actor.self == receiver, "react on channel belonging to other actor")
receiver.synchronized {
+ receiver.tick()
waitingFor = f.isDefinedAt
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
@@ -206,8 +210,9 @@ class Channel[Msg] {
* <code>receiveWithin</code> for reactors.
*/
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = {
- assert(self == receiver, "react on channel belonging to other actor")
+ assert(Actor.self == receiver, "react on channel belonging to other actor")
receiver.synchronized {
+ receiver.tick()
waitingFor = f.isDefinedAt
val q = messageQueue.extractFirst(waitingFor)
if (q != null) {
diff --git a/src/actors/scala/actors/InputChannel.scala b/src/actors/scala/actors/InputChannel.scala
new file mode 100644
index 0000000000..ba97703b1a
--- /dev/null
+++ b/src/actors/scala/actors/InputChannel.scala
@@ -0,0 +1,8 @@
+package scala.actors
+
+trait InputChannel[Msg] {
+ def receive[R](f: PartialFunction[Msg, R]): R
+ def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R
+ def react(f: PartialFunction[Any, Unit]): Nothing
+ def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing
+}
diff --git a/src/actors/scala/actors/OutputChannel.scala b/src/actors/scala/actors/OutputChannel.scala
new file mode 100644
index 0000000000..e189571378
--- /dev/null
+++ b/src/actors/scala/actors/OutputChannel.scala
@@ -0,0 +1,6 @@
+package scala.actors
+
+trait OutputChannel[Msg] {
+ def !(msg: Msg): Unit
+ def forward(msg: Msg): Unit
+}
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala
index 55639910a3..015dbd82f2 100644
--- a/src/actors/scala/actors/Reactor.scala
+++ b/src/actors/scala/actors/Reactor.scala
@@ -41,6 +41,9 @@ trait Reactor extends Actor {
Scheduler.execute(task)
}
+ private[actors] def tick(): Unit =
+ Scheduler.tick(this)
+
private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
(f: PartialFunction[Any, Unit]) => {
continuation = f
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index e908108b9e..e595cb73c8 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -21,7 +21,8 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, Queue}
*/
object Scheduler {
private var sched: IScheduler =
- new SpareWorkerScheduler
+ //new SpareWorkerScheduler
+ new TickedScheduler
def impl = sched
def impl_= (scheduler: IScheduler) = {
@@ -87,7 +88,7 @@ class SpareWorkerScheduler extends IScheduler {
private val idle = new Queue[WorkerThread]
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
- private var maxWorkers = 2
+ private var terminating = false
def init() = {
for (val i <- 0 until 2) {
@@ -102,44 +103,165 @@ class SpareWorkerScheduler extends IScheduler {
if (!terminating) {
if (idle.length == 0) {
tasks += task
- // create new worker
- maxWorkers = maxWorkers + 1
val newWorker = new WorkerThread(this)
workers += newWorker
newWorker.start()
}
else {
- idle.dequeue.execute(task)
+ val worker = idle.dequeue
+ worker.execute(task)
}
}
}
def getTask(worker: WorkerThread) = synchronized {
- if (tasks.length > 0) tasks.dequeue
+ if (terminating)
+ QUIT_TASK
else {
- idle += worker
- null
+ if (tasks.length > 0) tasks.dequeue
+ else {
+ idle += worker
+ null
+ }
}
}
def tick(a: Reactor): Unit = {}
+ def shutdown(): Unit = synchronized {
+ terminating = true
+
+ val idleThreads = idle.elements
+ while (idleThreads.hasNext) {
+ val worker = idleThreads.next
+ worker.running = false
+ worker.interrupt()
+ // caused deadlock (tries to acquire lock of worker)
+ //worker.join()
+ }
+ }
+}
+
+
+class TickedScheduler extends IScheduler {
+ private val tasks = new Queue[Reaction]
+ private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
+
+ private val idle = new Queue[WorkerThread]
+ private val ticks = new scala.collection.mutable.HashMap[WorkerThread, long]
+ private val executing = new scala.collection.mutable.HashMap[Reactor, WorkerThread]
+
private var terminating = false
+ var TICKFREQ = 50
+
+ def init() = {
+ for (val i <- List.range(0, 2)) {
+ val worker = new WorkerThread(this)
+ workers += worker
+ worker.start()
+ }
+ }
+ init()
+
+ def execute(item: Reaction): unit = synchronized {
+ if (!terminating)
+ if (idle.length > 0) {
+ val wt = idle.dequeue
+ executing.update(item.actor, wt)
+ wt.execute(item)
+ }
+ else {
+ /*
+ only create new worker thread when all are blocked
+ according to heuristic
+
+ we check time stamps of latest send/receive ops of ALL
+ workers
+
+ we stop if there is one which is not blocked
+ */
+
+ val iter = workers.elements
+ var foundBusy = false
+ while (iter.hasNext && !foundBusy) {
+ val wt = iter.next
+ ticks.get(wt) match {
+ case None => foundBusy = true // assume not blocked
+ case Some(ts) => {
+ val currTime = System.currentTimeMillis
+ if (currTime - ts < TICKFREQ)
+ foundBusy = true
+ }
+ }
+ }
+
+ if (!foundBusy) {
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+ executing.update(item.actor, newWorker)
+ newWorker.execute(item)
+ newWorker.start()
+ }
+ else {
+ // wait assuming busy thread will be finished soon
+ // and ask for more work
+ tasks += item
+ }
+ }
+ }
+
+ def getTask(worker: WorkerThread) = synchronized {
+ if (terminating)
+ QUIT_TASK
+ if (tasks.length > 0) {
+ val item = tasks.dequeue
+ executing.update(item.actor, worker)
+ item
+ }
+ else {
+ idle += worker
+ null
+ }
+ }
+
+ var ticksCnt = 0
+
+ def tick(a: Reactor): unit = synchronized {
+ ticksCnt = ticksCnt + 1
+ executing.get(a) match {
+ case None => // thread outside of scheduler; error("No worker thread associated with actor " + a)
+ case Some(wt) =>
+ ticks.update(wt, System.currentTimeMillis)
+ }
+ }
+
def shutdown(): Unit = synchronized {
terminating = true
- val numNonIdle = workers.length - idle.length
- for (val i <- 0 until numNonIdle)
- tasks += QUIT_TASK
+
val idleThreads = idle.elements
while (idleThreads.hasNext) {
val worker = idleThreads.next
+ worker.running = false
worker.interrupt()
- worker.join()
+ // caused deadlock (tries to acquire lock of worker)
+ //worker.join()
}
}
}
+
+class QuitException extends Throwable {
+ /*
+ For efficiency reasons we do not fill in
+ the execution stack trace.
+ */
+ override def fillInStackTrace(): Throwable = {
+ this
+ }
+}
+
+
/**
* This class is used by schedulers to execute reactor tasks on
* multiple threads.
@@ -148,24 +270,45 @@ class SpareWorkerScheduler extends IScheduler {
*/
class WorkerThread(sched: IScheduler) extends Thread {
private var task: Runnable = null
- private var running = true
+ private[actors] var running = true
def execute(r: Runnable) = synchronized {
task = r
notify()
}
- override def run(): Unit = synchronized {
+ override def run(): Unit = {
try {
while (running) {
- if (task != null) task.run()
- task = sched.getTask(this)
- if (task == sched.QUIT_TASK) {
- running = false
- } else if (task == null) wait()
+ if (task != null) {
+ try {
+ task.run()
+ } catch {
+ case consumed: InterruptedException => {
+ if (!running) throw new QuitException
+ }
+ }
+ }
+ this.synchronized {
+ task = sched.getTask(this)
+
+ while (task == null) {
+ try {
+ wait()
+ } catch {
+ case consumed: InterruptedException => {
+ if (!running) throw new QuitException
+ }
+ }
+ }
+
+ if (task == sched.QUIT_TASK) {
+ running = false
+ }
+ }
}
} catch {
- case consumed: InterruptedException =>
+ case consumed: QuitException =>
// allow thread to quit
}
}
diff --git a/src/actors/scala/actors/ThreadedActor.scala b/src/actors/scala/actors/ThreadedActor.scala
index d6914d2f72..7059b70f3e 100644
--- a/src/actors/scala/actors/ThreadedActor.scala
+++ b/src/actors/scala/actors/ThreadedActor.scala
@@ -18,20 +18,22 @@ package scala.actors
*/
trait ThreadedActor extends Actor {
private val lastSenders = new scala.collection.mutable.Stack[Actor]
- def sender: Actor = {
+ private[actors] def sender: Actor = {
if (lastSenders.isEmpty) null
else lastSenders.top
}
- def pushSender(sender: Actor) = { lastSenders.push(sender) }
- def popSender(): Unit = { lastSenders.pop }
+ private[actors] def pushSender(sender: Actor) = { lastSenders.push(sender) }
+ private[actors] def popSender(): Unit = { lastSenders.pop }
- def isThreaded = true
+ private[actors] def isThreaded = true
- def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
+ private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = {
notify()
}
- def resetActor() = {
+ private[actors] def tick(): Unit = {}
+
+ private[actors] def resetActor() = {
suspendActor = () => wait()
suspendActorFor = (msec: long) => wait(msec)
detachActor = (f: PartialFunction[Any, Unit]) => wait()