summaryrefslogtreecommitdiff
path: root/cask/actor/src/Actors.scala
diff options
context:
space:
mode:
Diffstat (limited to 'cask/actor/src/Actors.scala')
-rw-r--r--cask/actor/src/Actors.scala86
1 files changed, 0 insertions, 86 deletions
diff --git a/cask/actor/src/Actors.scala b/cask/actor/src/Actors.scala
deleted file mode 100644
index 50b3b4e..0000000
--- a/cask/actor/src/Actors.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-package cask.actor
-import collection.mutable
-
-abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{
- private val queue = new mutable.Queue[(T, Context.Token)]()
-
- private var scheduled = false
-
- def send(t: T)
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line): Unit = synchronized{
- val token = ac.reportSchedule(this, t, fileName, line)
- queue.enqueue((t, token))
- if (!scheduled){
- scheduled = true
- ac.execute(() => runWithItems())
- }
- }
- def sendAsync(f: scala.concurrent.Future[T])
- (implicit fileName: sourcecode.FileName,
- line: sourcecode.Line) = {
- f.onComplete{
- case scala.util.Success(v) => this.send(v)
- case scala.util.Failure(e) => ac.reportFailure(e)
- }
- }
-
- def runBatch0(msgs: Seq[(T, Context.Token)]): Unit
- private[this] def runWithItems(): Unit = {
- val msgs = synchronized(queue.dequeueAll(_ => true))
-
- runBatch0(msgs)
-
- synchronized{
- if (queue.nonEmpty) ac.execute(() => runWithItems())
- else{
- assert(scheduled)
- scheduled = false
- }
- }
- }
-}
-
-abstract class BatchActor[T]()(implicit ac: Context) extends BaseActor[T]{
- def runBatch(msgs: Seq[T]): Unit
- def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = {
- try {
- msgs.foreach{case (m, token) => ac.reportRun(this, m, token)}
- runBatch(msgs.map(_._1))
- }
- catch{case e: Throwable => ac.reportFailure(e)}
- finally msgs.foreach{case (m, token) => ac.reportComplete(token)}
-
- }
-}
-
-abstract class SimpleActor[T]()(implicit ac: Context) extends BaseActor[T]{
- def run(msg: T): Unit
- override def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = {
- for((msg, token) <- msgs) {
- try {
- ac.reportRun(this, msg, token)
- run(msg)
- }
- catch{case e: Throwable => ac.reportFailure(e)}
- finally ac.reportComplete(token)
- }
- }
-}
-
-abstract class StateMachineActor[T]()(implicit ac: Context) extends SimpleActor[T]() {
- class State(run0: T => State = null){
- def run = run0
- }
- protected[this] def initialState: State
- protected[this] var state: State = initialState
- def run(msg: T): Unit = {
- assert(state != null)
- state = state.run(msg)
- }
-}
-
-class ProxyActor[T, V](f: T => V, downstream: Actor[V])
- (implicit ac: Context) extends SimpleActor[T]{
- def run(msg: T): Unit = downstream.send(f(msg))
-} \ No newline at end of file