diff options
author | Philipp Haller <hallerp@gmail.com> | 2007-09-10 12:11:29 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2007-09-10 12:11:29 +0000 |
commit | ee5d963fc27f2d8168d67d9d07257ffaa528a10f (patch) | |
tree | 6c84ab118b98c9aa6c4b94378b4146d3e261863e | |
parent | be7c4a11a03f1719bd2a33c45cab56b1dc433626 (diff) | |
download | scala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.tar.gz scala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.tar.bz2 scala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.zip |
Ported bug fixes from trunk.
-rw-r--r-- | src/actors/scala/actors/Actor.scala | 13 | ||||
-rw-r--r-- | src/actors/scala/actors/FJTaskScheduler2.scala | 17 | ||||
-rw-r--r-- | src/actors/scala/actors/Reaction.scala | 30 | ||||
-rw-r--r-- | src/actors/scala/actors/Scheduler.scala | 22 |
4 files changed, 57 insertions, 25 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala index 48d491cc35..cba7a8c4c0 100644 --- a/src/actors/scala/actors/Actor.scala +++ b/src/actors/scala/actors/Actor.scala @@ -266,6 +266,7 @@ object Actor { * @version 0.9.9 * @author Philipp Haller */ +@serializable trait Actor extends OutputChannel[Any] { private var received: Option[Any] = None @@ -402,7 +403,6 @@ trait Actor extends OutputChannel[Any] { def react(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links - Scheduler.pendReaction this.synchronized { tick() val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) @@ -431,7 +431,6 @@ trait Actor extends OutputChannel[Any] { def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { assert(Actor.self == this, "react on channel belonging to other actor") if (shouldExit) exit() // links - Scheduler.pendReaction this.synchronized { tick() // first, remove spurious TIMEOUT message from mailbox if any @@ -652,7 +651,13 @@ trait Actor extends OutputChannel[Any] { /** * Starts this actor. */ - def start(): Actor = { + def start(): Actor = synchronized { + // reset various flags + trapExit = false + exitReason = 'normal + exiting = false + shouldExit = false + Scheduler start new Reaction(this) this } @@ -669,7 +674,7 @@ trait Actor extends OutputChannel[Any] { case 'kill => Actor.self.kill() }, 'kill) - throw new ExitActorException + throw new SuspendActorException } private[actors] var links: List[Actor] = Nil diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala index cf1cdfb822..a724d8987c 100644 --- a/src/actors/scala/actors/FJTaskScheduler2.scala +++ b/src/actors/scala/actors/FJTaskScheduler2.scala @@ -1,3 +1,12 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id$ package scala.actors @@ -11,10 +20,12 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has /** * FJTaskScheduler2 * - * @version 0.9.8 + * @version 0.9.9 * @author Philipp Haller */ class FJTaskScheduler2 extends Thread with IScheduler { + // as long as this thread runs, JVM should not exit + setDaemon(false) val printStats = false //val printStats = true @@ -130,9 +141,7 @@ class FJTaskScheduler2 extends Thread with IScheduler { } def start(task: Runnable) { - this.synchronized { - pendingReactions = pendingReactions + 1 - } + pendReaction executor.execute(task) } diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala index e3d9ae4b2e..9f46e9c0ce 100644 --- a/src/actors/scala/actors/Reaction.scala +++ b/src/actors/scala/actors/Reaction.scala @@ -42,36 +42,38 @@ private[actors] class ExitActorException extends Throwable def run() { val saved = Actor.tl.get.asInstanceOf[Actor] Actor.tl.set(a) - Scheduler.unPendReaction a.isDetached = false try { - try { - if (a.shouldExit) // links - a.exit() - else { - if (f == null) - a.act() - else - f(msg) - a.kill(); a.exit() - } - } catch { - case _: ExitActorException => + if (a.shouldExit) // links + a.exit() + else { + if (f == null) + a.act() + else + f(msg) + a.kill(); a.exit() } } catch { + case eae: ExitActorException => { + Scheduler.unPendReaction + } case _: SuspendActorException => { // do nothing (continuation is already saved) } case t: Throwable => { + Debug.info(a+": caught "+t) + t.printStackTrace() + Scheduler.unPendReaction // links a.synchronized { if (!a.links.isEmpty) a.exitLinked(t) } } + } finally { + Actor.tl.set(saved) } - Actor.tl.set(saved) } } diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala index 8a88ac0706..64478262ea 100644 --- a/src/actors/scala/actors/Scheduler.scala +++ b/src/actors/scala/actors/Scheduler.scala @@ -36,6 +36,7 @@ object Scheduler { def impl = sched def impl_= (scheduler: IScheduler) = { sched = scheduler + sched.start() } var tasks: LinkedQueue = null @@ -125,20 +126,35 @@ trait IScheduler { * This scheduler executes the tasks of an actor on a single * thread (the current thread). * - * @version 0.9.8 + * @version 0.9.9 * @author Philipp Haller */ class SingleThreadedScheduler extends IScheduler { def start() {} + val taskQ = new scala.collection.mutable.Queue[Runnable] + def start(task: Runnable) { // execute task immediately on same thread task.run() + while (taskQ.length > 0) { + val nextTask = taskQ.dequeue + nextTask.run() + } } def execute(task: Runnable) { - // execute task immediately on same thread - task.run() + if (Actor.tl.get.isInstanceOf[ActorProxy]) { + // execute task immediately on same thread + task.run() + while (taskQ.length > 0) { + val nextTask = taskQ.dequeue + nextTask.run() + } + } else { + // queue task for later execution + taskQ += task + } } def getTask(worker: WorkerThread): Runnable = null |