package cask.actor import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} import scala.concurrent.duration.Duration import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise} import scala.util.Try /** * An extended `scala.concurrent.ExecutionContext`; provides the ability to * schedule messages to be sent later, and hooks to track the current number of * outstanding tasks or log the actor message sends for debugging purporses */ trait Context extends ExecutionContext { def reportSchedule(): Context.Token = new Context.Token.Simple() def reportSchedule(fileName: sourcecode.FileName, line: sourcecode.Line): Context.Token = { new Context.Token.Future(fileName, line) } def reportSchedule(a: Actor[_], msg: Any, fileName: sourcecode.FileName, line: sourcecode.Line): Context.Token = { new Context.Token.Send(a, msg, fileName, line) } def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = () def reportComplete(token: Context.Token): Unit = () def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration) (implicit fileName: sourcecode.FileName, line: sourcecode.Line): Unit def future[T](t: => T) (implicit fileName: sourcecode.FileName, line: sourcecode.Line): Future[T] def execute(runnable: Runnable): Unit } object Context{ trait Token object Token{ class Simple extends Token(){ override def toString = "token@" + Integer.toHexString(hashCode()) } class Future(val fileName: sourcecode.FileName, val line: sourcecode.Line) extends Token(){ override def toString = "token@" + Integer.toHexString(hashCode()) + "@" + fileName.value + ":" + line.value } class Send(val a: Actor[_], val msg: Any, val fileName: sourcecode.FileName, val line: sourcecode.Line) extends Token(){ override def toString = "token@" + Integer.toHexString(hashCode()) + "@" + fileName.value + ":" + line.value } } class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl { def executionContext = ec def reportFailure(t: Throwable) = logEx(t) } object Simple{ implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace()) } class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global, logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl { private[this] val active = collection.mutable.Set.empty[Context.Token] private[this] var promise = concurrent.Promise.successful[Unit](()) def executionContext = ec def reportFailure(t: Throwable) = logEx(t) def handleReportSchedule(token: Context.Token) = synchronized{ if (active.isEmpty) { assert(promise.isCompleted) promise = concurrent.Promise[Unit] } active.add(token) token } override def reportSchedule() = { handleReportSchedule(super.reportSchedule()) } override def reportSchedule(fileName: sourcecode.FileName, line: sourcecode.Line): Context.Token = { handleReportSchedule(super.reportSchedule(fileName, line)) } override def reportSchedule(a: Actor[_], msg: Any, fileName: sourcecode.FileName, line: sourcecode.Line): Context.Token = { handleReportSchedule(super.reportSchedule(a, msg, fileName, line)) } override def reportComplete(token: Context.Token) = this.synchronized{ assert(active.remove(token)) if (active.isEmpty) promise.success(()) } def waitForInactivity(timeout: Option[java.time.Duration] = None) = { Await.result( this.synchronized(promise).future, timeout match{ case None => scala.concurrent.duration.Duration.Inf case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos) } ) } } trait Impl extends Context { def executionContext: ExecutionContext def execute(runnable: Runnable): Unit = { val token = reportSchedule() executionContext.execute(new Runnable { def run(): Unit = { try runnable.run() finally reportComplete(token) } }) } def future[T](t: => T) (implicit fileName: sourcecode.FileName, line: sourcecode.Line): Future[T] = { val token = reportSchedule(fileName, line) val p = Promise[T] executionContext.execute(new Runnable { def run(): Unit = { p.complete(scala.util.Try(t)) reportComplete(token) } }) p.future } lazy val scheduler = Executors.newSingleThreadScheduledExecutor( new ThreadFactory { def newThread(r: Runnable): Thread = { val t = new Thread(r, "ActorContext-Scheduler-Thread") t.setDaemon(true) t } } ) def scheduleMsg[T](a: Actor[T], msg: T, delay: java.time.Duration) (implicit fileName: sourcecode.FileName, line: sourcecode.Line) = { val token = reportSchedule(a, msg, fileName, line) scheduler.schedule[Unit]( () => { a.send(msg)(fileName, line) reportComplete(token) }, delay.toMillis, TimeUnit.MILLISECONDS ) } } } trait Actor[T]{ def send(t: T) (implicit fileName: sourcecode.FileName, line: sourcecode.Line): Unit def sendAsync(f: scala.concurrent.Future[T]) (implicit fileName: sourcecode.FileName, line: sourcecode.Line): Unit }