summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-11-27 10:56:51 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-11-27 10:56:51 +0000
commit59a0cce0c00c3cde6c3bb6753c47a6ae9266891f (patch)
tree91f9d326510ab30e917064d8d96b3070cdd3ad78
parent090482dae2e111084a1ac2d2d58113388dbbbffe (diff)
downloadscala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.gz
scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.tar.bz2
scala-59a0cce0c00c3cde6c3bb6753c47a6ae9266891f.zip
Fixed issue with task scheduler creating too ma...
Fixed issue with task scheduler creating too many threads. Improved actor termination code. Added size query to MessageQueue.
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala3
-rw-r--r--src/actors/scala/actors/MessageQueue.scala18
-rw-r--r--src/actors/scala/actors/Scheduler.scala13
3 files changed, 28 insertions, 6 deletions
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index 0600f4d7b2..8b99b83f7c 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -114,7 +114,8 @@ class FJTaskScheduler2 extends Thread with IScheduler {
if (Platform.currentTime - lastActivity >= TICK_FREQ
&& coreSize < maxSize
&& executor.checkPoolSize()) {
- // do nothing
+ coreSize += 1
+ lastActivity = Platform.currentTime
}
else {
if (pendingReactions <= 0) {
diff --git a/src/actors/scala/actors/MessageQueue.scala b/src/actors/scala/actors/MessageQueue.scala
index 4beacfe70b..8f2f6c72cc 100644
--- a/src/actors/scala/actors/MessageQueue.scala
+++ b/src/actors/scala/actors/MessageQueue.scala
@@ -41,7 +41,16 @@ class MessageQueue {
def isEmpty = null eq last
+ private var _size = 0
+ def size = _size
+
+ protected def changeSize(diff: Int) = {
+ _size += diff
+ }
+
def append(msg: Any, session: OutputChannel[Any]) = {
+ changeSize(1) // size always increases by 1
+
if (null eq last) { // list empty
val el = new MessageQueueElement
el.msg = msg
@@ -59,7 +68,9 @@ class MessageQueue {
}
def extractFirst(p: Any => Boolean): MessageQueueElement = {
- if (null eq last) null
+ changeSize(-1) // assume size decreases by 1
+
+ val msg = if (null eq last) null
else {
// test first element
if (p(first.msg)) {
@@ -95,5 +106,10 @@ class MessageQueue {
null
}
}
+
+ if (null eq msg)
+ changeSize(1) // correct wrong assumption
+
+ msg
}
}
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index ddf6df164b..cb10795899 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -85,11 +85,15 @@ object Scheduler {
termHandlers += (a -> (() => f))
}
- def unPendReaction(a: Actor) {
+ def unPendReaction(a: Actor) = synchronized {
// execute registered termination handler (if any)
termHandlers.get(a) match {
- case Some(handler) => handler()
- case None => // do nothing
+ case Some(handler) =>
+ handler()
+ // remove mapping
+ termHandlers -= a
+ case None =>
+ // do nothing
}
// notify scheduler
@@ -159,7 +163,8 @@ class SingleThreadedScheduler extends IScheduler {
}
def execute(task: Runnable) {
- if (Actor.tl.get.isInstanceOf[ActorProxy]) {
+ val a = Actor.tl.get.asInstanceOf[Actor]
+ if ((null ne a) && a.isInstanceOf[ActorProxy]) {
// execute task immediately on same thread
task.run()
while (taskQ.length > 0) {