summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/Actor.scala
blob: 57e107538c7a554e0e309553559f19781bb9c2eb (plain) (tree)
1
2
3
4
5
6
7
8
9

                                                                          
                                                                          
                                                                          



                                                                          
 

                    
                                          
                                   
 
   
                                           
                                               



                                                                
   
                                  
 
                        
          













                                                     












                                                        
 

                                             
 
                                                                
 
     


                                                                  

                                                   
     

                                   


                                                      

                                             
 
                                                                  

                    
                                                         








                                             

   
     






                                                                  
                    
                  
                                                  
                                                                   


     
                                                            






                                                                 
                   
                


     
                                                       
    
                 




                                
        



                                                                                  
                                     
                       
                      
                                                                
     

             

   
     
                                             
                                         
    
                 








                                         
        
    
                                                                            






                                                                                  
                                                                





             


                                                                    
                      
 
     


                                                                
    







                                             

                                                                 
     
                                                 
                   
 
     






                                                            


                                                                    
     
                                                                   
                               
 
     
                                                 




                                                                



















                                                                                

                                                                 
     
                                                     
                    
 
     
                                                       




                                                                


                                                                    
     
                                                                       
                             
 
                                                         
                                                        
 
                                                                                     
                                              
                                      
                                                             
                       
                                
                   


     
     
                                                            
     
                                  
                  
 
     
                                                             
                     
     
                             
                      
 
     
                                                            
                     
     
                     
                     
 




                                                                  
                                            
 
     
                                                      
                                 
    
                 

                          
                                                               

                 
        
     


                                                                
                                                            

       
                                 
                                     

   
                                                    
                                                                

   
     

                                                      
                                    
                                   
     
                                                            

     
                                                                       
    

                                                 
     
                                                  

     

                                                            
                                         
     
                                                           

     






                                                                   
                                                         




                                                                 
                                      
         
     
                                                       











                                                                   
                                      
 

 
   
      
                                                        


































                                                                                       
       

                                                                       



                                                                      
                                                           


                                                                     
                                                                    
                                                                      
                                           
                                                                         

          
       
  
                         

                      
                                  
   

                                                                                                                 
 



                                                             
           
                                 
 



                                                                
           

                                          

                                                                  
                                                                                                                       





                                                   
 




































                                                                                 

                                                                                                                      
 
                                                     
                                                   
                                                                                          
 
                  
                                     
                              



                    





                                                                               
                        


                                                      
                                    

                         
                          
                              
                                           

                                    

           

                                
                                        
                   
       
     
 
                                
                   
                          


          
                                                           
                                                                     
                                                                                          
 
                  
                                     
                              


                                                                 
                                                                               
 




                                   
                                      
     
 

                    





                                                                               
                        


                                                      
                                    

                         
                                  


                          
                                    


                                              

                                              


                                                    

                                                       










                                                                                        

           
              
              





                                   
                                
                                        
                   
       
     
 
                                
                   
                          


          
                                                   
                                                                      
                  
                            
     
                        

   
                                                         
                                                                                        
                  
                            
     
                                    

   
                                                   

                         

   
                            
                                     
                                                                                      
                    


                                                
                                                  
                                     

     

                                                                                
                   
                               

          
                      


                             
                                             
                         



                                       

       

                                     
   
 
                             
                       

            
 



                                                       











                                                                 
 


                                              
   
 
                            










                                                           
                                                      
 
     

                                                      

                                   
     
                                                
                                                                                     

                  


      
     



                                                                       
     
                                    
                                                                                     
                       
                      
                                                                     
     


             

   
                                                                

                       
 
     
                                                            
     
                                   
                                                                                       

                        

   
                                                                      
                                    
   
 
           
                      
                    
                                          
                    
                                        
 
     






                                                                   
                                                         




                                                                 
                                      
         
     



                                                         
          
   
 


                                                      







                                                    
                
   
 
                          
                    
                                                  
                                   

                                                             
                             
                                          





                                                  

                                   
                                         
           


         
   
 
                          
                    
                                                                
                       
                

   
                         
                                                                 
                   
                               

                               
                    

                           


                                          

                        
                                                        

                                                                        
                                         
                                                            



                                                                  
                                                   
             
         
       
   
 
                                                                      
                                    
     
                                               
                                     
   

 
 

                                    
                                                                     
                                        
                                                                   
                                   

               
                          



                           
  
                          



                   
                    





                                                                   
                                                    
 
                                             
  

                         
                                                                  
