summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2008-12-17 16:05:17 +0000
committerPhilipp Haller <hallerp@gmail.com>2008-12-17 16:05:17 +0000
commitba2043b7edcebe59ddf8db8da15114d3b9476521 (patch)
tree137eac832a9f766b0f3d94d989c0cbdd1c4610d3 /src/actors
parent825c2e5522c8ccf5fa4823903637ee1228615ec8 (diff)
downloadscala-ba2043b7edcebe59ddf8db8da15114d3b9476521.tar.gz
scala-ba2043b7edcebe59ddf8db8da15114d3b9476521.tar.bz2
scala-ba2043b7edcebe59ddf8db8da15114d3b9476521.zip
Revert Thread.getState optimization.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala8
-rw-r--r--src/actors/scala/actors/FJTaskScheduler2.scala27
2 files changed, 24 insertions, 11 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index ad0927195e..af94a8a4e7 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -389,6 +389,7 @@ trait Actor extends AbstractActor {
* @param replyTo the reply destination
*/
def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized {
+ tick()
if (waitingFor(msg)) {
received = Some(msg)
@@ -423,6 +424,7 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
+ tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@@ -452,6 +454,7 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "receive from channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
+ tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@@ -503,6 +506,7 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
+ tick()
val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m))
if (null eq qel) {
waitingFor = f.isDefinedAt
@@ -530,6 +534,7 @@ trait Actor extends AbstractActor {
assert(Actor.self == this, "react on channel belonging to other actor")
if (shouldExit) exit() // links
this.synchronized {
+ tick()
// first, remove spurious TIMEOUT message from mailbox if any
val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT)
@@ -718,6 +723,9 @@ trait Actor extends AbstractActor {
scheduler execute task
}
+ private def tick(): Unit =
+ scheduler tick this
+
private[actors] var kill: () => Unit = () => {}
private def suspendActor() {
diff --git a/src/actors/scala/actors/FJTaskScheduler2.scala b/src/actors/scala/actors/FJTaskScheduler2.scala
index d983b7a00f..24a2f119d5 100644
--- a/src/actors/scala/actors/FJTaskScheduler2.scala
+++ b/src/actors/scala/actors/FJTaskScheduler2.scala
@@ -13,7 +13,6 @@ package scala.actors
import compat.Platform
import java.lang.{Runnable, Thread, InterruptedException, System, Runtime}
-import java.lang.Thread.State
import scala.collection.Set
import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet}
@@ -21,7 +20,7 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
/**
* FJTaskScheduler2
*
- * @version 0.9.20
+ * @version 0.9.18
* @author Philipp Haller
*/
class FJTaskScheduler2 extends Thread with IScheduler {
@@ -69,10 +68,13 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var terminating = false
private var suspending = false
+ private var lastActivity = Platform.currentTime
+
private var submittedTasks = 0
def printActorDump {}
+ private val TICK_FREQ = 50
private val CHECK_FREQ = 100
def onLockup(handler: () => Unit) =
@@ -85,12 +87,6 @@ class FJTaskScheduler2 extends Thread with IScheduler {
private var lockupHandler: () => Unit = null
- private def allWorkersBlocked: Boolean =
- executor.threads.forall(t => {
- val s = t.getState()
- s == State.BLOCKED || s == State.WAITING || s == State.TIMED_WAITING
- })
-
override def run() {
try {
while (!terminating) {
@@ -107,11 +103,12 @@ class FJTaskScheduler2 extends Thread with IScheduler {
ActorGC.gc()
// check if we need more threads
- if (coreSize < maxSize
- && allWorkersBlocked
+ if (Platform.currentTime - lastActivity >= TICK_FREQ
+ && coreSize < maxSize
&& executor.checkPoolSize()) {
//Debug.info(this+": increasing thread pool size")
coreSize += 1
+ lastActivity = Platform.currentTime
}
else {
if (ActorGC.allTerminated) {
@@ -152,10 +149,18 @@ class FJTaskScheduler2 extends Thread with IScheduler {
def run() { fun }
})
+ private var tickCnt = 0
+
/**
* @param a the actor
*/
- def tick(a: Actor) {}
+ def tick(a: Actor) = synchronized {
+ if (tickCnt == 100) {
+ tickCnt = 0
+ lastActivity = Platform.currentTime
+ } else
+ tickCnt += 1
+ }
/** Shuts down all idle worker threads.
*/