diff options
author | Philipp Haller <hallerp@gmail.com> | 2008-12-16 17:11:20 +0000 |
---|---|---|
committer | Philipp Haller <hallerp@gmail.com> | 2008-12-16 17:11:20 +0000 |
commit | c4a2e6b3af5f9145af0c29a8d14e5178059a7abe (patch) | |
tree | d263894994df3a9292affbae96fb7766c9b27a2c | |
parent | 6d1b4e24a91b085c928f6f9d9e030aa5168b536c (diff) | |
download | scala-c4a2e6b3af5f9145af0c29a8d14e5178059a7abe.tar.gz scala-c4a2e6b3af5f9145af0c29a8d14e5178059a7abe.tar.bz2 scala-c4a2e6b3af5f9145af0c29a8d14e5178059a7abe.zip |
Add JVM 1.4 versions of actor classes.
-rw-r--r-- | src/jvm14-actors/scala/actors/Actor.scala | 976 | ||||
-rw-r--r-- | src/jvm14-actors/scala/actors/FJTaskScheduler2.scala | 172 |
2 files changed, 1148 insertions, 0 deletions
diff --git a/src/jvm14-actors/scala/actors/Actor.scala b/src/jvm14-actors/scala/actors/Actor.scala new file mode 100644 index 0000000000..0f98d57552 --- /dev/null +++ b/src/jvm14-actors/scala/actors/Actor.scala @@ -0,0 +1,976 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: Actor.scala 16759 2008-12-15 10:19:01Z phaller $ + +package scala.actors + +import scala.collection.mutable.{HashSet, Queue} +import scala.compat.Platform + +import java.util.{Timer, TimerTask} + +/** + * The <code>Actor</code> object provides functions for the definition of + * actors, as well as actor operations, such as + * <code>receive</code>, <code>react</code>, <code>reply</code>, + * etc. + * + * @version 0.9.18 + * @author Philipp Haller + */ +object Actor { + + private[actors] val tl = new ThreadLocal[Actor] + + private[actors] var timer = new Timer + + /** + * Returns the currently executing actor. Should be used instead + * of <code>this</code> in all blocks of code executed by + * actors. + * + * @return returns the currently executing actor. + */ + def self: Actor = { + var a = tl.get.asInstanceOf[Actor] + if (null eq a) { + a = new ActorProxy(currentThread) + tl.set(a) + } + a + } + + /** + * Resets an actor proxy associated with the current thread. + * It replaces the implicit <code>ActorProxy</code> instance + * of the current thread (if any) with a new instance. + * + * This permits to re-use the current thread as an actor + * even if its <code>ActorProxy</code> has died for some reason. + */ + def resetProxy { + val a = tl.get.asInstanceOf[Actor] + if ((null ne a) && a.isInstanceOf[ActorProxy]) + tl.set(new ActorProxy(currentThread)) + } + + /** + * Removes any reference to an <code>Actor</code> instance + * currently stored in thread-local storage. + * + * This allows to release references from threads that are + * potentially long-running or being re-used (e.g. inside + * a thread pool). Permanent references in thread-local storage + * are a potential memory leak. + */ + def clearSelf { + tl.set(null) + } + + /** + * <p>This is a factory method for creating actors.</p> + * + * <p>The following example demonstrates its usage:</p> + * + * <pre> + * import scala.actors.Actor._ + * ... + * val a = actor { + * ... + * } + * </pre> + * + * @param body the code block to be executed by the newly created actor + * @return the newly created actor. Note that it is automatically started. + */ + def actor(body: => Unit): Actor = { + val actor = new Actor { + def act() = body + } + actor.start() + actor + } + + /** + * <p> + * This is a factory method for creating actors whose + * body is defined using a <code>Responder</code>. + * </p> + * + * <p>The following example demonstrates its usage:</p> + * + * <pre> + * import scala.actors.Actor._ + * import Responder.exec + * ... + * val a = reactor { + * for { + * res <- b !! MyRequest; + * if exec(println("result: "+res)) + * } yield {} + * } + * </pre> + * + * @param body the <code>Responder</code> to be executed by the newly created actor + * @return the newly created actor. Note that it is automatically started. + */ + def reactor(body: => Responder[Unit]): Actor = { + val a = new Actor { + def act() { + Responder.run(body) + } + } + a.start() + a + } + + /** + * Receives the next message from the mailbox of the current actor + * <code>self</code>. + */ + def ? : Any = self.? + + /** + * Receives a message from the mailbox of + * <code>self</code>. Blocks if no message matching any of the + * cases of <code>f</code> can be received. + * + * @param f a partial function specifying patterns and actions + * @return the result of processing the received message + */ + def receive[A](f: PartialFunction[Any, A]): A = + self.receive(f) + + /** + * Receives a message from the mailbox of + * <code>self</code>. Blocks at most <code>msec</code> + * milliseconds if no message matching any of the cases of + * <code>f</code> can be received. If no message could be + * received the <code>TIMEOUT</code> action is executed if + * specified. + * + * @param msec the time span before timeout + * @param f a partial function specifying patterns and actions + * @return the result of processing the received message + */ + def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = + self.receiveWithin(msec)(f) + + /** + * Lightweight variant of <code>receive</code>. + * + * Actions in <code>f</code> have to contain the rest of the + * computation of <code>self</code>, as this method will never + * return. + * + * @param f a partial function specifying patterns and actions + * @return this function never returns + */ + def react(f: PartialFunction[Any, Unit]): Nothing = + self.react(f) + + /** + * Lightweight variant of <code>receiveWithin</code>. + * + * Actions in <code>f</code> have to contain the rest of the + * computation of <code>self</code>, as this method will never + * return. + * + * @param msec the time span before timeout + * @param f a partial function specifying patterns and actions + * @return this function never returns + */ + def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = + self.reactWithin(msec)(f) + + def eventloop(f: PartialFunction[Any, Unit]): Nothing = + self.react(new RecursiveProxyHandler(self, f)) + + private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit]) + extends PartialFunction[Any, Unit] { + def isDefinedAt(m: Any): Boolean = + true // events are immediately removed from the mailbox + def apply(m: Any) { + if (f.isDefinedAt(m)) f(m) + self.react(this) + } + } + + /** + * Returns the actor which sent the last received message. + */ + def sender: OutputChannel[Any] = self.sender + + /** + * Send <code>msg</code> to the actor waiting in a call to + * <code>!?</code>. + */ + def reply(msg: Any): Unit = self.reply(msg) + + /** + * Send <code>()</code> to the actor waiting in a call to + * <code>!?</code>. + */ + def reply(): Unit = self.reply(()) + + /** + * Returns the number of messages in <code>self</code>'s mailbox + * + * @return the number of messages in <code>self</code>'s mailbox + */ + def mailboxSize: Int = self.mailboxSize + + /** + * <p> + * Converts a synchronous event-based operation into + * an asynchronous <code>Responder</code>. + * </p> + * + * <p>The following example demonstrates its usage:</p> + * + * <pre> + * val adder = reactor { + * for { + * _ <- respondOn(react) { case Add(a, b) => reply(a+b) } + * } yield {} + * } + * </pre> + */ + def respondOn[A, B](fun: PartialFunction[A, Unit] => Nothing): + PartialFunction[A, B] => Responder[B] = + (caseBlock: PartialFunction[A, B]) => new Responder[B] { + def respond(k: B => Unit) = fun(caseBlock andThen k) + } + + private[actors] trait Body[a] { + def andThen[b](other: => b): Unit + } + + implicit def mkBody[a](body: => a) = new Body[a] { + def andThen[b](other: => b): Unit = self.seq(body, other) + } + + /** + * Causes <code>self</code> to repeatedly execute + * <code>body</code>. + * + * @param body the code block to be executed + */ + def loop(body: => Unit): Unit = body andThen loop(body) + + /** + * Causes <code>self</code> to repeatedly execute + * <code>body</code> while the condition + * <code>cond</code> is <code>true</code>. + * + * @param cond the condition to test + * @param body the code block to be executed + */ + def loopWhile(cond: => Boolean)(body: => Unit): Unit = + if (cond) { body andThen loopWhile(cond)(body) } + else continue + + /** + * Links <code>self</code> to actor <code>to</code>. + * + * @param to the actor to link to + * @return + */ + def link(to: AbstractActor): AbstractActor = self.link(to) + + /** + * Links <code>self</code> to actor defined by <code>body</code>. + * + * @param body ... + * @return ... + */ + def link(body: => Unit): Actor = self.link(body) + + /** + * Unlinks <code>self</code> from actor <code>from</code>. + * + * @param from the actor to unlink from + */ + def unlink(from: Actor): Unit = self.unlink(from) + + /** + * <p> + * Terminates execution of <code>self</code> with the following + * effect on linked actors: + * </p> + * <p> + * For each linked actor <code>a</code> with + * <code>trapExit</code> set to <code>true</code>, send message + * <code>Exit(self, reason)</code> to <code>a</code>. + * </p> + * <p> + * For each linked actor <code>a</code> with + * <code>trapExit</code> set to <code>false</code> (default), + * call <code>a.exit(reason)</code> if + * <code>reason != 'normal</code>. + * </p> + */ + def exit(reason: AnyRef): Nothing = self.exit(reason) + + /** + * <p> + * Terminates execution of <code>self</code> with the following + * effect on linked actors: + * </p> + * <p> + * For each linked actor <code>a</code> with + * <code>trapExit</code> set to <code>true</code>, send message + * <code>Exit(self, 'normal)</code> to <code>a</code>. + * </p> + */ + def exit(): Nothing = self.exit() + + def continue: Unit = throw new KillActorException +} + +/** + * <p> + * This class provides an implementation of event-based actors. + * The main ideas of our approach are explained in the two papers + * </p> + * <ul> + * <li> + * <a href="http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf"> + * <span style="font-weight:bold; white-space:nowrap;">Event-Based + * Programming without Inversion of Control</span></a>,<br/> + * Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i>, and + * </li> + * <li> + * <a href="http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf"> + * <span style="font-weight:bold; white-space:nowrap;">Actors that + * Unify Threads and Events</span></a>,<br/> + * Philipp Haller and Martin Odersky, <i>Proc. COORDINATION 2007</i>. + * </li> + * </ul> + * + * @version 0.9.18 + * @author Philipp Haller + */ +@serializable +trait Actor extends AbstractActor { + + private var received: Option[Any] = None + + private val waitingForNone = (m: Any) => false + private var waitingFor: Any => Boolean = waitingForNone + private var isSuspended = false + + protected val mailbox = new MessageQueue + private var sessions: List[OutputChannel[Any]] = Nil + + protected def scheduler: IScheduler = + Scheduler + + /** + * Returns the number of messages in this actor's mailbox + * + * @return the number of messages in this actor's mailbox + */ + def mailboxSize: Int = synchronized { + mailbox.size + } + + /** + * Sends <code>msg</code> to this actor (asynchronous) supplying + * explicit reply destination. + * + * @param msg the message to send + * @param replyTo the reply destination + */ + def send(msg: Any, replyTo: OutputChannel[Any]) = synchronized { + tick() + if (waitingFor(msg)) { + received = Some(msg) + + if (isSuspended) + sessions = replyTo :: sessions + else + sessions = List(replyTo) + + waitingFor = waitingForNone + + if (!onTimeout.isEmpty) { + onTimeout.get.cancel() + onTimeout = None + } + + if (isSuspended) + resumeActor() + else // assert continuation != null + scheduler.execute(new Reaction(this, continuation, msg)) + } else { + mailbox.append(msg, replyTo) + } + } + + /** + * Receives a message from this actor's mailbox. + * + * @param f a partial function with message patterns and actions + * @return result of processing the received value + */ + def receive[R](f: PartialFunction[Any, R]): R = { + assert(Actor.self == this, "receive from channel belonging to other actor") + if (shouldExit) exit() // links + this.synchronized { + tick() + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + isSuspended = true + suspendActor() + } else { + received = Some(qel.msg) + sessions = qel.session :: sessions + } + waitingFor = waitingForNone + isSuspended = false + } + val result = f(received.get) + sessions = sessions.tail + result + } + + /** + * Receives a message from this actor's mailbox within a certain + * time span. + * + * @param msec the time span before timeout + * @param f a partial function with message patterns and actions + * @return result of processing the received value + */ + def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = { + assert(Actor.self == this, "receive from channel belonging to other actor") + if (shouldExit) exit() // links + this.synchronized { + tick() + // first, remove spurious TIMEOUT message from mailbox if any + val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) + + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + if (msec == 0) { + if (f.isDefinedAt(TIMEOUT)) + return f(TIMEOUT) + else + error("unhandled timeout") + } + else { + waitingFor = f.isDefinedAt + isSuspended = true + received = None + suspendActorFor(msec) + if (received.isEmpty) { + if (f.isDefinedAt(TIMEOUT)) { + waitingFor = waitingForNone + isSuspended = false + val result = f(TIMEOUT) + return result + } + else + error("unhandled timeout") + } + } + } else { + received = Some(qel.msg) + sessions = qel.session :: sessions + } + waitingFor = waitingForNone + isSuspended = false + } + val result = f(received.get) + sessions = sessions.tail + result + } + + /** + * Receives a message from this actor's mailbox. + * <p> + * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param f a partial function with message patterns and actions + */ + def react(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.self == this, "react on channel belonging to other actor") + if (shouldExit) exit() // links + this.synchronized { + tick() + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + waitingFor = f.isDefinedAt + continuation = f + isDetached = true + } else { + sessions = List(qel.session) + scheduleActor(f, qel.msg) + } + throw new SuspendActorException + } + } + + /** + * Receives a message from this actor's mailbox within a certain + * time span. + * <p> + * This method never returns. Therefore, the rest of the computation + * has to be contained in the actions of the partial function. + * + * @param msec the time span before timeout + * @param f a partial function with message patterns and actions + */ + def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing = { + assert(Actor.self == this, "react on channel belonging to other actor") + if (shouldExit) exit() // links + this.synchronized { + tick() + // first, remove spurious TIMEOUT message from mailbox if any + val spurious = mailbox.extractFirst((m: Any) => m == TIMEOUT) + + val qel = mailbox.extractFirst((m: Any) => f.isDefinedAt(m)) + if (null eq qel) { + if (msec == 0) { + if (f.isDefinedAt(TIMEOUT)) { + sessions = List(Actor.self) + scheduleActor(f, TIMEOUT) + } + else + error("unhandled timeout") + } + else { + waitingFor = f.isDefinedAt + + val thisActor = this + onTimeout = Some(new TimerTask { + def run() { thisActor ! TIMEOUT } + }) + Actor.timer.schedule(onTimeout.get, msec) + + continuation = f + isDetached = true + } + } else { + sessions = List(qel.session) + scheduleActor(f, qel.msg) + } + throw new SuspendActorException + } + } + + /** + * The behavior of an actor is specified by implementing this + * abstract method. Note that the preferred way to create actors + * is through the <code>actor</code> method + * defined in object <code>Actor</code>. + */ + def act(): Unit + + /** + * Sends <code>msg</code> to this actor (asynchronous). + */ + def !(msg: Any) { + send(msg, Actor.self) + } + + /** + * Forwards <code>msg</code> to this actor (asynchronous). + */ + def forward(msg: Any) { + send(msg, Actor.sender) + } + + /** + * Sends <code>msg</code> to this actor and awaits reply + * (synchronous). + * + * @param msg the message to be sent + * @return the reply + */ + def !?(msg: Any): Any = { + val replyCh = Actor.self.freshReplyChannel + send(msg, replyCh) + replyCh.receive { + case x => x + } + } + + /** + * Sends <code>msg</code> to this actor and awaits reply + * (synchronous) within <code>msec</code> milliseconds. + * + * @param msec the time span before timeout + * @param msg the message to be sent + * @return <code>None</code> in case of timeout, otherwise + * <code>Some(x)</code> where <code>x</code> is the reply + */ + def !?(msec: Long, msg: Any): Option[Any] = { + val replyCh = Actor.self.freshReplyChannel + send(msg, replyCh) + replyCh.receiveWithin(msec) { + case TIMEOUT => None + case x => Some(x) + } + } + + /** + * Sends <code>msg</code> to this actor and immediately + * returns a future representing the reply value. + */ + def !!(msg: Any): Future[Any] = { + val ftch = new Channel[Any](Actor.self) + send(msg, ftch) + new Future[Any](ftch) { + def apply() = + if (isSet) value.get + else ch.receive { + case any => value = Some(any); any + } + def respond(k: Any => Unit): Unit = + if (isSet) k(value.get) + else ch.react { + case any => value = Some(any); k(any) + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(any); true + } + case Some(_) => true + } + } + } + + /** + * Sends <code>msg</code> to this actor and immediately + * returns a future representing the reply value. + * The reply is post-processed using the partial function + * <code>f</code>. This also allows to recover a more + * precise type for the reply value. + */ + def !![A](msg: Any, f: PartialFunction[Any, A]): Future[A] = { + val ftch = new Channel[Any](Actor.self) + send(msg, ftch) + new Future[A](ftch) { + def apply() = + if (isSet) value.get.asInstanceOf[A] + else ch.receive { + case any => value = Some(f(any)); value.get.asInstanceOf[A] + } + def respond(k: A => Unit): Unit = + if (isSet) k(value.get.asInstanceOf[A]) + else ch.react { + case any => value = Some(f(any)); k(value.get.asInstanceOf[A]) + } + def isSet = value match { + case None => ch.receiveWithin(0) { + case TIMEOUT => false + case any => value = Some(f(any)); true + } + case Some(_) => true + } + } + } + + /** + * Replies with <code>msg</code> to the sender. + */ + def reply(msg: Any) { + sender ! msg + } + + private var rc: Channel[Any] = null + private[actors] def replyChannel = rc + private[actors] def freshReplyChannel: Channel[Any] = + { rc = new Channel[Any](this); rc } + + /** + * Receives the next message from this actor's mailbox. + */ + def ? : Any = receive { + case x => x + } + + def sender: OutputChannel[Any] = sessions.head + + def receiver: Actor = this + + private var continuation: PartialFunction[Any, Unit] = null + private var onTimeout: Option[TimerTask] = None + // accessed in Reaction + private[actors] var isDetached = false + private var isWaiting = false + + // guarded by lock of this + protected def scheduleActor(f: PartialFunction[Any, Unit], msg: Any) = + if ((f eq null) && (continuation eq null)) { + // do nothing (timeout is handled instead) + } + else { + val task = new Reaction(this, + if (f eq null) continuation else f, + msg) + scheduler execute task + } + + private var tickCnt = 0 + + private def tick(): Unit = + if (tickCnt == 100) { + tickCnt = 0 + scheduler tick this + } else + tickCnt += 1 + + private[actors] var kill: () => Unit = () => {} + + private def suspendActor() { + isWaiting = true + while (isWaiting) { + try { + wait() + } catch { + case _: InterruptedException => + } + } + // links: check if we should exit + if (shouldExit) exit() + } + + private def suspendActorFor(msec: Long) { + val ts = Platform.currentTime + var waittime = msec + var fromExc = false + isWaiting = true + while (isWaiting) { + try { + fromExc = false + wait(waittime) + } catch { + case _: InterruptedException => { + fromExc = true + val now = Platform.currentTime + val waited = now-ts + waittime = msec-waited + if (waittime < 0) { isWaiting = false } + } + } + if (!fromExc) { isWaiting = false } + } + // links: check if we should exit + if (shouldExit) exit() + } + + private def resumeActor() { + isWaiting = false + notify() + } + + /** + * Starts this actor. + */ + def start(): Actor = synchronized { + // Reset various flags. + // + // Note that we do *not* reset `trapExit`. The reason is that + // users should be able to set the field in the constructor + // and before `act` is called. + + exitReason = 'normal + exiting = false + shouldExit = false + + scheduler execute { + ActorGC.newActor(Actor.this) + (new Reaction(Actor.this)).run() + } + + this + } + + private def seq[a, b](first: => a, next: => b): Unit = { + val s = Actor.self + val killNext = s.kill + s.kill = () => { + s.kill = killNext + + // to avoid stack overflow: + // instead of directly executing `next`, + // schedule as continuation + scheduleActor({ case _ => next }, 1) + throw new SuspendActorException + } + first + throw new KillActorException + } + + private[actors] var links: List[AbstractActor] = Nil + + /** + * Links <code>self</code> to actor <code>to</code>. + * + * @param to ... + * @return ... + */ + def link(to: AbstractActor): AbstractActor = { + assert(Actor.self == this, "link called on actor different from self") + synchronized { + links = to :: links + } + to.linkTo(this) + to + } + + /** + * Links <code>self</code> to actor defined by <code>body</code>. + */ + def link(body: => Unit): Actor = { + val actor = new Actor { + def act() = body + } + link(actor) + actor.start() + actor + } + + private[actors] def linkTo(to: AbstractActor) = synchronized { + links = to :: links + } + + /** + * Unlinks <code>self</code> from actor <code>from</code>. + */ + def unlink(from: AbstractActor) { + assert(Actor.self == this, "unlink called on actor different from self") + synchronized { + links = links.remove(from.==) + } + from.unlinkFrom(this) + } + + private[actors] def unlinkFrom(from: AbstractActor) = synchronized { + links = links.remove(from.==) + } + + var trapExit = false + private[actors] var exitReason: AnyRef = 'normal + private[actors] var shouldExit = false + + /** + * <p> + * Terminates execution of <code>self</code> with the following + * effect on linked actors: + * </p> + * <p> + * For each linked actor <code>a</code> with + * <code>trapExit</code> set to <code>true</code>, send message + * <code>Exit(self, reason)</code> to <code>a</code>. + * </p> + * <p> + * For each linked actor <code>a</code> with + * <code>trapExit</code> set to <code>false</code> (default), + * call <code>a.exit(reason)</code> if + * <code>reason != 'normal</code>. + * </p> + */ + def exit(reason: AnyRef): Nothing = { + exitReason = reason + exit() + } + + /** + * Terminates with exit reason <code>'normal</code>. + */ + def exit(): Nothing = { + // links + if (!links.isEmpty) + exitLinked() + throw new ExitActorException + } + + // Assume !links.isEmpty + private[actors] def exitLinked() { + exiting = true + // remove this from links + val mylinks = links.remove(this.==) + // exit linked processes + mylinks.foreach((linked: AbstractActor) => { + unlink(linked) + if (!linked.exiting) + linked.exit(this, exitReason) + }) + } + + // Assume !links.isEmpty + private[actors] def exitLinked(reason: AnyRef) { + exitReason = reason + exitLinked() + } + + // Assume !this.exiting + private[actors] def exit(from: AbstractActor, reason: AnyRef) { + if (trapExit) { + this ! Exit(from, reason) + } + else if (reason != 'normal) + this.synchronized { + shouldExit = true + exitReason = reason + if (isSuspended) + resumeActor() + else if (isDetached) + scheduleActor(null, null) + } + } + +} + + +/** <p> + * This object is used as the timeout pattern in + * <a href="Actor.html#receiveWithin(Long)" target="contentFrame"> + * <code>receiveWithin</code></a> and + * <a href="Actor.html#reactWithin(Long)" target="contentFrame"> + * <code>reactWithin</code></a>. + * </p> + * <p> + * The following example demonstrates its usage: + * </p><pre> + * receiveWithin(500) { + * <b>case</b> (x, y) <b>=></b> ... + * <b>case</b> TIMEOUT <b>=></b> ... + * }</pre> + * + * @version 0.9.8 + * @author Philipp Haller + */ +case object TIMEOUT + + +case class Exit(from: AbstractActor, reason: AnyRef) + +/** <p> + * This class is used to manage control flow of actor + * executions. + * </p> + * + * @version 0.9.8 + * @author Philipp Haller + */ +private[actors] class SuspendActorException extends Throwable { + /* + * For efficiency reasons we do not fill in + * the execution stack trace. + */ + override def fillInStackTrace(): Throwable = this +} diff --git a/src/jvm14-actors/scala/actors/FJTaskScheduler2.scala b/src/jvm14-actors/scala/actors/FJTaskScheduler2.scala new file mode 100644 index 0000000000..f40cc2927a --- /dev/null +++ b/src/jvm14-actors/scala/actors/FJTaskScheduler2.scala @@ -0,0 +1,172 @@ +/* __ *\ +** ________ ___ / / ___ Scala API ** +** / __/ __// _ | / / / _ | (c) 2005-2007, LAMP/EPFL ** +** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** +** /____/\___/_/ |_/____/_/ | | ** +** |/ ** +\* */ + +// $Id: FJTaskScheduler2.scala 16171 2008-09-29 09:28:09Z phaller $ + +package scala.actors + +import compat.Platform + +import java.lang.{Runnable, Thread, InterruptedException, System, Runtime} + +import scala.collection.Set +import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, Queue, Stack, HashSet} + +/** + * FJTaskScheduler2 + * + * @version 0.9.18 + * @author Philipp Haller + */ +class FJTaskScheduler2 extends Thread with IScheduler { + // as long as this thread runs, JVM should not exit + setDaemon(false) + + var printStats = false + + val rt = Runtime.getRuntime() + val minNumThreads = 4 + + val coreProp = try { + System.getProperty("actors.corePoolSize") + } catch { + case ace: java.security.AccessControlException => + null + } + val maxProp = + try { + System.getProperty("actors.maxPoolSize") + } catch { + case ace: java.security.AccessControlException => + null + } + + val initCoreSize = + if (null ne coreProp) Integer.parseInt(coreProp) + else { + val numCores = rt.availableProcessors() + if (2 * numCores > minNumThreads) + 2 * numCores + else + minNumThreads + } + + val maxSize = + if (null ne maxProp) Integer.parseInt(maxProp) + else 256 + + private var coreSize = initCoreSize + + private val executor = + new FJTaskRunnerGroup(coreSize) + + private var terminating = false + private var suspending = false + + private var lastActivity = Platform.currentTime + + private var submittedTasks = 0 + + def printActorDump {} + + private val TICK_FREQ = 50 + private val CHECK_FREQ = 100 + + def onLockup(handler: () => Unit) = + lockupHandler = handler + + def onLockup(millis: Int)(handler: () => Unit) = { + //LOCKUP_CHECK_FREQ = millis / CHECK_FREQ + lockupHandler = handler + } + + private var lockupHandler: () => Unit = null + + override def run() { + try { + while (!terminating) { + this.synchronized { + try { + wait(CHECK_FREQ) + } catch { + case _: InterruptedException => + if (terminating) throw new QuitException + } + + if (!suspending) { + + ActorGC.gc() + + // check if we need more threads + if (Platform.currentTime - lastActivity >= TICK_FREQ + && coreSize < maxSize + && executor.checkPoolSize()) { + //Debug.info(this+": increasing thread pool size") + coreSize += 1 + lastActivity = Platform.currentTime + } + else { + if (ActorGC.allTerminated) { + // if all worker threads idle terminate + if (executor.getActiveCount() == 0) { + Debug.info(this+": initiating shutdown...") + + // Note that we don't have to shutdown + // the FJTaskRunnerGroup since there is + // no separate thread associated with it, + // and FJTaskRunner threads have daemon status. + + // terminate timer thread + Actor.timer.cancel() + throw new QuitException + } + } + } + } + } // sync + + } // while (!terminating) + } catch { + case _: QuitException => + // allow thread to exit + if (printStats) executor.stats() + } + } + + /** + * @param task the task to be executed + */ + def execute(task: Runnable): Unit = + executor execute task + + def execute(fun: => Unit): Unit = + executor.execute(new Runnable { + def run() { fun } + }) + + /** + * @param a the actor + */ + def tick(a: Actor) { + lastActivity = Platform.currentTime + } + + /** Shuts down all idle worker threads. + */ + def shutdown(): Unit = synchronized { + terminating = true + // terminate timer thread + Actor.timer.cancel() + } + + def snapshot(): LinkedQueue = { + suspending = true + executor.snapshot() + } + +} |