/*                     __                                               *\
**     ________ ___   / /  ___     Scala API                            **
**    / __/ __// _ | / /  / _ |    (c) 2005-2011, LAMP/EPFL             **
**  __\ \/ /__/ __ |/ /__/ __ |    http://scala-lang.org/               **
** /____/\___/_/ |_/____/_/ | |                                         **
**                          |/                                          **
\*                                                                      */


package scala.actors

import scala.util.control.ControlThrowable
import java.util.{Timer, TimerTask}

/**
 * Provides functions for the definition of
 * actors, as well as actor operations, such as
 * <code>receive</code>, <code>react</code>, <code>reply</code>,
 * etc.
 *
 * @author Philipp Haller
 */
object Actor extends Combinators {

  /** State of an actor.
   *  <ul>
   *    <li><b>New</b> -
   *      Not yet started</li>
   *    <li><b>Runnable</b> -
   *      Executing</li>
   *    <li><b>Suspended</b> -
   *      Suspended, waiting in a `react`</li>
   *    <li><b>TimedSuspended</b> -
   *      Suspended, waiting in a `reactWithin` </li>
   *    <li><b>Blocked</b> -
   *      Blocked waiting in a `receive` </li>
   *    <li><b>TimedBlocked</b> -
   *      Blocked waiting in a `receiveWithin` </li>
   *    <li><b>Terminated</b> -
   *      Actor has terminated </li>
   *  </ul>
   */
  object State extends Enumeration {
    val New,
        Runnable,
        Suspended,
        TimedSuspended,
        Blocked,
        TimedBlocked,
        Terminated = Value
  }

  private[actors] val tl = new ThreadLocal[ReplyReactor]

  // timer thread runs as daemon
  private[actors] val timer = new Timer(true)

  private[actors] val suspendException = new SuspendActorControl

  /**
   * Returns the currently executing actor. Should be used instead
   * of <code>this</code> in all blocks of code executed by
   * actors.
   *
   * @return returns the currently executing actor.
   */
  def self: Actor = self(Scheduler)

  private[actors] def self(sched: IScheduler): Actor =
    rawSelf(sched).asInstanceOf[Actor]

  private[actors] def rawSelf: ReplyReactor =
    rawSelf(Scheduler)

  private[actors] def rawSelf(sched: IScheduler): ReplyReactor = {
    val s = tl.get
    if (s eq null) {
      val r = new ActorProxy(Thread.currentThread, sched)
      tl.set(r)
      r
    } else
      s
  }

  private def parentScheduler: IScheduler = {
    val s = tl.get
    if (s eq null) Scheduler else s.scheduler
  }

  /**
   * Resets an actor proxy associated with the current thread.
   * It replaces the implicit <code>ActorProxy</code> instance
   * of the current thread (if any) with a new instance.
   *
   * This permits to re-use the current thread as an actor
   * even if its <code>ActorProxy</code> has died for some reason.
   */
  def resetProxy() {
    val a = tl.get
    if ((null ne a) && a.isInstanceOf[ActorProxy])
      tl.set(new ActorProxy(Thread.currentThread, parentScheduler))
  }

  /**
   * Removes any reference to an <code>Actor</code> instance
   * currently stored in thread-local storage.
   *
   * This allows to release references from threads that are
   * potentially long-running or being re-used (e.g. inside
   * a thread pool). Permanent references in thread-local storage
   * are a potential memory leak.
   */
  def clearSelf() {
    tl.set(null)
  }

  /**
   * Factory method for creating and starting an actor.
   *
   * @example {{{
   * import scala.actors.Actor._
   * ...
   * val a = actor {
   *   ...
   * }
   * }}}
   *
   * @param  body  the code block to be executed by the newly created actor
   * @return       the newly created actor. Note that it is automatically started.
   */
  def actor(body: => Unit): Actor = {
    val a = new Actor {
      def act() = body
      override final val scheduler: IScheduler = parentScheduler
    }
    a.start()
    a
  }

