summaryrefslogblamecommitdiff
path: root/cask/actor/src/Context.scala
blob: 6b56f2e12fc56ff239a4c1eb7546f0ca472816c0 (plain) (tree)





























































































































































































                                                                                                                     
package cask.actor
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise}
import scala.util.Try

/**
 * An extended `scala.concurrent.ExecutionContext`; provides the ability to
 * schedule messages to be sent later, and hooks to track the current number of
 * outstanding tasks or log the actor message sends for debugging purporses
 */
trait Context extends ExecutionContext {
  def reportSchedule(): Context.Token = new Context.Token.Simple()

  def reportSchedule(fileName: sourcecode.FileName,
                     line: sourcecode.Line): Context.Token = {
    new Context.Token.Future(fileName, line)
  }

  def reportSchedule(a: Actor[_],
                     msg: Any,
                     fileName: sourcecode.FileName,
                     line: sourcecode.Line): Context.Token = {
    new Context.Token.Send(a, msg, fileName, line)
  }

  def reportRun(a: Actor[_],
                msg: Any,
                token: Context.Token): Unit = ()

  def reportComplete(token: Context.Token): Unit = ()

  def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
                    (implicit fileName: sourcecode.FileName,
                     line: sourcecode.Line): Unit

  def future[T](t: => T)
               (implicit fileName: sourcecode.FileName,
                line: sourcecode.Line): Future[T]

  def execute(runnable: Runnable): Unit
}

object Context{
  trait Token
  object Token{
    class Simple extends Token(){
      override def toString = "token@" + Integer.toHexString(hashCode())
    }

    class Future(val fileName: sourcecode.FileName,
                 val line: sourcecode.Line) extends Token(){
      override def toString =
        "token@" + Integer.toHexString(hashCode()) + "@" +
        fileName.value + ":" + line.value
    }

    class Send(val a: Actor[_],
               val msg: Any,
               val fileName: sourcecode.FileName,
               val line: sourcecode.Line) extends Token(){
      override def toString =
        "token@" + Integer.toHexString(hashCode()) + "@" +
        fileName.value + ":" + line.value
    }
  }

  class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl {
    def executionContext = ec
    def reportFailure(t: Throwable) = logEx(t)
  }

  object Simple{
    implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace())
  }

  class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global,
             logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl {
    private[this] val active = collection.mutable.Set.empty[Context.Token]
    private[this] var promise = concurrent.Promise.successful[Unit](())

    def executionContext = ec

    def reportFailure(t: Throwable) = logEx(t)

    def handleReportSchedule(token: Context.Token) = synchronized{
      if (active.isEmpty) {
        assert(promise.isCompleted)
        promise = concurrent.Promise[Unit]
      }
      active.add(token)
      token
    }
    override def reportSchedule() = {
      handleReportSchedule(super.reportSchedule())
    }
    override def reportSchedule(fileName: sourcecode.FileName,
                                line: sourcecode.Line): Context.Token = {
      handleReportSchedule(super.reportSchedule(fileName, line))
    }

    override def reportSchedule(a: Actor[_],
                                msg: Any,
                                fileName: sourcecode.FileName,
                                line: sourcecode.Line): Context.Token = {
      handleReportSchedule(super.reportSchedule(a, msg, fileName, line))
    }

    override def reportComplete(token: Context.Token) = this.synchronized{
      assert(active.remove(token))

      if (active.isEmpty) promise.success(())
    }

    def waitForInactivity(timeout: Option[java.time.Duration] = None) = {
      Await.result(
        this.synchronized(promise).future,
        timeout match{
          case None => scala.concurrent.duration.Duration.Inf
          case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos)
        }
      )
    }
  }

  trait Impl extends Context {
    def executionContext: ExecutionContext

    def execute(runnable: Runnable): Unit = {
      val token = reportSchedule()
      executionContext.execute(new Runnable {
        def run(): Unit = {
          try runnable.run()
          finally reportComplete(token)
        }
      })
    }

    def future[T](t: => T)
                 (implicit fileName: sourcecode.FileName,
                  line: sourcecode.Line): Future[T] = {
      val token = reportSchedule(fileName, line)
      val p = Promise[T]
      executionContext.execute(new Runnable {
        def run(): Unit = {
          p.complete(scala.util.Try(t))
          reportComplete(token)
        }
      })
      p.future
    }

    lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
      new ThreadFactory {
        def newThread(r: Runnable): Thread = {
          val t = new Thread(r, "ActorContext-Scheduler-Thread")
          t.setDaemon(true)
          t
        }
      }
    )

    def scheduleMsg[T](a: Actor[T],
                       msg: T, delay: java.time.Duration)
                      (implicit fileName: sourcecode.FileName,
                       line: sourcecode.Line) = {
      val token = reportSchedule(a, msg, fileName, line)
      scheduler.schedule[Unit](
        () => {
          a.send(msg)(fileName, line)
          reportComplete(token)
        },
        delay.toMillis,
        TimeUnit.MILLISECONDS
      )
    }
  }

}

trait Actor[T]{
  def send(t: T)
          (implicit fileName: sourcecode.FileName,
           line: sourcecode.Line): Unit

  def sendAsync(f: scala.concurrent.Future[T])
               (implicit fileName: sourcecode.FileName,
                line: sourcecode.Line): Unit
}