summaryrefslogblamecommitdiff
path: root/src/actors/scala/actors/Actor.scala
blob: 7159cb890e53bd1b2d1a726c603ddb8c19836507 (plain) (tree)












































































































































































































































































                                                                                      
package scala.actors

object Actor {

  private[actors] val selfs = new java.util.WeakHashMap(16, 0.5f)

  def self: Actor = synchronized {
    val t = Thread.currentThread()
    if (t.isInstanceOf[ActorThread])
      t.asInstanceOf[ActorThread]
    else {
      var a = selfs.get(t).asInstanceOf[Actor]
      if (a == null) {
        a = new ActorProxy(t)
        selfs.put(t, a)
      }
      a
    }
  }

  def actor(body: => Unit): ActorThread = synchronized {
    val actor = new ActorThread {
      override def run(): Unit = {
        body
        this.kill()
      }
    }
    actor.start()
    actor
  }

  def reactor(body: => Unit): Reactor = synchronized {
    val reactor = new Reactor {
      override def run(): Unit = body
    }
    reactor.start()
    reactor
  }

  def receive[a](f: PartialFunction[Any, a]): a =
    self.in.receive(f)

  def receiveWithin[R](msec: long)(f: PartialFunction[Any, R]): R =
    self.in.receiveWithin(msec)(f)

  def react(f: PartialFunction[Any, Unit]): Nothing =
    self.in.react(f)

  def reactWithin(msec: long)(f: PartialFunction[Any, Unit]): Nothing =
    self.in.reactWithin(msec)(f)

  def eventloop(f: PartialFunction[Any, Unit]): Nothing =
    self.in.react(new RecursiveProxyHandler(self, f))

  private class RecursiveProxyHandler(a: Actor, f: PartialFunction[Any, Unit])
          extends PartialFunction[Any, Unit] {
    def isDefinedAt(m: Any): boolean =
      true // events are immediately removed from the mailbox
    def apply(m: Any): Unit = {
      if (f.isDefinedAt(m)) f(m)
      self.in.react(this)
    }
  }

  def from(r: Actor): FromReceive =
    new FromReceive(r)

  private[actors] class FromReceive(r: Actor) {
    def receive[a](f: PartialFunction[Any, a]): a =
      self.in.receiveFrom(r)(f)
  }

  def sender: Actor = self.sender

  def reply(msg: Any): Unit = sender.reply ! msg

  def reply(): Unit = reply(())

  def forward(msg: Any): Unit = self.in.forward(msg)

  private[actors] trait Body[T] {
    def orElse(other: => T): T
    def andThen(other: => T): T
  }

  implicit def mkBody(body: => Unit) = new Body[Unit] {
    def orElse(other: => Unit): Unit = choose(body, other)
    def andThen(other: => Unit): Unit = seq(body, other)
  }

  def choose(alt1: => Unit, alt2: => Unit): Unit = {
    val s = self
    // save former custom suspendActor function
    // (e.g. from further orElse)
    val suspendNext = s.suspendActor
    val detachNext = s.detachActor

    // have to get out of the point of suspend in alt1's
    // receive
    s.suspendActor = () => { throw new SuspendActorException }
    s.detachActor = f => { throw new SuspendActorException }

    try { alt1 } catch {
      case d: SuspendActorException => {
        s.suspendActor = suspendNext
        s.detachActor = detachNext
        alt2
      }
    }
  }

  def loop(b: => Unit): Unit = {
    val s = self
    s.kill = () => { b; s.kill() }
    b
  }

  def seq(b1: => Unit, b2: => Unit): Unit = {
    val s = self
    s.kill = () => { b2 }
    b1
  }
}

trait Actor {

  private[actors] val in = new Channel[Any]
  in.receiver = this

  private var rc: Channel[Any] = null
  private[actors] def reply: Channel[Any] = {
    if (rc == null) {
      rc = new Channel[Any]
      rc.receiver = this
    }
    rc
  }

  private[actors] def freshReply(): Unit = {
    rc = new Channel[Any]
    rc.receiver = this
  }

  def !(msg: Any): Unit = in ! msg
  def !?(msg: Any): Any = in !? msg

  private[actors] def sender: Actor
  private[actors] def pushSender(sender: Actor): unit
  private[actors] def popSender(): unit

  private[actors] var kill: () => Unit = _
  private[actors] var suspendActor: () => unit = _
  private[actors] var suspendActorFor: long => unit = _
  private[actors] var detachActor: PartialFunction[Any, unit] => unit = _

  private[actors] def scheduleActor(f: PartialFunction[Any, Unit], msg: Any)

  private[actors] def isThreaded: boolean
  private[actors] def resetActor(): unit

  resetActor()
}

class ActorThread extends Thread with ThreadedActor

class ActorProxy(t: Thread) extends ThreadedActor


object RemoteActor {
  import remote.NetKernel
  import remote.TcpService

  private val kernels = new scala.collection.mutable.HashMap[Actor, NetKernel]

  def alive(port: int): Unit = {
    val serv = new TcpService(port)
    serv.start()
    kernels += Actor.self -> serv.kernel
  }

  def register(name: Symbol, a: Actor): Unit = {
    val kernel = kernels.get(Actor.self) match {
      case None => {
        val serv = new TcpService(TcpService.generatePort)
        serv.start()
        kernels += Actor.self -> serv.kernel
        serv.kernel
      }
      case Some(k) => k
    }
    kernel.register(name, a)
  }

  def select(node: Node, name: Symbol): Actor =
    new Reactor {
      override def !(msg: Any): Unit = msg match {
        case a: AnyRef => {
          // establish remotely accessible
          // return path (sender)
          val kernel = kernels.get(Actor.self) match {
            case None => {
              val serv = new TcpService(TcpService.generatePort)
              serv.start()
              kernels += Actor.self -> serv.kernel
              serv.kernel
            }
            case Some(k) => k
          }
          kernel.send(node, name, a)
        }
        case other =>
          error("Cannot send non-AnyRef value remotely.")
      }
      override def !?(msg: Any): Any =
        error("!? not implemented for remote actors.")
    }
}

abstract class Node
case class TcpNode(address: String, port: Int) extends Node
case class JxtaNode(group: String) extends Node


private[actors] abstract class MessageQueueResult[Msg] {
  def msg: Msg
  def sender: Actor
}

private[actors] class MessageQueue[Msg] extends MessageQueueResult[Msg] {
  var msg: Msg = _
  var sender: Actor = _
  private var next: MessageQueue[Msg] = this

  def append(msg: Msg, sender: Actor) = {
    val q = new MessageQueue[Msg]
    q.msg = msg
    q.sender = sender
    q.next = next
    next = q
  }

  def extractFirst(p: Msg => boolean): MessageQueueResult[Msg] = {
    var q = this
    var qnext = q.next
    while (qnext != this) {
      if (p(qnext.msg)) {
        q.next = qnext.next
        return qnext
      }
      q = qnext
      qnext = qnext.next
    }
    null
  }

  def dequeueFirst(p: MessageQueueResult[Msg] => boolean): MessageQueueResult[Msg] = {
    var q = this
    var qnext = q.next
    while (qnext != this) {
      if (p(qnext)) {
        q.next = qnext.next
        return qnext
      }
      q = qnext
      qnext = qnext.next
    }
    null
  }
}