summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-07-30 19:56:54 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-07-30 19:56:54 +0000
commit55561538cdf099875c3f0a1c45ade11c0979916f (patch)
tree57767e6bf6fe693b87dfb24af57bdea65cb073fe
parent9934175fadf078a77a70c13bf136f3d2268df143 (diff)
downloadscala-55561538cdf099875c3f0a1c45ade11c0979916f.tar.gz
scala-55561538cdf099875c3f0a1c45ade11c0979916f.tar.bz2
scala-55561538cdf099875c3f0a1c45ade11c0979916f.zip
Resolved deadlock and stack overflow problems i...
Resolved deadlock and stack overflow problems in multi-threaded actors.
-rw-r--r--src/actors/scala/actors/multi/MailBox.scala100
-rw-r--r--src/actors/scala/actors/multi/Process.scala16
-rw-r--r--src/actors/scala/actors/multi/ReceiverTask.scala1
-rw-r--r--src/actors/scala/actors/multi/Scheduler.scala62
-rw-r--r--src/actors/scala/actors/multi/TimerThread.scala4
5 files changed, 88 insertions, 95 deletions
diff --git a/src/actors/scala/actors/multi/MailBox.scala b/src/actors/scala/actors/multi/MailBox.scala
index 7d6da8924f..52aa0a7486 100644
--- a/src/actors/scala/actors/multi/MailBox.scala
+++ b/src/actors/scala/actors/multi/MailBox.scala
@@ -19,10 +19,10 @@ trait MailBox {
/** Unconsumed messages. */
var sent = new Queue[Any]
- var continuation: PartialFunction[Any,Unit] = null
+ var continuation: PartialFunction[Any, Unit] = null
// more complex continuation
- var contCases: PartialFunction[Any,Any] = null
- var contThen: Any => unit = null
+ var contCases: PartialFunction[Any, Any] = null
+ var contThen: Any => Unit = null
def hasCont =
if ((continuation == null) && (contCases == null)) false
@@ -40,7 +40,14 @@ trait MailBox {
private var pendingSignal = false
- def send(msg: Any): unit = synchronized {
+ def scheduleContinuation(msg: Any): Unit = {
+ val task = new ReceiverTask(this, msg)
+ //Debug.info("ready to receive. dispatch new task " + task)
+ scheduled = true
+ Scheduler.execute(task)
+ }
+
+ def send(msg: Any): Unit = synchronized {
if (isAlive) {
if (!hasCont || scheduled) {
//Debug.info("no cont avail/task already scheduled. appending msg to mailbox.")
@@ -55,12 +62,8 @@ trait MailBox {
msg match {
case Signal() =>
if (!contDefinedAt(TIMEOUT())) die()
- else {
- val task = new ReceiverTask(this, TIMEOUT())
- //Debug.info("ready to receive. dispatch new task " + task)
- scheduled = true
- Scheduler.execute(task)
- }
+ else
+ scheduleContinuation(TIMEOUT())
case _ =>
if (!contDefinedAt(msg))
sent += msg
@@ -69,10 +72,7 @@ trait MailBox {
pendingSignal = false
TimerThread.trashRequest(this)
}
- val task = new ReceiverTask(this, msg)
- //Debug.info("ready to receive. dispatch new task " + task)
- scheduled = true
- Scheduler.execute(task)
+ scheduleContinuation(msg)
}
}
}
@@ -82,8 +82,10 @@ trait MailBox {
//Debug.info("" + Thread.currentThread() + ": Resuming " + this)
if (continuation != null) {
val f = continuation
- continuation = null
- scheduled = false
+ this.synchronized {
+ continuation = null
+ scheduled = false
+ }
f(msg)
die()
}
@@ -100,14 +102,14 @@ trait MailBox {
}
}
- def receive(f: PartialFunction[Any,Unit]): Nothing = {
+ def receive(f: PartialFunction[Any, Unit]): Nothing = synchronized {
if (isAlive) {
Scheduler.tick(this)
continuation = null
sent.dequeueFirst(f.isDefinedAt) match {
case Some(msg) =>
- f(msg)
- die()
+ continuation = f
+ scheduleContinuation(msg)
case None =>
continuation = f
//Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
@@ -116,33 +118,37 @@ trait MailBox {
throw new Done
}
- def receiveWithin(msec: long)(f: PartialFunction[Any, unit]): Nothing = {
- Scheduler.tick(this)
- continuation = null
- sent.dequeueFirst(f.isDefinedAt) match {
- case Some(msg) =>
- f(msg)
- die()
- case None =>
- // if timeout == 0 then execute timeout action if specified (see Erlang book)
- if (msec == 0) {
- if (f.isDefinedAt(TIMEOUT()))
- f(TIMEOUT())
- die()
- }
- else {
- if (msec > 0) {
- TimerThread.requestTimeout(this, msec)
- pendingSignal = true
- }
+ def receiveWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing = synchronized {
+ if (isAlive) {
+ Scheduler.tick(this)
+ continuation = null
+ sent.dequeueFirst(f.isDefinedAt) match {
+ case Some(msg) => {
continuation = f
- //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ scheduleContinuation(msg)
}
+ case None =>
+ // if timeout == 0 then execute timeout action if specified (see Erlang book)
+ if (msec == 0) {
+ if (f.isDefinedAt(TIMEOUT())) {
+ continuation = f
+ scheduleContinuation(TIMEOUT())
+ }
+ die()
+ } else {
+ if (msec > 0) {
+ TimerThread.requestTimeout(this, msec)
+ pendingSignal = true
+ }
+ continuation = f
+ //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
+ }
+ }
}
throw new Done
}
- def receiveAndReturn(cases: PartialFunction[Any,Any], then: Any => unit): unit = {
+ def receiveAndReturn(cases: PartialFunction[Any, Any], then: Any => Unit): Unit = {
contCases = null
contThen = null
sent.dequeueFirst(cases.isDefinedAt) match {
@@ -162,17 +168,15 @@ trait MailBox {
// receiv {...} then (msg => {...msg...})
- class ReceiveAndReturn(cases: PartialFunction[Any,Any]) {
- def then(body: Any => unit): unit = receiveAndReturn(cases, body)
+ class ReceiveAndReturn(cases: PartialFunction[Any, Any]) {
+ def then(body: Any => Unit): Unit = receiveAndReturn(cases, body)
}
- def receiv(cases: PartialFunction[Any,Any]): ReceiveAndReturn =
+ def receiv(cases: PartialFunction[Any, Any]): ReceiveAndReturn =
new ReceiveAndReturn(cases)
- def die() = {
- if (isAlive) {
- isAlive = false
- //Debug.info("" + this + " died.")
- }
+ def die() = if (isAlive) {
+ isAlive = false
+ //Debug.info("" + this + " died.")
}
}
diff --git a/src/actors/scala/actors/multi/Process.scala b/src/actors/scala/actors/multi/Process.scala
index afac16cbdb..6eba3a995a 100644
--- a/src/actors/scala/actors/multi/Process.scala
+++ b/src/actors/scala/actors/multi/Process.scala
@@ -211,22 +211,6 @@ class Process extends scala.actors.Process with Actor[Any] {
}
}
- override def receive(f: PartialFunction[Any,unit]): Nothing = {
- if (isAlive) {
- Scheduler.tick(this)
- continuation = null
- sent.dequeueFirst(f.isDefinedAt) match {
- case Some(msg) =>
- process(f, msg)
- die()
- case None =>
- continuation = f
- //Debug.info("No msg found. " + this + " has continuation " + continuation + ".")
- }
- }
- throw new Done
- }
-
override def receiveMsg(msg: Any) = {
//Debug.info("" + Thread.currentThread() + ": Resuming " + this)
if (continuation != null) {
diff --git a/src/actors/scala/actors/multi/ReceiverTask.scala b/src/actors/scala/actors/multi/ReceiverTask.scala
index 033df00ef0..348c2247d3 100644
--- a/src/actors/scala/actors/multi/ReceiverTask.scala
+++ b/src/actors/scala/actors/multi/ReceiverTask.scala
@@ -15,6 +15,7 @@ package scala.actors.multi
*/
class ReceiverTask(val actor: MailBox, msg: Any) extends Runnable {
def run(): Unit = {
+ Scheduler.setProcess(Thread.currentThread(), actor)
try {
actor receiveMsg msg
}
diff --git a/src/actors/scala/actors/multi/Scheduler.scala b/src/actors/scala/actors/multi/Scheduler.scala
index 62d4f80cc7..88ec685368 100644
--- a/src/actors/scala/actors/multi/Scheduler.scala
+++ b/src/actors/scala/actors/multi/Scheduler.scala
@@ -15,22 +15,9 @@ import scala.collection.mutable._
/**
* @author Philipp Haller
*/
-abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
- def execute(item: ReceiverTask): Unit
- def getTask(worker: WorkerThread): Runnable
- def tick(a: MailBox): Unit
- def getProcess(t: Thread): Process
-
- val QUIT_TASK = new Runnable() {
- def run(): Unit = {}
- override def toString() = "QUIT_TASK"
- }
-}
-
-
object Scheduler /*extends java.util.concurrent.Executor*/ {
private var sched: /*java.util.concurrent.Executor*/ IScheduler =
- //java.util.concurrent.Executors.newFixedThreadPool(2);
+ //java.util.concurrent.Executors.newFixedThreadPool(4);
//new FixedWorkersScheduler(2);
new SpareWorkerScheduler2
//new SpareWorkerScheduler
@@ -41,17 +28,45 @@ object Scheduler /*extends java.util.concurrent.Executor*/ {
sched = scheduler
}
- def execute(item: ReceiverTask) =
+ def execute(item: ReceiverTask) = synchronized {
sched.execute(item)
+ }
- def tick(a: MailBox) =
+ def tick(a: MailBox) = {
sched.tick(a)
+ }
- def getProcess(t: Thread): Process =
- sched.getProcess(t)
+ private val process = new HashMap[Thread, MailBox]
+
+ def getProcess(t: Thread): Process = synchronized {
+ process.get(t) match {
+ case None => null
+ case Some(p: Process) => p
+ }
+ }
+
+ def setProcess(t: Thread, m: MailBox) = synchronized {
+ process.update(t, m)
+ }
}
+/**
+ * @author Philipp Haller
+ */
+abstract class IScheduler /*extends java.util.concurrent.Executor*/ {
+ def execute(item: ReceiverTask): Unit
+ def getTask(worker: WorkerThread): Runnable
+ def tick(a: MailBox): Unit
+
+ val QUIT_TASK = new Runnable() {
+ def run(): Unit = {}
+ override def toString() = "QUIT_TASK"
+ }
+}
+/**
+ * @author Philipp Haller
+ */
class SpareWorkerScheduler2 extends IScheduler {
private val tasks = new Queue[ReceiverTask];
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread];
@@ -59,7 +74,6 @@ class SpareWorkerScheduler2 extends IScheduler {
val idle = new Queue[WorkerThread];
val ticks = new HashMap[WorkerThread, long]
val executing = new HashMap[MailBox, WorkerThread]
- val rexec = new HashMap[Thread, MailBox]
var TICKFREQ = 50
@@ -84,18 +98,10 @@ class SpareWorkerScheduler2 extends IScheduler {
}
}
- def getProcess(t: Thread): Process = synchronized {
- rexec.get(t) match {
- case None => null
- case Some(p: Process) => p
- }
- }
-
def execute(item: ReceiverTask): unit = synchronized {
if (idle.length > 0) {
val wt = idle.dequeue
executing.update(item.actor, wt)
- rexec.update(wt, item.actor)
wt.execute(item)
}
else {
@@ -127,7 +133,6 @@ class SpareWorkerScheduler2 extends IScheduler {
maxWorkers = workers.length // statistics
executing.update(item.actor, newWorker)
- rexec.update(newWorker, item.actor)
newWorker.execute(item)
newWorker.start()
@@ -144,7 +149,6 @@ class SpareWorkerScheduler2 extends IScheduler {
if (tasks.length > 0) {
val item = tasks.dequeue
executing.update(item.actor, worker)
- rexec.update(worker, item.actor)
item
}
else {
diff --git a/src/actors/scala/actors/multi/TimerThread.scala b/src/actors/scala/actors/multi/TimerThread.scala
index 436b27354f..d344e7acef 100644
--- a/src/actors/scala/actors/multi/TimerThread.scala
+++ b/src/actors/scala/actors/multi/TimerThread.scala
@@ -102,8 +102,8 @@ object TimerThread extends AnyRef with Runnable {
notify()
} else
if (queue.max.time > wakeTime) { // add to 1st position and restart sleeping
- queue += WakedActor (a, wakeTime, "")
- notify()
+ queue += WakedActor (a, wakeTime, "")
+ notify()
}
else // simply add to queue
queue += WakedActor (a, wakeTime, "")