summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2006-11-03 16:08:43 +0000
committerPhilipp Haller <hallerp@gmail.com>2006-11-03 16:08:43 +0000
commitbc752a61fed299942cce659f075702ef9e4fb205 (patch)
tree8bba01bae3931e16f7b528ea968564a0f99a1b80
parentf03c47f101aa39236b6eea2b94c1bca216b52cb5 (diff)
downloadscala-bc752a61fed299942cce659f075702ef9e4fb205.tar.gz
scala-bc752a61fed299942cce659f075702ef9e4fb205.tar.bz2
scala-bc752a61fed299942cce659f075702ef9e4fb205.zip
Fixed severe bug (badly handled interruption).
-rw-r--r--src/actors/scala/actors/Actor.scala42
-rw-r--r--src/actors/scala/actors/Reaction.scala11
-rw-r--r--src/actors/scala/actors/Scheduler.scala101
3 files changed, 104 insertions, 50 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 380486d87d..b09b9b4a1e 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -11,6 +11,7 @@
package scala.actors
import scala.collection.mutable.{HashSet, Stack}
+import compat.Platform
/**
* The <code>Actor</code> object provides functions for the definition of
@@ -123,7 +124,6 @@ object Actor {
def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing =
self.in.reactWithin(msec)(f)
- /*
def eventloop(f: PartialFunction[Any, Unit]): Nothing =
self.in.react(new RecursiveProxyHandler(self, f))
@@ -136,7 +136,6 @@ object Actor {
self.in.react(this)
}
}
- */
/**
* <p>Used for receiving a message from a specific actor.</p>
@@ -374,11 +373,44 @@ trait Actor extends OutputChannel[Any] {
private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
private[actors] var kill: () => Unit = _
+ private var continue = false
+
private[actors] def resetActor(): Unit = {
- suspendActor = () => wait()
- suspendActorFor = (msec: long) => wait(msec)
- resumeActor = () => notify()
+ suspendActor = () => {
+ continue = false
+ while(!continue) {
+ try {
+ wait()
+ } catch {
+ case t: InterruptedException =>
+ }
+ }
+ }
+
+ suspendActorFor = (msec: long) => {
+ val ts = Platform.currentTime
+ var waittime = msec
+ continue = false
+ while(!continue) {
+ try {
+ wait(waittime)
+ } catch {
+ case t: InterruptedException => {
+ val now = Platform.currentTime
+ val waited = now-ts
+ waittime = msec-waited
+ }
+ }
+ }
+ }
+
+ resumeActor = () => {
+ continue = true
+ notify()
+ }
+
detachActor = defaultDetachActor
+
kill = () => {}
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index f88e6b7c77..da57c0cd3c 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -53,12 +53,17 @@ private[actors] class StartTask(a: Actor) extends Reaction {
a.exit("normal")
}
catch {
- case _: InterruptedException =>
+ case ie: InterruptedException => {
+ ie.printStackTrace()
a.exitLinked()
- case d: SuspendActorException =>
+ }
+ case d: SuspendActorException => {
// do nothing (continuation is already saved)
- case t: Throwable =>
+ }
+ case t: Throwable => {
+ t.printStackTrace()
a.exit(t.toString())
+ }
}
finally {
Actor.selfs.put(t, saved)
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 49c14f010f..c79f9c49ab 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -28,7 +28,11 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue}
object Scheduler {
private var sched: IScheduler =
//new SpareWorkerScheduler
- new TickedScheduler
+ {
+ val s = new TickedScheduler
+ s.start()
+ s
+ }
def impl = sched
def impl_= (scheduler: IScheduler) = {
@@ -51,7 +55,7 @@ object Scheduler {
* @version Beta2
* @author Philipp Haller
*/
-abstract class IScheduler {
+trait IScheduler {
def execute(task: Reaction): Unit
def getTask(worker: WorkerThread): Runnable
def tick(a: Actor): Unit
@@ -156,7 +160,7 @@ class SpareWorkerScheduler extends IScheduler {
*
* @author Philipp Haller
*/
-class TickedScheduler extends IScheduler {
+class TickedScheduler extends Thread with IScheduler {
private val tasks = new Queue[Reaction]
private var workers: Buffer[WorkerThread] = new ArrayBuffer[WorkerThread]
@@ -166,7 +170,8 @@ class TickedScheduler extends IScheduler {
private var terminating = false
- var TICKFREQ = 50
+ var TICKFREQ = 5
+ var CHECKFREQ = 50
def init() = {
for (val i <- List.range(0, 2)) {
@@ -177,6 +182,54 @@ class TickedScheduler extends IScheduler {
}
init()
+ override def run(): unit = {
+ try {
+ while (!terminating) {
+ this.synchronized {
+ try {
+ wait(CHECKFREQ)
+ } catch {
+ case _: InterruptedException =>
+ if (terminating) throw new QuitException
+ }
+
+ if (tasks.length > 0) {
+ // check if we need more threads
+ val iter = workers.elements
+ var foundBusy = false
+ while (iter.hasNext && !foundBusy) {
+ val wt = iter.next
+ ticks.get(wt) match {
+ case None =>
+ foundBusy = false
+ case Some(ts) =>
+ val currTime = Platform.currentTime
+ if (currTime - ts < TICKFREQ)
+ foundBusy = true
+ }
+ }
+
+ if (!foundBusy) {
+ val newWorker = new WorkerThread(this)
+ workers += newWorker
+
+ // dequeue item to be processed
+ val item = tasks.dequeue
+
+ executing.update(item.actor, newWorker)
+ newWorker.execute(item)
+ newWorker.start()
+ }
+ } // tasks.length > 0
+ } // sync
+
+ } // while (!terminating)
+ } catch {
+ case _: QuitException =>
+ // allow thread to exit
+ }
+ }
+
def execute(item: Reaction): unit = synchronized {
if (!terminating)
if (idle.length > 0) {
@@ -184,44 +237,8 @@ class TickedScheduler extends IScheduler {
executing.update(item.actor, wt)
wt.execute(item)
}
- else {
- /*
- only create new worker thread when all are blocked
- according to heuristic
-
- we check time stamps of latest send/receive ops of ALL
- workers
-
- we stop if there is one which is not blocked
- */
-
- val iter = workers.elements
- var foundBusy = false
- while (iter.hasNext && !foundBusy) {
- val wt = iter.next
- ticks.get(wt) match {
- case None =>
- foundBusy = true // assume not blocked
- case Some(ts) =>
- val currTime = Platform.currentTime
- if (currTime - ts < TICKFREQ)
- foundBusy = true
- }
- }
-
- if (!foundBusy) {
- val newWorker = new WorkerThread(this)
- workers += newWorker
- executing.update(item.actor, newWorker)
- newWorker.execute(item)
- newWorker.start()
- }
- else {
- // wait assuming busy thread will be finished soon
- // and ask for more work
- tasks += item
- }
- }
+ else
+ tasks += item
}
def getTask(worker: WorkerThread) = synchronized {