diff options
Diffstat (limited to 'src/actors/scala/actors/Reactor.scala')
-rw-r--r-- | src/actors/scala/actors/Reactor.scala | 307 |
1 files changed, 0 insertions, 307 deletions
diff --git a/src/actors/scala/actors/Reactor.scala b/src/actors/scala/actors/Reactor.scala deleted file mode 100644 index aa985b3a17..0000000000 --- a/src/actors/scala/actors/Reactor.scala +++ /dev/null @@ -1,307 +0,0 @@ -/* __ *\ -** ________ ___ / / ___ Scala API ** -** / __/ __// _ | / / / _ | (c) 2005-2013, LAMP/EPFL ** -** __\ \/ /__/ __ |/ /__/ __ | http://scala-lang.org/ ** -** /____/\___/_/ |_/____/_/ | | ** -** |/ ** -\* */ - - -package scala.actors - -import scala.actors.scheduler.{DelegatingScheduler, ExecutorScheduler, - ForkJoinScheduler, ThreadPoolConfig} -import java.util.concurrent.{ThreadPoolExecutor, TimeUnit, LinkedBlockingQueue} -import scala.language.implicitConversions - -private[actors] object Reactor { - - val scheduler = new DelegatingScheduler { - def makeNewScheduler: IScheduler = { - val sched = if (!ThreadPoolConfig.useForkJoin) { - // default is non-daemon - val workQueue = new LinkedBlockingQueue[Runnable] - ExecutorScheduler( - new ThreadPoolExecutor(ThreadPoolConfig.corePoolSize, - ThreadPoolConfig.maxPoolSize, - 60000L, - TimeUnit.MILLISECONDS, - workQueue, - new ThreadPoolExecutor.CallerRunsPolicy)) - } else { - // default is non-daemon, non-fair - val s = new ForkJoinScheduler(ThreadPoolConfig.corePoolSize, ThreadPoolConfig.maxPoolSize, false, false) - s.start() - s - } - Debug.info(this+": starting new "+sched+" ["+sched.getClass+"]") - sched - } - } - - val waitingForNone: PartialFunction[Any, Unit] = new PartialFunction[Any, Unit] { - def isDefinedAt(x: Any) = false - def apply(x: Any) {} - } -} - -/** - * Super trait of all actor traits. - * - * @author Philipp Haller - * - * @define actor reactor - */ -@deprecated("Use the akka.actor package instead. For migration from the scala.actors package refer to the Actors Migration Guide.", "2.11.0") -trait Reactor[Msg >: Null] extends OutputChannel[Msg] with Combinators { - - /* The $actor's mailbox. */ - private[actors] val mailbox = new MQueue[Msg]("Reactor") - - // guarded by this - private[actors] val sendBuffer = new MQueue[Msg]("SendBuffer") - - /* Whenever this $actor executes on some thread, `waitingFor` is - * guaranteed to be equal to `Reactor.waitingForNone`. - * - * In other words, whenever `waitingFor` is not equal to - * `Reactor.waitingForNone`, this $actor is guaranteed not to execute - * on some thread. - * - * If the $actor waits in a `react`, `waitingFor` holds the - * message handler that `react` was called with. - * - * guarded by this - */ - private[actors] var waitingFor: PartialFunction[Msg, Any] = - Reactor.waitingForNone - - // guarded by this - private[actors] var _state: Actor.State.Value = Actor.State.New - - /** - * The $actor's behavior is specified by implementing this method. - */ - def act(): Unit - - /** - * This partial function is applied to exceptions that propagate out of - * this $actor's body. - */ - protected[actors] def exceptionHandler: PartialFunction[Exception, Unit] = - Map() - - protected[actors] def scheduler: IScheduler = - Reactor.scheduler - - protected[actors] def mailboxSize: Int = - mailbox.size - - def send(msg: Msg, replyTo: OutputChannel[Any]) { - val todo = synchronized { - if (waitingFor ne Reactor.waitingForNone) { - val savedWaitingFor = waitingFor - waitingFor = Reactor.waitingForNone - startSearch(msg, replyTo, savedWaitingFor) - } else { - sendBuffer.append(msg, replyTo) - () => { /* do nothing */ } - } - } - todo() - } - - private[actors] def startSearch(msg: Msg, replyTo: OutputChannel[Any], handler: PartialFunction[Msg, Any]) = - () => scheduler execute makeReaction(() => { - val startMbox = new MQueue[Msg]("Start") - synchronized { startMbox.append(msg, replyTo) } - searchMailbox(startMbox, handler, true) - }) - - private[actors] final def makeReaction(fun: () => Unit): Runnable = - makeReaction(fun, null, null) - - /* This method is supposed to be overridden. */ - private[actors] def makeReaction(fun: () => Unit, handler: PartialFunction[Msg, Any], msg: Msg): Runnable = - new ReactorTask(this, fun, handler, msg) - - private[actors] def resumeReceiver(item: (Msg, OutputChannel[Any]), handler: PartialFunction[Msg, Any], onSameThread: Boolean) { - if (onSameThread) - makeReaction(null, handler, item._1).run() - else - scheduleActor(handler, item._1) - - /* Here, we throw a SuspendActorControl to avoid - terminating this actor when the current ReactorTask - is finished. - - The SuspendActorControl skips the termination code - in ReactorTask. - */ - throw Actor.suspendException - } - - def !(msg: Msg) { - send(msg, null) - } - - def forward(msg: Msg) { - send(msg, null) - } - - def receiver: Actor = this.asInstanceOf[Actor] - - // guarded by this - private[actors] def drainSendBuffer(mbox: MQueue[Msg]) { - sendBuffer.foreachDequeue(mbox) - } - - private[actors] def searchMailbox(startMbox: MQueue[Msg], - handler: PartialFunction[Msg, Any], - resumeOnSameThread: Boolean) { - var tmpMbox = startMbox - var done = false - while (!done) { - val qel = tmpMbox.extractFirst(handler) - if (tmpMbox ne mailbox) - tmpMbox.foreachAppend(mailbox) - if (null eq qel) { - synchronized { - // in mean time new stuff might have arrived - if (!sendBuffer.isEmpty) { - tmpMbox = new MQueue[Msg]("Temp") - drainSendBuffer(tmpMbox) - // keep going - } else { - waitingFor = handler - /* Here, we throw a SuspendActorControl to avoid - terminating this actor when the current ReactorTask - is finished. - - The SuspendActorControl skips the termination code - in ReactorTask. - */ - throw Actor.suspendException - } - } - } else { - resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread) - done = true - } - } - } - - /** - * Receives a message from this $actor's mailbox. - * - * This method never returns. Therefore, the rest of the computation - * has to be contained in the actions of the partial function. - * - * @param handler a partial function with message patterns and actions - */ - protected def react(handler: PartialFunction[Msg, Unit]): Nothing = { - synchronized { drainSendBuffer(mailbox) } - searchMailbox(mailbox, handler, false) - throw Actor.suspendException - } - - /* This method is guaranteed to be executed from inside - * an $actor's act method. - * - * assume handler != null - * - * never throws SuspendActorControl - */ - private[actors] def scheduleActor(handler: PartialFunction[Msg, Any], msg: Msg) { - scheduler executeFromActor makeReaction(null, handler, msg) - } - - private[actors] def preAct() = {} - - // guarded by this - private[actors] def dostart() { - _state = Actor.State.Runnable - scheduler newActor this - scheduler execute makeReaction(() => { - preAct() - act() - }, null, null) - } - - /** - * Starts this $actor. This method is idempotent. - */ - def start(): Reactor[Msg] = synchronized { - if (_state == Actor.State.New) - dostart() - this - } - - /** - * Restarts this $actor. - * - * @throws java.lang.IllegalStateException if the $actor is not in state `Actor.State.Terminated` - */ - def restart(): Unit = synchronized { - if (_state == Actor.State.Terminated) - dostart() - else - throw new IllegalStateException("restart only in state "+Actor.State.Terminated) - } - - /** Returns the execution state of this $actor. - * - * @return the execution state - */ - def getState: Actor.State.Value = synchronized { - if (waitingFor ne Reactor.waitingForNone) - Actor.State.Suspended - else - _state - } - - implicit def mkBody[A](body: => A) = new InternalActor.Body[A] { - def andThen[B](other: => B): Unit = Reactor.this.seq(body, other) - } - - /* This closure is used to implement control-flow operations - * built on top of `seq`. Note that the only invocation of - * `kill` is supposed to be inside `ReactorTask.run`. - */ - @volatile - private[actors] var kill: () => Unit = - () => { exit() } - - private[actors] def seq[a, b](first: => a, next: => b): Unit = { - val killNext = this.kill - this.kill = () => { - this.kill = killNext - - // to avoid stack overflow: - // instead of directly executing `next`, - // schedule as continuation - scheduleActor({ case _ => next }, null) - throw Actor.suspendException - } - first - throw new KillActorControl - } - - protected[actors] def exit(): Nothing = { - terminated() - throw Actor.suspendException - } - - private[actors] def internalPostStop() = {} - - private[actors] def terminated() { - synchronized { - _state = Actor.State.Terminated - // reset waitingFor, otherwise getState returns Suspended - waitingFor = Reactor.waitingForNone - } - internalPostStop() - scheduler.terminated(this) - } - -} |