  /**
   * Factory method for creating actors whose
   * body is defined using a `Responder`.
   *
   * @example {{{
   * import scala.actors.Actor._
   * import Responder.exec
   * ...
   * val a = reactor {
   *   for {
   *     res <- b !! MyRequest;
   *     if exec(println("result: "+res))
   *   } yield {}
   * }
   * }}}
   *
   * @param  body  the `Responder` to be executed by the newly created actor
   * @return       the newly created actor. Note that it is automatically started.
   */
  def reactor(body: => Responder[Unit]): Actor = {
    val a = new Actor {
      def act() {
        Responder.run(body)
      }
      override final val scheduler: IScheduler = parentScheduler
    }
    a.start()
    a
  }

  /**
   * Receives the next message from the mailbox of the current actor
   * <code>self</code>.
   */
  def ? : Any = self.?

  /**
   * Receives a message from the mailbox of
   * <code>self</code>. Blocks if no message matching any of the
   * cases of <code>f</code> can be received.
   *
   * @example {{{
   * receive {
   *   case "exit" => println("exiting")
   *   case 42 => println("got the answer")
   *   case x:Int => println("got an answer")
   * }
   * }}}
   *
   * @param  f a partial function specifying patterns and actions
   * @return   the result of processing the received message
   */
  def receive[A](f: PartialFunction[Any, A]): A =
    self.receive(f)

  /**
   * Receives a message from the mailbox of
   * <code>self</code>. Blocks at most <code>msec</code>
   * milliseconds if no message matching any of the cases of
   * <code>f</code> can be received. If no message could be
   * received the <code>TIMEOUT</code> action is executed if
   * specified.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function specifying patterns and actions
   * @return      the result of processing the received message
   */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R =
    self.receiveWithin(msec)(f)

  /**
   * Lightweight variant of <code>receive</code>.
   *
   * Actions in <code>f</code> have to contain the rest of the
   * computation of <code>self</code>, as this method will never
   * return.
   *
   * A common method of continuting the computation is to send a message
   * to another actor:
   * {{{
   * react {
   *   case Get(from) =>
   *     react {
   *       case Put(x) => from ! x
   *     }
   * }
   * }}}
   *
   * Another common method is to use `loop` to continuously `react` to messages:
   * {{{
   * loop {
   *   react {
   *     case Msg(data) => // process data
   *   }
   * }
   * }}}
   *
   * @param  f a partial function specifying patterns and actions
   * @return   this function never returns
   */
  def react(f: PartialFunction[Any, Unit]): Nothing =
    rawSelf.react(f)

  /**
   * Lightweight variant of <code>receiveWithin</code>.
   *
   * Actions in <code>f</code> have to contain the rest of the
   * computation of <code>self</code>, as this method will never
   * return.
   *
   * @param  msec the time span before timeout
   * @param  f    a partial function specifying patterns and actions
   * @return      this function never returns
   */
  def reactWithin(msec: Long)(f: PartialFunction[Any, Unit]): Nothing =
    self.reactWithin(msec)(f)

  def eventloop(f: PartialFunction[Any, Unit]): Nothing =
    rawSelf.react(new RecursiveProxyHandler(rawSelf, f))

  private class RecursiveProxyHandler(a: ReplyReactor, f: PartialFunction[Any, Unit])
          extends PartialFunction[Any, Unit] {
    def isDefinedAt(m: Any): Boolean =
      true // events are immediately removed from the mailbox
    def apply(m: Any) {
      if (f.isDefinedAt(m)) f(m)
      a.react(this)
    }
  }

  /**
   * Returns the actor which sent the last received message.
   */
  def sender: OutputChannel[Any] =
    rawSelf.sender

  /**
   * Sends <code>msg</code> to the actor waiting in a call to
   * <code>!?</code>.
   */
  def reply(msg: Any): Unit =
    rawSelf.reply(msg)

  /**
   * Sends <code>()</code> to the actor waiting in a call to
   * <code>!?</code>.
   */
  def reply(): Unit =
    rawSelf.reply(())

  /**
   * Returns the number of messages in <code>self</code>'s mailbox
   *
   * @return the number of messages in <code>self</code>'s mailbox
   */
  def mailboxSize: Int = rawSelf.mailboxSize

