diff options
Diffstat (limited to 'cask/actor')
-rw-r--r-- | cask/actor/src/Actors.scala | 86 | ||||
-rw-r--r-- | cask/actor/src/Context.scala | 190 | ||||
-rw-r--r-- | cask/actor/test/src-jvm/JvmActorsTest.scala | 365 | ||||
-rw-r--r-- | cask/actor/test/src/ActorsTest.scala | 17 |
4 files changed, 0 insertions, 658 deletions
diff --git a/cask/actor/src/Actors.scala b/cask/actor/src/Actors.scala deleted file mode 100644 index 50b3b4e..0000000 --- a/cask/actor/src/Actors.scala +++ /dev/null @@ -1,86 +0,0 @@ -package cask.actor -import collection.mutable - -abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{ - private val queue = new mutable.Queue[(T, Context.Token)]() - - private var scheduled = false - - def send(t: T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit = synchronized{ - val token = ac.reportSchedule(this, t, fileName, line) - queue.enqueue((t, token)) - if (!scheduled){ - scheduled = true - ac.execute(() => runWithItems()) - } - } - def sendAsync(f: scala.concurrent.Future[T]) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line) = { - f.onComplete{ - case scala.util.Success(v) => this.send(v) - case scala.util.Failure(e) => ac.reportFailure(e) - } - } - - def runBatch0(msgs: Seq[(T, Context.Token)]): Unit - private[this] def runWithItems(): Unit = { - val msgs = synchronized(queue.dequeueAll(_ => true)) - - runBatch0(msgs) - - synchronized{ - if (queue.nonEmpty) ac.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - } -} - -abstract class BatchActor[T]()(implicit ac: Context) extends BaseActor[T]{ - def runBatch(msgs: Seq[T]): Unit - def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { - try { - msgs.foreach{case (m, token) => ac.reportRun(this, m, token)} - runBatch(msgs.map(_._1)) - } - catch{case e: Throwable => ac.reportFailure(e)} - finally msgs.foreach{case (m, token) => ac.reportComplete(token)} - - } -} - -abstract class SimpleActor[T]()(implicit ac: Context) extends BaseActor[T]{ - def run(msg: T): Unit - override def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { - for((msg, token) <- msgs) { - try { - ac.reportRun(this, msg, token) - run(msg) - } - catch{case e: Throwable => ac.reportFailure(e)} - finally ac.reportComplete(token) - } - } -} - -abstract class StateMachineActor[T]()(implicit ac: Context) extends SimpleActor[T]() { - class State(run0: T => State = null){ - def run = run0 - } - protected[this] def initialState: State - protected[this] var state: State = initialState - def run(msg: T): Unit = { - assert(state != null) - state = state.run(msg) - } -} - -class ProxyActor[T, V](f: T => V, downstream: Actor[V]) - (implicit ac: Context) extends SimpleActor[T]{ - def run(msg: T): Unit = downstream.send(f(msg)) -}
\ No newline at end of file diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala deleted file mode 100644 index 6b56f2e..0000000 --- a/cask/actor/src/Context.scala +++ /dev/null @@ -1,190 +0,0 @@ -package cask.actor -import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} - -import scala.concurrent.duration.Duration -import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise} -import scala.util.Try - -/** - * An extended `scala.concurrent.ExecutionContext`; provides the ability to - * schedule messages to be sent later, and hooks to track the current number of - * outstanding tasks or log the actor message sends for debugging purporses - */ -trait Context extends ExecutionContext { - def reportSchedule(): Context.Token = new Context.Token.Simple() - - def reportSchedule(fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - new Context.Token.Future(fileName, line) - } - - def reportSchedule(a: Actor[_], - msg: Any, - fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - new Context.Token.Send(a, msg, fileName, line) - } - - def reportRun(a: Actor[_], - msg: Any, - token: Context.Token): Unit = () - - def reportComplete(token: Context.Token): Unit = () - - def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit - - def future[T](t: => T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Future[T] - - def execute(runnable: Runnable): Unit -} - -object Context{ - trait Token - object Token{ - class Simple extends Token(){ - override def toString = "token@" + Integer.toHexString(hashCode()) - } - - class Future(val fileName: sourcecode.FileName, - val line: sourcecode.Line) extends Token(){ - override def toString = - "token@" + Integer.toHexString(hashCode()) + "@" + - fileName.value + ":" + line.value - } - - class Send(val a: Actor[_], - val msg: Any, - val fileName: sourcecode.FileName, - val line: sourcecode.Line) extends Token(){ - override def toString = - "token@" + Integer.toHexString(hashCode()) + "@" + - fileName.value + ":" + line.value - } - } - - class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl { - def executionContext = ec - def reportFailure(t: Throwable) = logEx(t) - } - - object Simple{ - implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace()) - } - - class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global, - logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl { - private[this] val active = collection.mutable.Set.empty[Context.Token] - private[this] var promise = concurrent.Promise.successful[Unit](()) - - def executionContext = ec - - def reportFailure(t: Throwable) = logEx(t) - - def handleReportSchedule(token: Context.Token) = synchronized{ - if (active.isEmpty) { - assert(promise.isCompleted) - promise = concurrent.Promise[Unit] - } - active.add(token) - token - } - override def reportSchedule() = { - handleReportSchedule(super.reportSchedule()) - } - override def reportSchedule(fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - handleReportSchedule(super.reportSchedule(fileName, line)) - } - - override def reportSchedule(a: Actor[_], - msg: Any, - fileName: sourcecode.FileName, - line: sourcecode.Line): Context.Token = { - handleReportSchedule(super.reportSchedule(a, msg, fileName, line)) - } - - override def reportComplete(token: Context.Token) = this.synchronized{ - assert(active.remove(token)) - - if (active.isEmpty) promise.success(()) - } - - def waitForInactivity(timeout: Option[java.time.Duration] = None) = { - Await.result( - this.synchronized(promise).future, - timeout match{ - case None => scala.concurrent.duration.Duration.Inf - case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos) - } - ) - } - } - - trait Impl extends Context { - def executionContext: ExecutionContext - - def execute(runnable: Runnable): Unit = { - val token = reportSchedule() - executionContext.execute(new Runnable { - def run(): Unit = { - try runnable.run() - finally reportComplete(token) - } - }) - } - - def future[T](t: => T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Future[T] = { - val token = reportSchedule(fileName, line) - val p = Promise[T] - executionContext.execute(new Runnable { - def run(): Unit = { - p.complete(scala.util.Try(t)) - reportComplete(token) - } - }) - p.future - } - - lazy val scheduler = Executors.newSingleThreadScheduledExecutor( - new ThreadFactory { - def newThread(r: Runnable): Thread = { - val t = new Thread(r, "ActorContext-Scheduler-Thread") - t.setDaemon(true) - t - } - } - ) - - def scheduleMsg[T](a: Actor[T], - msg: T, delay: java.time.Duration) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line) = { - val token = reportSchedule(a, msg, fileName, line) - scheduler.schedule[Unit]( - () => { - a.send(msg)(fileName, line) - reportComplete(token) - }, - delay.toMillis, - TimeUnit.MILLISECONDS - ) - } - } - -} - -trait Actor[T]{ - def send(t: T) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit - - def sendAsync(f: scala.concurrent.Future[T]) - (implicit fileName: sourcecode.FileName, - line: sourcecode.Line): Unit -} diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala deleted file mode 100644 index 63da92c..0000000 --- a/cask/actor/test/src-jvm/JvmActorsTest.scala +++ /dev/null @@ -1,365 +0,0 @@ -package cask.actor - -import utest._ -object JvmActorsTest extends TestSuite{ - def tests = Tests{ - os.remove.all(os.pwd / "out" / "scratch") - test("lock"){ - val rotateSize = 50 - val logPath = os.pwd / "out" / "scratch" / "log.txt" - val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" - - var logSize = 0 - - def logLine(s: String): Unit = synchronized{ - val newLogSize = logSize + s.length + 1 - if (newLogSize <= rotateSize) logSize = newLogSize - else { - logSize = 0 - os.move(logPath, oldPath, replaceExisting = true) - } - - os.write.append(logPath, s + "\n", createFolders = true) - } - - logLine("I am cow") - logLine("hear me moo") - logLine("I weight twice as much as you") - logLine("And I look good on the barbecue") - logLine("Yoghurt curds cream cheese and butter") - logLine("Comes from liquids from my udder") - logLine("I am cow, I am cow") - logLine("Hear me moo, moooo") - - os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder" - os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo" - } - - test("actor"){ - class Logger(log: os.Path, old: os.Path, rotateSize: Int) - (implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = { - val newLogSize = logSize + s.length + 1 - if (newLogSize <= rotateSize) logSize = newLogSize - else { - logSize = s.length - os.move(log, old, replaceExisting = true) - } - os.write.append(log, s + "\n", createFolders = true) - } - private var logSize = 0 - } - - implicit val ac = new Context.Test() - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" - - val logger = new Logger(logPath, oldPath, rotateSize = 50) - - logger.send("I am cow") - logger.send("hear me moo") - logger.send("I weight twice as much as you") - logger.send("And I look good on the barbecue") - logger.send("Yoghurt curds cream cheese and butter") - logger.send("Comes from liquids from my udder") - logger.send("I am cow, I am cow") - logger.send("Hear me moo, moooo") - - // Logger hasn't finished yet, running in the background - ac.waitForInactivity() - // Now logger has finished - - os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder") - os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") - } - - test("pipeline"){ - class Writer(log: os.Path, old: os.Path, rotateSize: Int) - (implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = { - val newLogSize = logSize + s.length + 1 - if (newLogSize <= rotateSize) logSize = newLogSize - else { - logSize = s.length - os.move(log, old, replaceExisting = true) - } - os.write.append(log, s + "\n", createFolders = true) - } - private var logSize = 0 - } - - class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes)) - } - - implicit val ac = new Context.Test() - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" - - val writer = new Writer(logPath, oldPath, rotateSize = 50) - val logger = new Logger(writer) - - logger.send("I am cow") - logger.send("hear me moo") - logger.send("I weight twice as much as you") - logger.send("And I look good on the barbecue") - logger.send("Yoghurt curds cream cheese and butter") - logger.send("Comes from liquids from my udder") - logger.send("I am cow, I am cow") - logger.send("Hear me moo, moooo") - - ac.waitForInactivity() - - os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n" - os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n" - - def decodeFile(p: os.Path) = { - os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s))) - } - - decodeFile(oldPath) ==> Seq("Comes from liquids from my udder") - decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") - } - - test("batch"){ - sealed trait Msg - case class Text(value: String) extends Msg - case class Rotate() extends Msg - class Writer(log: os.Path, old: os.Path) - (implicit ac: Context) extends BatchActor[Msg]{ - def runBatch(msgs: Seq[Msg]): Unit = { - msgs.lastIndexOf(Rotate()) match{ - case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true) - case rotateIndex => - val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1) - if (prevRotateIndex != -1) os.remove.all(log) - os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true) - os.move(log, old, replaceExisting = true) - os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true) - } - } - def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n" - } - - class Logger(dest: Actor[Msg], rotateSize: Int) - (implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = { - val newLogSize = logSize + s.length + 1 - if (newLogSize <= rotateSize) logSize = newLogSize - else { - logSize = s.length - dest.send(Rotate()) - } - dest.send(Text(s)) - } - private var logSize = 0 - } - - implicit val ac = new Context.Test() - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" - - val writer = new Writer(logPath, oldPath) - val logger = new Logger(writer, rotateSize = 50) - - logger.send("I am cow") - logger.send("hear me moo") - logger.send("I weight twice as much as you") - logger.send("And I look good on the barbecue") - logger.send("Yoghurt curds cream cheese and butter") - logger.send("Comes from liquids from my udder") - logger.send("I am cow, I am cow") - logger.send("Hear me moo, moooo") - - // Logger hasn't finished yet, running in the background - ac.waitForInactivity() - // Now logger has finished - - os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder") - os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") - } - - test("debounce"){ - sealed trait Msg - case class Debounced() extends Msg - case class Text(value: String) extends Msg - - class Logger(log: os.Path, debounceTime: java.time.Duration) - (implicit ac: Context) extends StateMachineActor[Msg]{ - def initialState = Idle() - case class Idle() extends State({ - case Text(value) => - ac.scheduleMsg(this, Debounced(), debounceTime) - Buffering(Vector(value)) - }) - case class Buffering(buffer: Vector[String]) extends State({ - case Text(value) => Buffering(buffer :+ value) - case Debounced() => - os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true) - Idle() - }) - } - - implicit val ac = new Context.Test() - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - - val logger = new Logger(logPath, java.time.Duration.ofMillis(50)) - - logger.send(Text("I am cow")) - logger.send(Text("hear me moo")) - Thread.sleep(100) - logger.send(Text("I weight twice as much as you")) - logger.send(Text("And I look good on the barbecue")) - Thread.sleep(100) - logger.send(Text("Yoghurt curds cream cheese and butter")) - logger.send(Text("Comes from liquids from my udder")) - logger.send(Text("I am cow, I am cow")) - logger.send(Text("Hear me moo, moooo")) - - ac.waitForInactivity() - - os.read.lines(logPath) ==> Seq( - "I am cow hear me moo", - "I weight twice as much as you And I look good on the barbecue", - "Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo", - ) - } - test("log"){ - sealed trait Msg - case class Debounced() extends Msg - case class Text(value: String) extends Msg - - class Logger(log: os.Path, debounceTime: java.time.Duration) - (implicit ac: Context) extends StateMachineActor[Msg]{ - def initialState = Idle() - case class Idle() extends State({ - case Text(value) => - ac.scheduleMsg(this, Debounced(), debounceTime) - Buffering(Vector(value)) - }) - case class Buffering(buffer: Vector[String]) extends State({ - case Text(value) => Buffering(buffer :+ value) - case Debounced() => - os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true) - Idle() - }) - - override def run(msg: Msg): Unit = { - println(s"$state + $msg -> ") - super.run(msg) - println(state) - } - } - - implicit val ac = new Context.Test() - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - - val logger = new Logger(logPath, java.time.Duration.ofMillis(50)) - - logger.send(Text("I am cow")) - // Idle() + Text(I am cow) -> - // Buffering(Vector(I am cow)) - logger.send(Text("hear me moo")) - // Buffering(Vector(I am cow)) + Text(hear me moo) -> - // Buffering(Vector(I am cow, hear me moo)) - Thread.sleep(100) - // Buffering(Vector(I am cow, hear me moo)) + Debounced() -> - // Idle() - logger.send(Text("I weight twice as much as you")) - // Idle() + Text(I weight twice as much as you) -> - // Buffering(Vector(I weight twice as much as you)) - logger.send(Text("And I look good on the barbecue")) - // Buffering(Vector(I weight twice as much as you)) + Text(And I look good on the barbecue) -> - // Buffering(Vector(I weight twice as much as you, And I look good on the barbecue)) - Thread.sleep(100) - // Buffering(Vector(I weight twice as much as you, And I look good on the barbecue)) + Debounced() -> - // Idle() - logger.send(Text("Yoghurt curds cream cheese and butter")) - // Idle() + Text(Yoghurt curds cream cheese and butter) -> - // Buffering(Vector(Yoghurt curds cream cheese and butter)) - logger.send(Text("Comes from liquids from my udder")) - // Buffering(Vector(Yoghurt curds cream cheese and butter)) + - // Text(Comes from liquids from my udder) -> Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder)) - logger.send(Text("I am cow, I am cow")) - // Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder)) + Text(I am cow, I am cow) -> - // Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow)) - logger.send(Text("Hear me moo, moooo")) - // Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow)) + Text(Hear me moo, moooo) -> - // Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow, Hear me moo, moooo)) - - ac.waitForInactivity() - // Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow, Hear me moo, moooo)) + Debounced() -> - // Idle() - - os.read.lines(logPath) ==> Seq( - "I am cow hear me moo", - "I weight twice as much as you And I look good on the barbecue", - "Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo", - ) - } - test("pipelineLog"){ - class Writer(log: os.Path, old: os.Path, rotateSize: Int) - (implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = { - val newLogSize = logSize + s.length + 1 - if (newLogSize <= rotateSize) logSize = newLogSize - else { - logSize = s.length - os.move(log, old, replaceExisting = true) - } - os.write.append(log, s + "\n", createFolders = true) - } - private var logSize = 0 - } - - class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{ - def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes)) - } - - implicit val ac = new Context.Test( - scala.concurrent.ExecutionContext.fromExecutor( - java.util.concurrent.Executors.newSingleThreadExecutor() - ) - ){ - override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = { - println(s"$a <- $msg") - super.reportRun(a, msg, token) - } - } - - val logPath = os.pwd / "out" / "scratch" / "log.txt" - val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" - - val writer = new Writer(logPath, oldPath, rotateSize = 50) - val logger = new Logger(writer) - - logger.send("I am cow") - logger.send("hear me moo") - logger.send("I weight twice as much as you") - logger.send("And I look good on the barbecue") - logger.send("Yoghurt curds cream cheese and butter") - logger.send("Comes from liquids from my udder") - logger.send("I am cow, I am cow") - logger.send("Hear me moo, moooo") - - ac.waitForInactivity() - - os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n" - os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n" - - def decodeFile(p: os.Path) = { - os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s))) - } - - decodeFile(oldPath) ==> Seq("Comes from liquids from my udder") - decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") - } - - - } -}
\ No newline at end of file diff --git a/cask/actor/test/src/ActorsTest.scala b/cask/actor/test/src/ActorsTest.scala deleted file mode 100644 index 2c81e5b..0000000 --- a/cask/actor/test/src/ActorsTest.scala +++ /dev/null @@ -1,17 +0,0 @@ -package cask.actor -import utest._ -object ActorsTest extends TestSuite{ - def tests = Tests{ - test("hello"){ - import Context.Simple.global - - sealed trait Msg - - object logger extends SimpleActor[Msg]{ - def run(msg: Msg) = { - - } - } - } - } -}
\ No newline at end of file |