summaryrefslogtreecommitdiff
path: root/src/actors
diff options
context:
space:
mode:
authorPhilipp Haller <hallerp@gmail.com>2007-01-17 17:46:40 +0000
committerPhilipp Haller <hallerp@gmail.com>2007-01-17 17:46:40 +0000
commit2bd3ff37dfe7bfe454460459377879722f6b20a6 (patch)
treef54a419f0779d063b8b248c1b6b0813aeeb3d428 /src/actors
parent6b0dcb20529c5ae07b7a2d1b20605996408cf046 (diff)
downloadscala-2bd3ff37dfe7bfe454460459377879722f6b20a6.tar.gz
scala-2bd3ff37dfe7bfe454460459377879722f6b20a6.tar.bz2
scala-2bd3ff37dfe7bfe454460459377879722f6b20a6.zip
Replaced some function variables by methods.
Diffstat (limited to 'src/actors')
-rw-r--r--src/actors/scala/actors/Actor.scala140
-rw-r--r--src/actors/scala/actors/Channel.scala3
-rw-r--r--src/actors/scala/actors/JDK5Scheduler.scala16
-rw-r--r--src/actors/scala/actors/Reaction.scala9
-rw-r--r--src/actors/scala/actors/Scheduler.scala16
-rw-r--r--src/actors/scala/actors/TickedScheduler.scala5
6 files changed, 87 insertions, 102 deletions
diff --git a/src/actors/scala/actors/Actor.scala b/src/actors/scala/actors/Actor.scala
index c2c9dd738f..3fce66b1a8 100644
--- a/src/actors/scala/actors/Actor.scala
+++ b/src/actors/scala/actors/Actor.scala
@@ -26,7 +26,9 @@ import java.util.Stack
*/
object Actor {
- private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)
+ //private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)
+
+ private[actors] val tl = new ThreadLocal
/**
* Returns the currently executing actor. Should be used instead
@@ -36,13 +38,20 @@ object Actor {
* @return returns the currently executing actor.
*/
def self: Actor = synchronized {
- val t = currentThread
+ var a = tl.get.asInstanceOf[Actor]
+ if (null eq a) {
+ a = new ActorProxy(currentThread)
+ //Debug.info("created "+a+" for "+currentThread)
+ tl.set(a)
+ }
+ a
+ /*val t = currentThread
var a = selfs.get(t).asInstanceOf[Actor]
if (a eq null) {
a = new ActorProxy(t)
selfs.put(t, a)
}
- a
+ a*/
}
def actor(body: => Unit): Actor = synchronized {
@@ -155,49 +164,13 @@ object Actor {
def reply(): Unit = self.reply(())
private[actors] trait Body[a] {
- def orElse[b >: a](other: => b): b
def andThen[b](other: => b): Nothing
}
implicit def mkBody[a](body: => a) = new Body[a] {
- def orElse[b >: a](other: => b): b = choose(body, other)
def andThen[b](other: => b): Nothing = seq(body, other)
}
- private[actors] def choose[a, b >: a](alt1: => a, alt2: => b): b = {
- val s = self
- // save former custom suspendActor function
- // (e.g. from further orElse)
- val suspendNext = s.suspendActor
- val detachNext = s.detachActor
-
- // have to get out of the point of suspend in alt1's
- // receive
- s.suspendActor = () => {
- s.isSuspended = false
- s.waitingFor = s.waitingForNone
- throw new SuspendActorException
- }
- s.detachActor = f => {
- s.waitingFor = s.waitingForNone
- Scheduler.unPendReaction
- throw new SuspendActorException
- }
-
- try {
- val res = alt1
- s.suspendActor = suspendNext
- s.detachActor = detachNext
- res
- }
- catch {
- case d: SuspendActorException =>
- s.suspendActor = suspendNext
- s.detachActor = detachNext
- alt2
- }
- }
-
/**
* Causes <code>self</code> to repeatedly execute
* <code>body</code>.
@@ -498,69 +471,56 @@ trait Actor extends OutputChannel[Any] {
private[actors] def tick(): Unit =
Scheduler tick this
- private[actors] def defaultDetachActor: PartialFunction[Any, Unit] => Unit =
- (f: PartialFunction[Any, Unit]) => {
- continuation = f
- isDetached = true
- throw new SuspendActorException
- }
+ private[actors] def detachActor(f: PartialFunction[Any, Unit]) {
+ continuation = f
+ isDetached = true
+ throw new SuspendActorException
+ }
- private[actors] var suspendActor: () => Unit = _
- private[actors] var suspendActorFor: long => Unit = _
- private[actors] var resumeActor: () => Unit = _
- private[actors] var detachActor: PartialFunction[Any, Unit] => Unit = _
- private[actors] var kill: () => Unit = _
+ private[actors] var kill = () => {}
private class ExitSuspendLoop extends Throwable
- private[actors] def resetActor(): Unit = {
- suspendActor = () => {
- isWaiting = true
- while(isWaiting) {
- try {
- wait()
- } catch {
- case _: InterruptedException =>
- }
+ def suspendActor() {
+ isWaiting = true
+ while(isWaiting) {
+ try {
+ wait()
+ } catch {
+ case _: InterruptedException =>
}
}
+ }
- suspendActorFor = (msec: long) => {
- val ts = Platform.currentTime
- var waittime = msec
- var fromExc = false
- isWaiting = true
+ def suspendActorFor(msec: long) {
+ val ts = Platform.currentTime
+ var waittime = msec
+ var fromExc = false
+ isWaiting = true
- try {
- while(isWaiting) {
- try {
- fromExc = false
- wait(waittime)
- } catch {
- case _: InterruptedException => {
- fromExc = true
- val now = Platform.currentTime
- val waited = now-ts
- waittime = msec-waited
- if (waittime < 0) { isWaiting = false }
- }
+ try {
+ while(isWaiting) {
+ try {
+ fromExc = false
+ wait(waittime)
+ } catch {
+ case _: InterruptedException => {
+ fromExc = true
+ val now = Platform.currentTime
+ val waited = now-ts
+ waittime = msec-waited
+ if (waittime < 0) { isWaiting = false }
}
- if (!fromExc) throw new ExitSuspendLoop
}
- } catch { case _: ExitSuspendLoop => }
- }
-
- resumeActor = () => {
- isWaiting = false
- notify()
- }
-
- detachActor = defaultDetachActor
-
- kill = () => {}
+ if (!fromExc) throw new ExitSuspendLoop
+ }
+ } catch { case _: ExitSuspendLoop => }
}
- resetActor()
+ def resumeActor() {
+ isWaiting = false
+ notify()
+ }
/**
* Starts this actor.
diff --git a/src/actors/scala/actors/Channel.scala b/src/actors/scala/actors/Channel.scala
index 3c6bf822fb..f17c95f53f 100644
--- a/src/actors/scala/actors/Channel.scala
+++ b/src/actors/scala/actors/Channel.scala
@@ -38,7 +38,8 @@ class Channel[Msg] extends InputChannel[Msg] with OutputChannel[Msg] {
private[actors] var receiver: Actor = synchronized {
// basically Actor.self, but can be null
- Actor.selfs.get(currentThread).asInstanceOf[Actor]
+ //Actor.selfs.get(currentThread).asInstanceOf[Actor]
+ Actor.tl.get.asInstanceOf[Actor]
}
/**
diff --git a/src/actors/scala/actors/JDK5Scheduler.scala b/src/actors/scala/actors/JDK5Scheduler.scala
index c71a504d50..825e5ddfd7 100644
--- a/src/actors/scala/actors/JDK5Scheduler.scala
+++ b/src/actors/scala/actors/JDK5Scheduler.scala
@@ -10,7 +10,16 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
import java.util.concurrent.{ThreadPoolExecutor,
LinkedBlockingQueue,
- TimeUnit}
+ TimeUnit,
+ RejectedExecutionHandler}
+
+class TaskRejectedHandler(sched: JDK5Scheduler) extends RejectedExecutionHandler {
+ def rejectedExecution(r: Runnable, executor: ThreadPoolExecutor) {
+ sched.pendReaction
+ r.run()
+ sched.unPendReaction
+ }
+}
/**
*
@@ -19,7 +28,7 @@ import java.util.concurrent.{ThreadPoolExecutor,
*/
class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with IScheduler {
- //Debug.info("using JDK5Scheduler")
+ Debug.info("using JDK5Scheduler("+initCoreSize+", "+maxSize+")")
/* Note:
* When using an unbounded queue such as a
@@ -35,7 +44,8 @@ class JDK5Scheduler(initCoreSize: int, maxSize: int) extends Thread with ISchedu
initCoreSize,
5000,
TimeUnit.NANOSECONDS,
- new LinkedBlockingQueue)
+ new LinkedBlockingQueue,
+ new TaskRejectedHandler(this))
private var coreSize = initCoreSize
diff --git a/src/actors/scala/actors/Reaction.scala b/src/actors/scala/actors/Reaction.scala
index f3510d7303..9bb6c849f5 100644
--- a/src/actors/scala/actors/Reaction.scala
+++ b/src/actors/scala/actors/Reaction.scala
@@ -31,9 +31,10 @@ private[actors] class Reaction(a: Actor,
def actor = a
def run(): Unit = {
- val t = currentThread
+ /*val t = currentThread
val saved = Actor.selfs.get(t).asInstanceOf[Actor]
- Actor.selfs.put(t, a)
+ Actor.selfs.put(t, a)*/
+ Actor.tl.set(a)
Scheduler.unPendReaction
a.isDetached = false
try {
@@ -59,9 +60,9 @@ private[actors] class Reaction(a: Actor,
a.exitLinked()
}
}
- finally {
+ /*finally {
Actor.selfs.put(t, saved)
- }
+ }*/
}
private var runnable = false
diff --git a/src/actors/scala/actors/Scheduler.scala b/src/actors/scala/actors/Scheduler.scala
index eda32bcd23..f4fed33d94 100644
--- a/src/actors/scala/actors/Scheduler.scala
+++ b/src/actors/scala/actors/Scheduler.scala
@@ -30,7 +30,21 @@ import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, Has
object Scheduler {
private var sched: IScheduler =
{
- val s = new JDK5Scheduler(4, 32)
+ var s: Thread with IScheduler = null
+
+ // Check for JDK version >= 1.5
+ var olderThanJDK5 = false
+ try {
+ java.lang.Class.forName("java.util.concurrent.ThreadPoolExecutor")
+ } catch {
+ case _: ClassNotFoundException =>
+ olderThanJDK5 = true
+ }
+
+ s = if (olderThanJDK5)
+ new TickedScheduler
+ else
+ new JDK5Scheduler(4, 32)
s.start()
s
}
diff --git a/src/actors/scala/actors/TickedScheduler.scala b/src/actors/scala/actors/TickedScheduler.scala
index 2806a420f6..4effcb2d75 100644
--- a/src/actors/scala/actors/TickedScheduler.scala
+++ b/src/actors/scala/actors/TickedScheduler.scala
@@ -74,8 +74,8 @@ class TickedScheduler extends Thread with IScheduler {
if (terminating) throw new QuitException
}
- Debug.info("tasks.length: "+tasks.length)
- Debug.info("pendingReactions: "+pendingReactions)
+ //Debug.info("tasks.length: "+tasks.length)
+ //Debug.info("pendingReactions: "+pendingReactions)
if (tasks.length > 0) {
// check if we need more threads
@@ -120,7 +120,6 @@ class TickedScheduler extends Thread with IScheduler {
* @param item the task to be executed.
*/
def execute(item: Reaction): unit = synchronized {
- Debug.info("got new task")
if (!terminating) {
if (idle.length > 0) {
val wt = idle.dequeue