  /**
   * Converts a synchronous event-based operation into
   * an asynchronous `Responder`.
   *
   * @example {{{
   * val adder = reactor {
   *   for {
   *     _ <- respondOn(react) { case Add(a, b) => reply(a+b) }
   *   } yield {}
   * }
   * }}}
   */
  def respondOn[A, B](fun: PartialFunction[A, Unit] => Nothing):
    PartialFunction[A, B] => Responder[B] =
      (caseBlock: PartialFunction[A, B]) => new Responder[B] {
        def respond(k: B => Unit) = fun(caseBlock andThen k)
      }

  private[actors] trait Body[a] {
    def andThen[b](other: => b): Unit
  }

  implicit def mkBody[a](body: => a) = new Body[a] {
    def andThen[b](other: => b): Unit = rawSelf.seq(body, other)
  }

  /**
   * Links <code>self</code> to actor <code>to</code>.
   *
   * @param  to the actor to link to
   * @return    the parameter actor
   */
  def link(to: AbstractActor): AbstractActor = self.link(to)

  /**
   * Links <code>self</code> to the actor defined by <code>body</code>.
   *
   * @param body the body of the actor to link to
   * @return     the parameter actor
   */
  def link(body: => Unit): Actor = self.link(body)

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
   *
   * @param from the actor to unlink from
   */
  def unlink(from: AbstractActor): Unit = self.unlink(from)

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   */
  def exit(reason: AnyRef): Nothing = self.exit(reason)

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, 'normal)</code> to <code>a</code>.
   * </p>
   */
  def exit(): Nothing = rawSelf.exit()

}

/**
 * <p>
 *   Provides lightweight, concurrent actors. Actors are
 *   created by extending the `Actor` trait (alternatively, one of the
 *   factory methods in its companion object can be used).  The
 *   behavior of an `Actor` subclass is defined by implementing its
 *   `act` method:
 *
 *   {{{
 *   class MyActor extends Actor {
 *     def act() {
 *       // actor behavior goes here
 *     }
 *   }
 *   }}}
 *
 *   A new `Actor` instance is started by invoking its `start` method.
 *
 *   '''Note:''' care must be taken when invoking thread-blocking methods
 *   other than those provided by the `Actor` trait or its companion
 *   object (such as `receive`). Blocking the underlying thread inside
 *   an actor may lead to starvation of other actors. This also
 *   applies to actors hogging their thread for a long time between
 *   invoking `receive`/`react`.
 *
 *   If actors use blocking operations (for example, methods for
 *   blocking I/O), there are several options:
 *   <ul>
 *     <li>The run-time system can be configured to use a larger thread pool size
 *     (for example, by setting the `actors.corePoolSize` JVM property).</li>
 *
 *     <li>The `scheduler` method of the `Actor` trait can be overridden to return a
 *     `ResizableThreadPoolScheduler`, which resizes its thread pool to
 *     avoid starvation caused by actors that invoke arbitrary blocking methods.</li>
 *
 *     <li>The `actors.enableForkJoin` JVM property can be set to `false`, in which
 *     case a `ResizableThreadPoolScheduler` is used by default to execute actors.</li>
 *   </ul>
 * </p>
 * <p>
 * The main ideas of the implementation are explained in the two papers
 * <ul>
 *   <li>
 *     <a href="http://lampwww.epfl.ch/~odersky/papers/jmlc06.pdf">
 *     <span style="font-weight:bold; white-space:nowrap;">Event-Based
 *     Programming without Inversion of Control</span></a>,
 *     Philipp Haller and Martin Odersky, <i>Proc. JMLC 2006</i>, and
 *   </li>
 *   <li>
 *     <a href="http://lamp.epfl.ch/~phaller/doc/haller07coord.pdf">
 *     <span style="font-weight:bold; white-space:nowrap;">Actors that
 *     Unify Threads and Events</span></a>,
 *     Philipp Haller and Martin Odersky, <i>Proc. COORDINATION 2007</i>.
 *   </li>
 * </ul>
 * </p>
 *
 * @author Philipp Haller
 *
 * @define actor actor
 * @define channel actor's mailbox
 */
