summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2009-06-23 15:49:40 +0000
committerPhilipp Haller <hallerp@gmail.com>2009-06-23 15:49:40 +0000
commitd09ec90432258a34e4afcd803c4e1fc10338d695 (patch)
tree0b1cd417dc553e83517d986572d587e7bcc4f087 /src/actors
parent3f9bbdbc7827070c09ab87331623740fa226221b (diff)
downloadscala-d09ec90432258a34e4afcd803c4e1fc10338d695.tar.gz
scala-d09ec90432258a34e4afcd803c4e1fc10338d695.tar.bz2
scala-d09ec90432258a34e4afcd803c4e1fc10338d695.zip
Thread-based receive uses ManagerBlocker interf...
Thread-based receive uses ManagerBlocker interface of new ForkJoinPool.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala22
-rw-r--r--src/actors/scala/actors/DelegatingScheduler.scala5
-rw-r--r--src/actors/scala/actors/ForkJoinScheduler.scala22
-rw-r--r--src/actors/scala/actors/IScheduler.scala6
4 files changed, 32 insertions, 23 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index 91fa746c64..1420a1af4d 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -14,6 +14,8 @@ import scala.compat.Platform
import java.util.{Timer, TimerTask}
import java.util.concurrent.ExecutionException
+import forkjoin.ForkJoinPool
+
/**
* The <code>Actor</code> object provides functions for the definition of
* actors, as well as actor operations, such as
@@ -449,7 +451,8 @@ trait Actor extends Reactor with AbstractActor {
// keep going
} else {
waitingFor = f.isDefinedAt
- suspendActor()
+ isSuspended = true
+ scheduler.managedBlock(new ActorBlocker(0))
done = true
}
}
@@ -509,7 +512,8 @@ trait Actor extends Reactor with AbstractActor {
} else {
waitingFor = f.isDefinedAt
received = None
- suspendActorFor(msec)
+ isSuspended = true
+ scheduler.managedBlock(new ActorBlocker(msec))
done = true
if (received.isEmpty) {
// actor is not resumed because of new message
@@ -839,8 +843,19 @@ trait Actor extends Reactor with AbstractActor {
scheduler execute task
}
+ class ActorBlocker(timeout: Long) extends ForkJoinPool.ManagedBlocker {
+ def block() = {
+ if (timeout > 0)
+ Actor.this.suspendActorFor(timeout)
+ else
+ Actor.this.suspendActor()
+ true
+ }
+ def isReleasable() =
+ !Actor.this.isSuspended
+ }
+
private def suspendActor() {
- isSuspended = true
while (isSuspended) {
try {
wait()
@@ -856,7 +871,6 @@ trait Actor extends Reactor with AbstractActor {
val ts = Platform.currentTime
var waittime = msec
var fromExc = false
- isSuspended = true
while (isSuspended) {
try {
fromExc = false
diff --git a/src/actors/scala/actors/DelegatingScheduler.scala b/src/actors/scala/actors/DelegatingScheduler.scala
index 39328c8077..78c4020bb0 100644
--- a/src/actors/scala/actors/DelegatingScheduler.scala
+++ b/src/actors/scala/actors/DelegatingScheduler.scala
@@ -8,6 +8,8 @@
package scala.actors
+import forkjoin.ForkJoinPool
+
/**
* @author Erik Engbrecht
*/
@@ -48,4 +50,7 @@ trait DelegatingScheduler extends IScheduler {
def terminated(actor: Reactor) = impl.terminated(actor)
def onTerminate(actor: Reactor)(f: => Unit) = impl.onTerminate(actor)(f)
+
+ override def managedBlock(blocker: ForkJoinPool.ManagedBlocker): Unit =
+ impl.managedBlock(blocker)
}
diff --git a/src/actors/scala/actors/ForkJoinScheduler.scala b/src/actors/scala/actors/ForkJoinScheduler.scala
index 850706eb72..59cc14fbb4 100644
--- a/src/actors/scala/actors/ForkJoinScheduler.scala
+++ b/src/actors/scala/actors/ForkJoinScheduler.scala
@@ -16,15 +16,9 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
private val CHECK_FREQ = 50
- private def allWorkersWaiting: Boolean =
- pool.workers.forall(t => {
- if (t == null)
- true
- else {
- val s = t.getState()
- s == State.WAITING || s == State.TIMED_WAITING
- }
- })
+ override def managedBlock(blocker: ForkJoinPool.ManagedBlocker) {
+ ForkJoinPool.managedBlock(blocker, true)
+ }
override def run() {
try {
@@ -42,16 +36,6 @@ class ForkJoinScheduler extends Thread with IScheduler with TerminationMonitor {
//Debug.info(this+": all actors terminated")
throw new QuitException
}
-
- if (!pool.isQuiescent && allWorkersWaiting) {
- //Debug.info(this+": all workers blocked")
- val par = pool.getParallelism()
- //Debug.info(this+": parallelism "+par)
- //Debug.info(this+": max pool size "+pool.getMaximumPoolSize())
- if (par < pool.getMaximumPoolSize()) {
- pool.setParallelism(par + 1)
- }
- }
}
}
} catch {
diff --git a/src/actors/scala/actors/IScheduler.scala b/src/actors/scala/actors/IScheduler.scala
index 49b42d39d6..811fd47ce3 100644
--- a/src/actors/scala/actors/IScheduler.scala
+++ b/src/actors/scala/actors/IScheduler.scala
@@ -10,6 +10,8 @@
package scala.actors
+import forkjoin.ForkJoinPool
+
/**
* The <code>IScheduler</code> trait provides a common interface
* for all schedulers used to execute actor tasks.
@@ -63,4 +65,8 @@ trait IScheduler {
* @param f the closure to be registered
*/
def onTerminate(a: Reactor)(f: => Unit): Unit
+
+ def managedBlock(blocker: ForkJoinPool.ManagedBlocker) {
+ blocker.block()
+ }
}