summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-09-05 16:13:16 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-09-05 16:13:16 +0000
commit4fdaad4888591939074e9c8d93069c1b2f0d6dde (patch)
tree88290bed8ed9f114002cfd293c6514c2c613142a
parentaa6cc51acba5ba192b3fee0458a876d8c5476677 (diff)
downloadscala-4fdaad4888591939074e9c8d93069c1b2f0d6dde.tar.gz
scala-4fdaad4888591939074e9c8d93069c1b2f0d6dde.tar.bz2
scala-4fdaad4888591939074e9c8d93069c1b2f0d6dde.zip
Fixed bug with automatic termination (pending r...
Fixed bug with automatic termination (pending reactions). Fixed stack overflow problem in SingleThreadedScheduler.
-rw-r--r--src/actors/scala/actors/Actor.scala12
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala14
-rw-r--r--src/actors/scala/actors/Reaction.scala30
-rw-r--r--src/actors/scala/actors/Scheduler.scala26
4 files changed, 51 insertions, 31 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 4b7ddac2b6..cba7a8c4c0 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -403,7 +403,6 @@ trait Actor extends OutputChannel[Any] {
def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
- Scheduler.pendReaction
this.synchronized {
tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
@@ -432,7 +431,6 @@ trait Actor extends OutputChannel[Any] {
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
- Scheduler.pendReaction
this.synchronized {
tick()
// first, remove spurious TIMEOUT message from mailbox if any
@@ -653,7 +651,13 @@ trait Actor extends OutputChannel[Any] {
/**
* Starts this actor.
*/
- def start(): Actor = {
+ def start(): Actor = synchronized {
+ // reset various flags
+ trapExit = false
+ exitReason = 'normal
+ exiting = false
+ shouldExit = false
+
Scheduler start new Reaction(this)
this
}
@@ -670,7 +674,7 @@ trait Actor extends OutputChannel[Any] {
case 'kill => Actor.self.kill()
}, 'kill)
- throw new ExitActorException
+ throw new SuspendActorException
}
private[actors] var links: List[Actor] = Nil
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index cf1cdfb822..c797279602 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -11,10 +11,12 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
- * @version 0.9.8
+ * @version 0.9.9
* @author Philipp Haller
*/
-class FJTaskScheduler2 extends Thread with IScheduler {
+class FJTaskScheduler2(core: Int, max: Int) extends Thread with IScheduler {
+ // as long as this thread runs, JVM should not exit
+ setDaemon(false)
val printStats = false
//val printStats = true
@@ -24,11 +26,11 @@ class FJTaskScheduler2 extends Thread with IScheduler {
val initCoreSize =
if (null ne coreProp) Integer.parseInt(coreProp)
- else 4
+ else core
val maxSize =
if (null ne maxProp) Integer.parseInt(maxProp)
- else 256
+ else max
private var coreSize = initCoreSize
@@ -130,9 +132,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
}
def start(task: Runnable) {
- this.synchronized {
- pendingReactions = pendingReactions + 1
- }
+ pendReaction
executor.execute(task)
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index 8e46ecacf1..9f46e9c0ce 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -42,38 +42,38 @@ private[actors] class ExitActorException extends Throwable
def run() {
val saved = Actor.tl.get.asInstanceOf[Actor]
Actor.tl.set(a)
- Scheduler.unPendReaction
a.isDetached = false
try {
- try {
- if (a.shouldExit) // links
- a.exit()
- else {
- if (f == null)
- a.act()
- else
- f(msg)
- a.kill(); a.exit()
- }
- } catch {
- case eae: ExitActorException =>
- Debug.info(a+": caught "+eae)
+ if (a.shouldExit) // links
+ a.exit()
+ else {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+ a.kill(); a.exit()
}
}
catch {
+ case eae: ExitActorException => {
+ Scheduler.unPendReaction
+ }
case _: SuspendActorException => {
// do nothing (continuation is already saved)
}
case t: Throwable => {
Debug.info(a+": caught "+t)
+ t.printStackTrace()
+ Scheduler.unPendReaction
// links
a.synchronized {
if (!a.links.isEmpty)
a.exitLinked(t)
}
}
+ } finally {
+ Actor.tl.set(saved)
}
- Actor.tl.set(saved)
}
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 8a88ac0706..6c71841ef5 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -28,7 +28,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
object Scheduler {
private var sched: IScheduler =
{
- var s: IScheduler = new FJTaskScheduler2
+ var s: IScheduler = new FJTaskScheduler2(4, 256)
s.start()
s
}
@@ -36,6 +36,7 @@ object Scheduler {
def impl = sched
def impl_= (scheduler: IScheduler) = {
sched = scheduler
+ sched.start()
}
var tasks: LinkedQueue = null
@@ -49,7 +50,7 @@ object Scheduler {
def restart(): Unit = synchronized {
sched = {
- var s: IScheduler = new FJTaskScheduler2
+ var s: IScheduler = new FJTaskScheduler2(4, 256)
s.asInstanceOf[FJTaskScheduler2].setPendingCount(pendingCount)
s.start()
s
@@ -125,20 +126,35 @@ trait IScheduler {
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
- * @version 0.9.8
+ * @version 0.9.9
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
def start() {}
+ val taskQ = new scala.collection.mutable.Queue[Runnable]
+
def start(task: Runnable) {
// execute task immediately on same thread
task.run()
+ while (taskQ.length > 0) {
+ val nextTask = taskQ.dequeue
+ nextTask.run()
+ }
}
def execute(task: Runnable) {
- // execute task immediately on same thread
- task.run()
+ if (Actor.tl.get.isInstanceOf[ActorProxy]) {
+ // execute task immediately on same thread
+ task.run()
+ while (taskQ.length > 0) {
+ val nextTask = taskQ.dequeue
+ nextTask.run()
+ }
+ } else {
+ // queue task for later execution
+ taskQ += task
+ }
}
def getTask(worker: WorkerThread): Runnable = null