@SerialVersionUID(-781154067877019505L)
trait Actor extends AbstractActor with ReplyReactor with ActorCanReply with InputChannel[Any] with Serializable {

  /* The following two fields are only used when the actor
   * suspends by blocking its underlying thread, for example,
   * when waiting in a receive or synchronous send.
   */
  @volatile
  private var isSuspended = false

  /* This field is used to communicate the received message from
   * the invocation of send to the place where the thread of
   * the receiving actor resumes inside receive/receiveWithin.
   */
  @volatile
  private var received: Option[Any] = None

  protected[actors] override def scheduler: IScheduler = Scheduler

  private[actors] override def startSearch(msg: Any, replyTo: OutputChannel[Any], handler: PartialFunction[Any, Any]) =
    if (isSuspended) {
      () => synchronized {
        mailbox.append(msg, replyTo)
        resumeActor()
      }
    } else super.startSearch(msg, replyTo, handler)

  // we override this method to check `shouldExit` before suspending
  private[actors] override def searchMailbox(startMbox: MQueue[Any],
                                             handler: PartialFunction[Any, Any],
                                             resumeOnSameThread: Boolean) {
    var tmpMbox = startMbox
    var done = false
    while (!done) {
      val qel = tmpMbox.extractFirst((msg: Any, replyTo: OutputChannel[Any]) => {
        senders = List(replyTo)
        handler.isDefinedAt(msg)
      })
      if (tmpMbox ne mailbox)
        tmpMbox.foreach((m, s) => mailbox.append(m, s))
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            tmpMbox = new MQueue[Any]("Temp")
            drainSendBuffer(tmpMbox)
            // keep going
          } else {
            // very important to check for `shouldExit` at this point
            // since linked actors might have set it after we checked
            // last time (e.g., at the beginning of `react`)
            if (shouldExit) exit()
            waitingFor = handler
            // see Reactor.searchMailbox
            throw Actor.suspendException
          }
        }
      } else {
        resumeReceiver((qel.msg, qel.session), handler, resumeOnSameThread)
        done = true
      }
    }
  }

  private[actors] override def makeReaction(fun: () => Unit, handler: PartialFunction[Any, Any], msg: Any): Runnable =
    new ActorTask(this, fun, handler, msg)

  /** See the companion object's `receive` method. */
  def receive[R](f: PartialFunction[Any, R]): R = {
    assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

    synchronized {
      if (shouldExit) exit() // links
      drainSendBuffer(mailbox)
    }

    var done = false
    while (!done) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = replyTo :: senders
        val matches = f.isDefinedAt(m)
        senders = senders.tail
        matches
      })
      if (null eq qel) {
        synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            drainSendBuffer(mailbox)
            // keep going
          } else {
            waitingFor = f
            isSuspended = true
            scheduler.managedBlock(blocker)
            drainSendBuffer(mailbox)
            // keep going
          }
        }
      } else {
        received = Some(qel.msg)
        senders = qel.session :: senders
        done = true
      }
    }

    val result = f(received.get)
    received = None
    senders = senders.tail
    result
  }

  /** See the companion object's `receiveWithin` method. */
  def receiveWithin[R](msec: Long)(f: PartialFunction[Any, R]): R = {
    assert(Actor.self(scheduler) == this, "receive from channel belonging to other actor")

    synchronized {
      if (shouldExit) exit() // links
      drainSendBuffer(mailbox)
    }

    // first, remove spurious TIMEOUT message from mailbox if any
    mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => m == TIMEOUT)

    val receiveTimeout = () => {
      if (f.isDefinedAt(TIMEOUT)) {
        received = Some(TIMEOUT)
        senders = this :: senders
      } else
        sys.error("unhandled timeout")
    }

    var done = false
    while (!done) {
      val qel = mailbox.extractFirst((m: Any, replyTo: OutputChannel[Any]) => {
        senders = replyTo :: senders
        val matches = f.isDefinedAt(m)
        senders = senders.tail
        matches
      })
      if (null eq qel) {
        val todo = synchronized {
          // in mean time new stuff might have arrived
          if (!sendBuffer.isEmpty) {
            drainSendBuffer(mailbox)
            // keep going
            () => {}
          } else if (msec == 0L) {
            done = true
            receiveTimeout
          } else {
            if (onTimeout.isEmpty) {
              if (!f.isDefinedAt(TIMEOUT))
                sys.error("unhandled timeout")

              val thisActor = this
              onTimeout = Some(new TimerTask {
                def run() {
                  thisActor.send(TIMEOUT, thisActor)
                }
              })
              Actor.timer.schedule(onTimeout.get, msec)
            }

            // It is possible that !onTimeout.isEmpty, but TIMEOUT is not yet in mailbox
            // See SI-4759
            waitingFor = f
            received = None
            isSuspended = true
            scheduler.managedBlock(blocker)
            drainSendBuffer(mailbox)
            // keep going
            () => {}
          }
        }
        todo()
      } else {
        synchronized {
          if (!onTimeout.isEmpty) {
            onTimeout.get.cancel()
            onTimeout = None
          }
        }
        received = Some(qel.msg)
        senders = qel.session :: senders
        done = true
      }
    }

    val result = f(received.get)
    received = None
    senders = senders.tail
    result
  }

  /** See the companion object's `react` method. */
  override def react(handler: PartialFunction[Any, Unit]): Nothing = {
    synchronized {
      if (shouldExit) exit()
    }
    super.react(handler)
  }

  /** See the companion object's `reactWithin` method. */
  override def reactWithin(msec: Long)(handler: PartialFunction[Any, Unit]): Nothing = {
    synchronized {
      if (shouldExit) exit()
    }
    super.reactWithin(msec)(handler)
  }

  /** Receives the next message from the mailbox */
  def ? : Any = receive {
    case x => x
  }

  // guarded by lock of this
  // never throws SuspendActorControl
  private[actors] override def scheduleActor(f: PartialFunction[Any, Any], msg: Any) =
    if (f eq null) {
      // do nothing (timeout is handled instead)
    }
    else {
      val task = new ActorTask(this, null, f, msg)
      scheduler executeFromActor task
    }

  /* Used for notifying scheduler when blocking inside receive/receiveWithin. */
  private object blocker extends scala.concurrent.ManagedBlocker {
    def block() = {
      Actor.this.suspendActor()
      true
    }
    def isReleasable =
      !Actor.this.isSuspended
  }

  private def suspendActor() = synchronized {
    while (isSuspended) {
      try {
        wait()
      } catch {
        case _: InterruptedException =>
      }
    }
    // links: check if we should exit
    if (shouldExit) exit()
  }

  private def resumeActor() {
    isSuspended = false
    notify()
  }

  private[actors] override def exiting = synchronized {
    _state == Actor.State.Terminated
  }

  // guarded by this
  private[actors] override def dostart() {
    // Reset various flags.
    //
    // Note that we do *not* reset `trapExit`. The reason is that
    // users should be able to set the field in the constructor
    // and before `act` is called.
    exitReason = 'normal
    shouldExit = false

    super.dostart()
  }

  override def start(): Actor = synchronized {
    super.start()
    this
  }

  /** State of this actor */
  override def getState: Actor.State.Value = synchronized {
    if (isSuspended) {
      if (onTimeout.isEmpty)
        Actor.State.Blocked
      else
        Actor.State.TimedBlocked
    } else
      super.getState
  }

  // guarded by this
  private[actors] var links: List[AbstractActor] = Nil

  /**
   * Links <code>self</code> to actor <code>to</code>.
   *
   * @param to the actor to link to
   * @return   the parameter actor
   */
  def link(to: AbstractActor): AbstractActor = {
    assert(Actor.self(scheduler) == this, "link called on actor different from self")
    this linkTo to
    to linkTo this
    to
  }

  /**
   * Links <code>self</code> to the actor defined by <code>body</code>.
   *
   * @param body the body of the actor to link to
   * @return     the parameter actor
   */
  def link(body: => Unit): Actor = {
    assert(Actor.self(scheduler) == this, "link called on actor different from self")
    val a = new Actor {
      def act() = body
      override final val scheduler: IScheduler = Actor.this.scheduler
    }
    link(a)
    a.start()
    a
  }

  private[actors] def linkTo(to: AbstractActor) = synchronized {
    links = to :: links
  }

  /**
   * Unlinks <code>self</code> from actor <code>from</code>.
   */
  def unlink(from: AbstractActor) {
    assert(Actor.self(scheduler) == this, "unlink called on actor different from self")
    this unlinkFrom from
    from unlinkFrom this
  }

  private[actors] def unlinkFrom(from: AbstractActor) = synchronized {
    links = links.filterNot(from.==)
  }

  @volatile
  var trapExit = false
  // guarded by this
  private var exitReason: AnyRef = 'normal
  // guarded by this
  private[actors] var shouldExit = false

  /**
   * <p>
   *   Terminates execution of <code>self</code> with the following
   *   effect on linked actors:
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>true</code>, send message
   *   <code>Exit(self, reason)</code> to <code>a</code>.
   * </p>
   * <p>
   *   For each linked actor <code>a</code> with
   *   <code>trapExit</code> set to <code>false</code> (default),
   *   call <code>a.exit(reason)</code> if
   *   <code>reason != 'normal</code>.
   * </p>
   */
  protected[actors] def exit(reason: AnyRef): Nothing = {
    synchronized {
      exitReason = reason
    }
    exit()
  }

  /**
   * Terminates with exit reason <code>'normal</code>.
   */
  protected[actors] override def exit(): Nothing = {
    val todo = synchronized {
      if (!links.isEmpty)
        exitLinked()
      else
        () => {}
    }
    todo()
    super.exit()
  }

  // Assume !links.isEmpty
  // guarded by this
  private[actors] def exitLinked(): () => Unit = {
    _state = Actor.State.Terminated
    // reset waitingFor, otherwise getState returns Suspended
    waitingFor = Reactor.waitingForNone
    // remove this from links
    val mylinks = links.filterNot(this.==)
    // unlink actors
    mylinks.foreach(unlinkFrom(_))
    // return closure that locks linked actors
    () => {
      mylinks.foreach((linked: AbstractActor) => {
        linked.synchronized {
          if (!linked.exiting) {
            linked.unlinkFrom(this)
            linked.exit(this, exitReason)
          }
        }
      })
    }
  }

  // Assume !links.isEmpty
  // guarded by this
  private[actors] def exitLinked(reason: AnyRef): () => Unit = {
    exitReason = reason
    exitLinked()
  }

  // Assume !this.exiting
  private[actors] def exit(from: AbstractActor, reason: AnyRef) {
    if (trapExit) {
      this ! Exit(from, reason)
    }
    else if (reason != 'normal)
      synchronized {
        shouldExit = true
        exitReason = reason
        // resume this Actor in a way that
        // causes it to exit
        // (because shouldExit == true)
        if (isSuspended)
          resumeActor()
        else if (waitingFor ne Reactor.waitingForNone) {
          waitingFor = Reactor.waitingForNone
          // it doesn't matter what partial function we are passing here
          scheduleActor(waitingFor, null)
          /* Here we should not throw a SuspendActorControl,
             since the current method is called from an actor that
             is in the process of exiting.

             Therefore, the contract for scheduleActor is that
             it never throws a SuspendActorControl.
           */
        }
      }
  }

  /* Requires qualified private, because <code>RemoteActor</code> must
   * register a termination handler.
   */
  private[actors] def onTerminate(f: => Unit) {
    scheduler.onTerminate(this) { f }
  }
}


/**
 *    Used as the timeout pattern in
 *    <a href="Actor.html#receiveWithin(Long)" target="contentFrame">
 *    <code>receiveWithin</code></a> and
 *    <a href="Actor.html#reactWithin(Long)" target="contentFrame">
 *    <code>reactWithin</code></a>.
 *
 * @example {{{
 *    receiveWithin(500) {
 *      case (x, y) => ...
 *      case TIMEOUT => ...
 *    }
 * }}}
 *
 *  @author Philipp Haller
 */
case object TIMEOUT


/** Sent to an actor
 *  with `trapExit` set to `true` whenever one of its linked actors
 *  terminates.
 *
 *  @param from   the actor that terminated
 *  @param reason the reason that caused the actor to terminate
 */
case class Exit(from: AbstractActor, reason: AnyRef)

/** Manages control flow of actor executions.
 *
 * @author Philipp Haller
 */
private[actors] class SuspendActorControl extends ControlThrowable