summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-11-27 10:56:51 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-11-27 10:56:51 +0000
commit59a0cce0c00c3cde6c3bb6753c47a6ae9266891f (patch)
tree91f9d326510ab30e917064d8d96b3070cdd3ad78 /src/actors
parent090482dae2e111084a1ac2d2d58113388dbbbffe (diff)
downloadscala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.gz
scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.bz2
scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.zip
Fixed issue with task scheduler creating too ma...
Fixed issue with task scheduler creating too many threads. Improved actor termination code. Added size query to MessageQueue.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala3
-rw-r--r--src/actors/scala/actors/MessageQueue.scala18
-rw-r--r--src/actors/scala/actors/Scheduler.scala13
3 files changed, 28 insertions, 6 deletions
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 0600f4d7b2..8b99b83f7c 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -114,7 +114,8 @@ class FJTaskScheduler2 extends Thread with IScheduler {
if (Platform.currentTime - lastActivity >= TICK_FREQ
&& coreSize < maxSize
&& executor.checkPoolSize()) {
- // do nothing
+ coreSize += 1
+ lastActivity = Platform.currentTime
}
else {
if (pendingReactions <= 0) {
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 4beacfe70b..8f2f6c72cc 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -41,7 +41,16 @@ class MessageQueue {
def isEmpty = null eq last
+ private var _size = 0
+ def size = _size
+
+ protected def changeSize(diff: Int) = {
+ _size += diff
+ }
+
def append(msg: Any, session: OutputChannel[Any]) = {
+ changeSize(1) // size always increases by 1
+
if (null eq last) { // list empty
val el = new MessageQueueElement
el.msg = msg
@@ -59,7 +68,9 @@ class MessageQueue {
}
def extractFirst(p: Any => Boolean): MessageQueueElement = {
- if (null eq last) null
+ changeSize(-1) // assume size decreases by 1
+
+ val msg = if (null eq last) null
else {
// test first element
if (p(first.msg)) {
@@ -95,5 +106,10 @@ class MessageQueue {
null
}
}
+
+ if (null eq msg)
+ changeSize(1) // correct wrong assumption
+
+ msg
}
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index ddf6df164b..cb10795899 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -85,11 +85,15 @@ object Scheduler {
termHandlers += (a -> (() => f))
}
- def unPendReaction(a: Actor) {
+ def unPendReaction(a: Actor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
- case Some(handler) => handler()
- case None => // do nothing
+ case Some(handler) =>
+ handler()
+ // remove mapping
+ termHandlers -= a
+ case None =>
+ // do nothing
}
// notify scheduler
@@ -159,7 +163,8 @@ class SingleThreadedScheduler extends IScheduler {
}
def execute(task: Runnable) {
- if (Actor.tl.get.isInstanceOf[ActorProxy]) {
+ val a = Actor.tl.get.asInstanceOf[Actor]
+ if ((null ne a) && a.isInstanceOf[ActorProxy]) {
// execute task immediately on same thread
task.run()
while (taskQ.length > 0) {