summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-09-10 12:11:29 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-09-10 12:11:29 +0000
commitee5d963fc27f2d8168d67d9d07257ffaa528a10f (patch)
tree6c84ab118b98c9aa6c4b94378b4146d3e261863e
parentbe7c4a11a03f1719bd2a33c45cab56b1dc433626 (diff)
downloadscala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.tar.gz
scala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.tar.bz2
scala-ee5d963fc27f2d8168d67d9d07257ffaa528a10f.zip
Ported bug fixes from trunk.
-rw-r--r--src/actors/scala/actors/Actor.scala13
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala17
-rw-r--r--src/actors/scala/actors/Reaction.scala30
-rw-r--r--src/actors/scala/actors/Scheduler.scala22
4 files changed, 57 insertions, 25 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 48d491cc35..cba7a8c4c0 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -266,6 +266,7 @@ object Actor {
* @version 0.9.9
* @author Philipp Haller
*/
+@serializable
trait Actor extends OutputChannel[Any] {
private var received: Option[Any] = None
@@ -402,7 +403,6 @@ trait Actor extends OutputChannel[Any] {
def react(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
- Scheduler.pendReaction
this.synchronized {
tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
@@ -431,7 +431,6 @@ trait Actor extends OutputChannel[Any] {
def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
- Scheduler.pendReaction
this.synchronized {
tick()
// first, remove spurious TIMEOUT message from mailbox if any
@@ -652,7 +651,13 @@ trait Actor extends OutputChannel[Any] {
/**
* Starts this actor.
*/
- def start(): Actor = {
+ def start(): Actor = synchronized {
+ // reset various flags
+ trapExit = false
+ exitReason = 'normal
+ exiting = false
+ shouldExit = false
+
Scheduler start new Reaction(this)
this
}
@@ -669,7 +674,7 @@ trait Actor extends OutputChannel[Any] {
case 'kill => Actor.self.kill()
}, 'kill)
- throw new ExitActorException
+ throw new SuspendActorException
}
private[actors] var links: List[Actor] = Nil
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index cf1cdfb822..a724d8987c 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -1,3 +1,12 @@
+/* __ *\
+** ________ ___ / / ___ Scala API **
+** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL **
+** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ **
+** /____/\___/_/ |_/____/_/ | | **
+** |/ **
+\* */
+
+// $Id$
package scala.actors
@@ -11,10 +20,12 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
- * @version 0.9.8
+ * @version 0.9.9
* @author Philipp Haller
*/
class FJTaskScheduler2 extends Thread with IScheduler {
+ // as long as this thread runs, JVM should not exit
+ setDaemon(false)
val printStats = false
//val printStats = true
@@ -130,9 +141,7 @@ class FJTaskScheduler2 extends Thread with IScheduler {
}
def start(task: Runnable) {
- this.synchronized {
- pendingReactions = pendingReactions + 1
- }
+ pendReaction
executor.execute(task)
}
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index e3d9ae4b2e..9f46e9c0ce 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -42,36 +42,38 @@ private[actors] class ExitActorException extends Throwable
def run() {
val saved = Actor.tl.get.asInstanceOf[Actor]
Actor.tl.set(a)
- Scheduler.unPendReaction
a.isDetached = false
try {
- try {
- if (a.shouldExit) // links
- a.exit()
- else {
- if (f == null)
- a.act()
- else
- f(msg)
- a.kill(); a.exit()
- }
- } catch {
- case _: ExitActorException =>
+ if (a.shouldExit) // links
+ a.exit()
+ else {
+ if (f == null)
+ a.act()
+ else
+ f(msg)
+ a.kill(); a.exit()
}
}
catch {
+ case eae: ExitActorException => {
+ Scheduler.unPendReaction
+ }
case _: SuspendActorException => {
// do nothing (continuation is already saved)
}
case t: Throwable => {
+ Debug.info(a+": caught "+t)
+ t.printStackTrace()
+ Scheduler.unPendReaction
// links
a.synchronized {
if (!a.links.isEmpty)
a.exitLinked(t)
}
}
+ } finally {
+ Actor.tl.set(saved)
}
- Actor.tl.set(saved)
}
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index 8a88ac0706..64478262ea 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -36,6 +36,7 @@ object Scheduler {
def impl = sched
def impl_= (scheduler: IScheduler) = {
sched = scheduler
+ sched.start()
}
var tasks: LinkedQueue = null
@@ -125,20 +126,35 @@ trait IScheduler {
* This scheduler executes the tasks of an actor on a single
* thread (the current thread).
*
- * @version 0.9.8
+ * @version 0.9.9
* @author Philipp Haller
*/
class SingleThreadedScheduler extends IScheduler {
def start() {}
+ val taskQ = new scala.collection.mutable.Queue[Runnable]
+
def start(task: Runnable) {
// execute task immediately on same thread
task.run()
+ while (taskQ.length > 0) {
+ val nextTask = taskQ.dequeue
+ nextTask.run()
+ }
}
def execute(task: Runnable) {
- // execute task immediately on same thread
- task.run()
+ if (Actor.tl.get.isInstanceOf[ActorProxy]) {
+ // execute task immediately on same thread
+ task.run()
+ while (taskQ.length > 0) {
+ val nextTask = taskQ.dequeue
+ nextTask.run()
+ }
+ } else {
+ // queue task for later execution
+ taskQ += task
+ }
}
def getTask(worker: WorkerThread): Runnable = null