diff options
Diffstat (limited to 'cask/actor/src/Context.scala')
-rw-r--r-- | cask/actor/src/Context.scala | 190 |
1 files changed, 0 insertions, 190 deletions
diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala deleted file mode 100644 index 6b56f2e..0000000 --- a/cask/actor/src/Context.scala +++ /dev/null @@ -1,190 +0,0 @@ -package cask.actor -import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise} -import scala.util.Try - -/** - * An extended `scala.concurrent.ExecutionContext`; provides the ability to - * schedule messages to be sent later, and hooks to track the current number of - * outstanding tasks or log the actor message sends for debugging purporses - */ -trait Context extends ExecutionContext { - def reportSchedule(): Context.Token = new Context.Token.Simple() - - def reportSchedule(fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - new Context.Token.Future(fileName, line) - } - - def reportSchedule(a: Actor[_], - msg: Any, - fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - new Context.Token.Send(a, msg, fileName, line) - } - - def reportRun(a: Actor[_], - msg: Any, - token: Context.Token): Unit = () - - def reportComplete(token: Context.Token): Unit = () - - def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit - - def future[T](t: => T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Future[T] - - def execute(runnable: Runnable): Unit -} - -object Context{ - trait Token - object Token{ - class Simple extends Token(){ - override def toString = "token@" + Integer.toHexString(hashCode()) - } - - class Future(val fileName: sourcecode.FileName, - val line: sourcecode.Line) extends Token(){ - override def toString = - "token@" + Integer.toHexString(hashCode()) + "@" + - fileName.value + ":" + line.value - } - - class Send(val a: Actor[_], - val msg: Any, - val fileName: sourcecode.FileName, - val line: sourcecode.Line) extends Token(){ - override def toString = - "token@" + Integer.toHexString(hashCode()) + "@" + - fileName.value + ":" + line.value - } - } - - class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl { - def executionContext = ec - def reportFailure(t: Throwable) = logEx(t) - } - - object Simple{ - implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace()) - } - - class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global, - logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl { - private[this] val active = collection.mutable.Set.empty[Context.Token] - private[this] var promise = concurrent.Promise.successful[Unit](()) - - def executionContext = ec - - def reportFailure(t: Throwable) = logEx(t) - - def handleReportSchedule(token: Context.Token) = synchronized{ - if (active.isEmpty) { - assert(promise.isCompleted) - promise = concurrent.Promise[Unit] - } - active.add(token) - token - } - override def reportSchedule() = { - handleReportSchedule(super.reportSchedule()) - } - override def reportSchedule(fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - handleReportSchedule(super.reportSchedule(fileName, line)) - } - - override def reportSchedule(a: Actor[_], - msg: Any, - fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - handleReportSchedule(super.reportSchedule(a, msg, fileName, line)) - } - - override def reportComplete(token: Context.Token) = this.synchronized{ - assert(active.remove(token)) - - if (active.isEmpty) promise.success(()) - } - - def waitForInactivity(timeout: Option[java.time.Duration] = None) = { - Await.result( - this.synchronized(promise).future, - timeout match{ - case None => scala.concurrent.duration.Duration.Inf - case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos) - } - ) - } - } - - trait Impl extends Context { - def executionContext: ExecutionContext - - def execute(runnable: Runnable): Unit = { - val token = reportSchedule() - executionContext.execute(new Runnable { - def run(): Unit = { - try runnable.run() - finally reportComplete(token) - } - }) - } - - def future[T](t: => T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Future[T] = { - val token = reportSchedule(fileName, line) - val p = Promise[T] - executionContext.execute(new Runnable { - def run(): Unit = { - p.complete(scala.util.Try(t)) - reportComplete(token) - } - }) - p.future - } - - lazy val scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory { - def newThread(r: Runnable): Thread = { - val t = new Thread(r, "ActorContext-Scheduler-Thread") - t.setDaemon(true) - t - } - } - ) - - def scheduleMsg[T](a: Actor[T], - msg: T, delay: java.time.Duration) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line) = { - val token = reportSchedule(a, msg, fileName, line) - scheduler.schedule[Unit]( - () => { - a.send(msg)(fileName, line) - reportComplete(token) - }, - delay.toMillis, - TimeUnit.MILLISECONDS - ) - } - } - -} - -trait Actor[T]{ - def send(t: T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit - - def sendAsync(f: scala.concurrent.Future[T]) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit -} |