diff options
Diffstat (limited to 'cask/actor/src/Context.scala')
-rw-r--r-- | cask/actor/src/Context.scala | 190 |
1 files changed, 190 insertions, 0 deletions
diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala new file mode 100644 index 0000000..6b56f2e --- /dev/null +++ b/cask/actor/src/Context.scala @@ -0,0 +1,190 @@ +package cask.actor +import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise} +import scala.util.Try + +/** + * An extended `scala.concurrent.ExecutionContext`; provides the ability to + * schedule messages to be sent later, and hooks to track the current number of + * outstanding tasks or log the actor message sends for debugging purporses + */ +trait Context extends ExecutionContext { + def reportSchedule(): Context.Token = new Context.Token.Simple() + + def reportSchedule(fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + new Context.Token.Future(fileName, line) + } + + def reportSchedule(a: Actor[_], + msg: Any, + fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + new Context.Token.Send(a, msg, fileName, line) + } + + def reportRun(a: Actor[_], + msg: Any, + token: Context.Token): Unit = () + + def reportComplete(token: Context.Token): Unit = () + + def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit + + def future[T](t: => T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Future[T] + + def execute(runnable: Runnable): Unit +} + +object Context{ + trait Token + object Token{ + class Simple extends Token(){ + override def toString = "token@" + Integer.toHexString(hashCode()) + } + + class Future(val fileName: sourcecode.FileName, + val line: sourcecode.Line) extends Token(){ + override def toString = + "token@" + Integer.toHexString(hashCode()) + "@" + + fileName.value + ":" + line.value + } + + class Send(val a: Actor[_], + val msg: Any, + val fileName: sourcecode.FileName, + val line: sourcecode.Line) extends Token(){ + override def toString = + "token@" + Integer.toHexString(hashCode()) + "@" + + fileName.value + ":" + line.value + } + } + + class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl { + def executionContext = ec + def reportFailure(t: Throwable) = logEx(t) + } + + object Simple{ + implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace()) + } + + class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global, + logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl { + private[this] val active = collection.mutable.Set.empty[Context.Token] + private[this] var promise = concurrent.Promise.successful[Unit](()) + + def executionContext = ec + + def reportFailure(t: Throwable) = logEx(t) + + def handleReportSchedule(token: Context.Token) = synchronized{ + if (active.isEmpty) { + assert(promise.isCompleted) + promise = concurrent.Promise[Unit] + } + active.add(token) + token + } + override def reportSchedule() = { + handleReportSchedule(super.reportSchedule()) + } + override def reportSchedule(fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + handleReportSchedule(super.reportSchedule(fileName, line)) + } + + override def reportSchedule(a: Actor[_], + msg: Any, + fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + handleReportSchedule(super.reportSchedule(a, msg, fileName, line)) + } + + override def reportComplete(token: Context.Token) = this.synchronized{ + assert(active.remove(token)) + + if (active.isEmpty) promise.success(()) + } + + def waitForInactivity(timeout: Option[java.time.Duration] = None) = { + Await.result( + this.synchronized(promise).future, + timeout match{ + case None => scala.concurrent.duration.Duration.Inf + case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos) + } + ) + } + } + + trait Impl extends Context { + def executionContext: ExecutionContext + + def execute(runnable: Runnable): Unit = { + val token = reportSchedule() + executionContext.execute(new Runnable { + def run(): Unit = { + try runnable.run() + finally reportComplete(token) + } + }) + } + + def future[T](t: => T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Future[T] = { + val token = reportSchedule(fileName, line) + val p = Promise[T] + executionContext.execute(new Runnable { + def run(): Unit = { + p.complete(scala.util.Try(t)) + reportComplete(token) + } + }) + p.future + } + + lazy val scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactory { + def newThread(r: Runnable): Thread = { + val t = new Thread(r, "ActorContext-Scheduler-Thread") + t.setDaemon(true) + t + } + } + ) + + def scheduleMsg[T](a: Actor[T], + msg: T, delay: java.time.Duration) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line) = { + val token = reportSchedule(a, msg, fileName, line) + scheduler.schedule[Unit]( + () => { + a.send(msg)(fileName, line) + reportComplete(token) + }, + delay.toMillis, + TimeUnit.MILLISECONDS + ) + } + } + +} + +trait Actor[T]{ + def send(t: T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit + + def sendAsync(f: scala.concurrent.Future[T]) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit +} |