summaryrefslogtreecommitdiff
path: root/cask/actor/src/Context.scala
diff options
context:
space:
mode:
Diffstat (limited to 'cask/actor/src/Context.scala')
-rw-r--r--cask/actor/src/Context.scala190
1 files changed, 0 insertions, 190 deletions
diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala
deleted file mode 100644
index 6b56f2e..0000000
--- a/cask/actor/src/Context.scala
+++ /dev/null
@@ -1,190 +0,0 @@
-package cask.actor
-import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
-
-import scala.concurrent.duration.Duration
-import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise}
-import scala.util.Try
-
-/**
- * An extended `scala.concurrent.ExecutionContext`; provides the ability to
- * schedule messages to be sent later, and hooks to track the current number of
- * outstanding tasks or log the actor message sends for debugging purporses
- */
-trait Context extends ExecutionContext {
- def reportSchedule(): Context.Token = new Context.Token.Simple()
-
- def reportSchedule(fileName: sourcecode.FileName,
- line: sourcecode.Line): Context.Token = {
- new Context.Token.Future(fileName, line)
- }
-
- def reportSchedule(a: Actor[_],
- msg: Any,
- fileName: sourcecode.FileName,
- line: sourcecode.Line): Context.Token = {
- new Context.Token.Send(a, msg, fileName, line)
- }
-
- def reportRun(a: Actor[_],
- msg: Any,
- token: Context.Token): Unit = ()
-
- def reportComplete(token: Context.Token): Unit = ()
-
- def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Unit
-
- def future[T](t: => T)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Future[T]
-
- def execute(runnable: Runnable): Unit
-}
-
-object Context{
- trait Token
- object Token{
- class Simple extends Token(){
- override def toString = "token@" + Integer.toHexString(hashCode())
- }
-
- class Future(val fileName: sourcecode.FileName,
- val line: sourcecode.Line) extends Token(){
- override def toString =
- "token@" + Integer.toHexString(hashCode()) + "@" +
- fileName.value + ":" + line.value
- }
-
- class Send(val a: Actor[_],
- val msg: Any,
- val fileName: sourcecode.FileName,
- val line: sourcecode.Line) extends Token(){
- override def toString =
- "token@" + Integer.toHexString(hashCode()) + "@" +
- fileName.value + ":" + line.value
- }
- }
-
- class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl {
- def executionContext = ec
- def reportFailure(t: Throwable) = logEx(t)
- }
-
- object Simple{
- implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace())
- }
-
- class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global,
- logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl {
- private[this] val active = collection.mutable.Set.empty[Context.Token]
- private[this] var promise = concurrent.Promise.successful[Unit](())
-
- def executionContext = ec
-
- def reportFailure(t: Throwable) = logEx(t)
-
- def handleReportSchedule(token: Context.Token) = synchronized{
- if (active.isEmpty) {
- assert(promise.isCompleted)
- promise = concurrent.Promise[Unit]
- }
- active.add(token)
- token
- }
- override def reportSchedule() = {
- handleReportSchedule(super.reportSchedule())
- }
- override def reportSchedule(fileName: sourcecode.FileName,
- line: sourcecode.Line): Context.Token = {
- handleReportSchedule(super.reportSchedule(fileName, line))
- }
-
- override def reportSchedule(a: Actor[_],
- msg: Any,
- fileName: sourcecode.FileName,
- line: sourcecode.Line): Context.Token = {
- handleReportSchedule(super.reportSchedule(a, msg, fileName, line))
- }
-
- override def reportComplete(token: Context.Token) = this.synchronized{
- assert(active.remove(token))
-
- if (active.isEmpty) promise.success(())
- }
-
- def waitForInactivity(timeout: Option[java.time.Duration] = None) = {
- Await.result(
- this.synchronized(promise).future,
- timeout match{
- case None => scala.concurrent.duration.Duration.Inf
- case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos)
- }
- )
- }
- }
-
- trait Impl extends Context {
- def executionContext: ExecutionContext
-
- def execute(runnable: Runnable): Unit = {
- val token = reportSchedule()
- executionContext.execute(new Runnable {
- def run(): Unit = {
- try runnable.run()
- finally reportComplete(token)
- }
- })
- }
-
- def future[T](t: => T)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Future[T] = {
- val token = reportSchedule(fileName, line)
- val p = Promise[T]
- executionContext.execute(new Runnable {
- def run(): Unit = {
- p.complete(scala.util.Try(t))
- reportComplete(token)
- }
- })
- p.future
- }
-
- lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory {
- def newThread(r: Runnable): Thread = {
- val t = new Thread(r, "ActorContext-Scheduler-Thread")
- t.setDaemon(true)
- t
- }
- }
- )
-
- def scheduleMsg[T](a: Actor[T],
- msg: T, delay: java.time.Duration)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line) = {
- val token = reportSchedule(a, msg, fileName, line)
- scheduler.schedule[Unit](
- () => {
- a.send(msg)(fileName, line)
- reportComplete(token)
- },
- delay.toMillis,
- TimeUnit.MILLISECONDS
- )
- }
- }
-
-}
-
-trait Actor[T]{
- def send(t: T)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Unit
-
- def sendAsync(f: scala.concurrent.Future[T])
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Unit
-}