summaryrefslogtreecommitdiff
path: root/cask/actor/src/Context.scala
diff options
context:
space:
mode:
Diffstat (limited to 'cask/actor/src/Context.scala')
-rw-r--r--cask/actor/src/Context.scala190
1 files changed, 190 insertions, 0 deletions
diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala
new file mode 100644
index 0000000..6b56f2e
--- /dev/null
+++ b/cask/actor/src/Context.scala
@@ -0,0 +1,190 @@
+package cask.actor
+import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise}
+import scala.util.Try
+
+/**
+ * An extended `scala.concurrent.ExecutionContext`; provides the ability to
+ * schedule messages to be sent later, and hooks to track the current number of
+ * outstanding tasks or log the actor message sends for debugging purporses
+ */
+trait Context extends ExecutionContext {
+ def reportSchedule(): Context.Token = new Context.Token.Simple()
+
+ def reportSchedule(fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ new Context.Token.Future(fileName, line)
+ }
+
+ def reportSchedule(a: Actor[_],
+ msg: Any,
+ fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ new Context.Token.Send(a, msg, fileName, line)
+ }
+
+ def reportRun(a: Actor[_],
+ msg: Any,
+ token: Context.Token): Unit = ()
+
+ def reportComplete(token: Context.Token): Unit = ()
+
+ def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+
+ def future[T](t: => T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Future[T]
+
+ def execute(runnable: Runnable): Unit
+}
+
+object Context{
+ trait Token
+ object Token{
+ class Simple extends Token(){
+ override def toString = "token@" + Integer.toHexString(hashCode())
+ }
+
+ class Future(val fileName: sourcecode.FileName,
+ val line: sourcecode.Line) extends Token(){
+ override def toString =
+ "token@" + Integer.toHexString(hashCode()) + "@" +
+ fileName.value + ":" + line.value
+ }
+
+ class Send(val a: Actor[_],
+ val msg: Any,
+ val fileName: sourcecode.FileName,
+ val line: sourcecode.Line) extends Token(){
+ override def toString =
+ "token@" + Integer.toHexString(hashCode()) + "@" +
+ fileName.value + ":" + line.value
+ }
+ }
+
+ class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl {
+ def executionContext = ec
+ def reportFailure(t: Throwable) = logEx(t)
+ }
+
+ object Simple{
+ implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace())
+ }
+
+ class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global,
+ logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl {
+ private[this] val active = collection.mutable.Set.empty[Context.Token]
+ private[this] var promise = concurrent.Promise.successful[Unit](())
+
+ def executionContext = ec
+
+ def reportFailure(t: Throwable) = logEx(t)
+
+ def handleReportSchedule(token: Context.Token) = synchronized{
+ if (active.isEmpty) {
+ assert(promise.isCompleted)
+ promise = concurrent.Promise[Unit]
+ }
+ active.add(token)
+ token
+ }
+ override def reportSchedule() = {
+ handleReportSchedule(super.reportSchedule())
+ }
+ override def reportSchedule(fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ handleReportSchedule(super.reportSchedule(fileName, line))
+ }
+
+ override def reportSchedule(a: Actor[_],
+ msg: Any,
+ fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ handleReportSchedule(super.reportSchedule(a, msg, fileName, line))
+ }
+
+ override def reportComplete(token: Context.Token) = this.synchronized{
+ assert(active.remove(token))
+
+ if (active.isEmpty) promise.success(())
+ }
+
+ def waitForInactivity(timeout: Option[java.time.Duration] = None) = {
+ Await.result(
+ this.synchronized(promise).future,
+ timeout match{
+ case None => scala.concurrent.duration.Duration.Inf
+ case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos)
+ }
+ )
+ }
+ }
+
+ trait Impl extends Context {
+ def executionContext: ExecutionContext
+
+ def execute(runnable: Runnable): Unit = {
+ val token = reportSchedule()
+ executionContext.execute(new Runnable {
+ def run(): Unit = {
+ try runnable.run()
+ finally reportComplete(token)
+ }
+ })
+ }
+
+ def future[T](t: => T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Future[T] = {
+ val token = reportSchedule(fileName, line)
+ val p = Promise[T]
+ executionContext.execute(new Runnable {
+ def run(): Unit = {
+ p.complete(scala.util.Try(t))
+ reportComplete(token)
+ }
+ })
+ p.future
+ }
+
+ lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ val t = new Thread(r, "ActorContext-Scheduler-Thread")
+ t.setDaemon(true)
+ t
+ }
+ }
+ )
+
+ def scheduleMsg[T](a: Actor[T],
+ msg: T, delay: java.time.Duration)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line) = {
+ val token = reportSchedule(a, msg, fileName, line)
+ scheduler.schedule[Unit](
+ () => {
+ a.send(msg)(fileName, line)
+ reportComplete(token)
+ },
+ delay.toMillis,
+ TimeUnit.MILLISECONDS
+ )
+ }
+ }
+
+}
+
+trait Actor[T]{
+ def send(t: T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+
+ def sendAsync(f: scala.concurrent.Future[T])
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+}