From 9be09d4f6961e80ef3fccda1e82dc063d395494f Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Thu, 7 Nov 2019 13:07:06 +0800 Subject: 0.3.6 --- cask/actor/src/Actors.scala | 86 --------------------------------------------- 1 file changed, 86 deletions(-) delete mode 100644 cask/actor/src/Actors.scala (limited to 'cask/actor/src/Actors.scala') diff --git a/cask/actor/src/Actors.scala b/cask/actor/src/Actors.scala deleted file mode 100644 index 50b3b4e..0000000 --- a/cask/actor/src/Actors.scala +++ /dev/null @@ -1,86 +0,0 @@ -package cask.actor -import collection.mutable - -abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{ - private val queue = new mutable.Queue[(T, Context.Token)]() - - private var scheduled = false - - def send(t: T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit = synchronized{ - val token = ac.reportSchedule(this, t, fileName, line) - queue.enqueue((t, token)) - if (!scheduled){ - scheduled = true - ac.execute(() => runWithItems()) - } - } - def sendAsync(f: scala.concurrent.Future[T]) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line) = { - f.onComplete{ - case scala.util.Success(v) => this.send(v) - case scala.util.Failure(e) => ac.reportFailure(e) - } - } - - def runBatch0(msgs: Seq[(T, Context.Token)]): Unit - private[this] def runWithItems(): Unit = { - val msgs = synchronized(queue.dequeueAll(_ => true)) - - runBatch0(msgs) - - synchronized{ - if (queue.nonEmpty) ac.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - } -} - -abstract class BatchActor[T]()(implicit ac: Context) extends BaseActor[T]{ - def runBatch(msgs: Seq[T]): Unit - def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { - try { - msgs.foreach{case (m, token) => ac.reportRun(this, m, token)} - runBatch(msgs.map(_._1)) - } - catch{case e: Throwable => ac.reportFailure(e)} - finally msgs.foreach{case (m, token) => ac.reportComplete(token)} - - } -} - -abstract class SimpleActor[T]()(implicit ac: Context) extends BaseActor[T]{ - def run(msg: T): Unit - override def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { - for((msg, token) <- msgs) { - try { - ac.reportRun(this, msg, token) - run(msg) - } - catch{case e: Throwable => ac.reportFailure(e)} - finally ac.reportComplete(token) - } - } -} - -abstract class StateMachineActor[T]()(implicit ac: Context) extends SimpleActor[T]() { - class State(run0: T => State = null){ - def run = run0 - } - protected[this] def initialState: State - protected[this] var state: State = initialState - def run(msg: T): Unit = { - assert(state != null) - state = state.run(msg) - } -} - -class ProxyActor[T, V](f: T => V, downstream: Actor[V]) - (implicit ac: Context) extends SimpleActor[T]{ - def run(msg: T): Unit = downstream.send(f(msg)) -} \ No newline at end of file -- cgit v1.2.3