From 783944cb8750e3c26284aba80a8d48590dcba3b8 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sun, 3 Nov 2019 23:59:18 +0800 Subject: actor readme pipelineLog --- cask/actor/test/src-jvm/JvmActorsTest.scala | 58 ++++++++++++++++++++++++++ docs/pages/4 - Cask Actors.md | 64 ++++++++++++++++++++++++++++- 2 files changed, 121 insertions(+), 1 deletion(-) diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala index 066fd3e..45c8546 100644 --- a/cask/actor/test/src-jvm/JvmActorsTest.scala +++ b/cask/actor/test/src-jvm/JvmActorsTest.scala @@ -243,6 +243,64 @@ object JvmActorsTest extends TestSuite{ "Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo", ) } + test("pipelineLog"){ + class Writer(log: os.Path, old: os.Path, rotateSize: Int) + (implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = s.length + os.move(log, old, replaceExisting = true) + } + os.write.append(log, s + "\n", createFolders = true) + } + private var logSize = 0 + } + + class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes)) + } + + implicit val ac = new Context.Test( + scala.concurrent.ExecutionContext.fromExecutor( + java.util.concurrent.Executors.newSingleThreadExecutor() + ) + ){ + override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = { + println(s"$a <- $msg") + super.reportRun(a, msg, token) + } + } + + val logPath = os.pwd / "out" / "scratch" / "log.txt" + val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + + val writer = new Writer(logPath, oldPath, rotateSize = 50) + val logger = new Logger(writer) + + logger.send("I am cow") + logger.send("hear me moo") + logger.send("I weight twice as much as you") + logger.send("And I look good on the barbecue") + logger.send("Yoghurt curds cream cheese and butter") + logger.send("Comes from liquids from my udder") + logger.send("I am cow, I am cow") + logger.send("Hear me moo, moooo") + + ac.waitForInactivity() + + os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n" + os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n" + + def decodeFile(p: os.Path) = { + os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s))) + } + + decodeFile(oldPath) ==> Seq("Comes from liquids from my udder") + decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") + } + } } \ No newline at end of file diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md index 6845d08..96f3412 100644 --- a/docs/pages/4 - Cask Actors.md +++ b/docs/pages/4 - Cask Actors.md @@ -417,4 +417,66 @@ override def run(msg: Msg): Unit = { super.run(msg) if (???) println(state) } -``` \ No newline at end of file +``` + +### Context Logging + +Apart from logging individual Actors, you can also insert logging into the +`cask.actor.Context` to log certain state transitions or actions. For example, +you can log every time a message is run on an actor by overriding the +`reportRun` callback: + +```scala +implicit val ac = new Context.Test(){ + override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = { + println(s"$a <- $msg") + super.reportRun(a, msg, token) + } +} +``` + +Running this on the +[two-actor pipeline example](#parallelism-using-actor-pipelines) from earlier, +it helps us visualize exactly what our actors are going: + +```text +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I am cow +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- hear me moo +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I weight twice as much as you +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSBhbSBjb3c= +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- And I look good on the barbecue +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Yoghurt curds cream cheese and butter +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Comes from liquids from my udder +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I am cow, I am cow +cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Hear me moo, moooo +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- aGVhciBtZSBtb28= +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSB3ZWlnaHQgdHdpY2UgYXMgbXVjaCBhcyB5b3U= +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- QW5kIEkgbG9vayBnb29kIG9uIHRoZSBiYXJiZWN1ZQ== +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- WW9naHVydCBjdXJkcyBjcmVhbSBjaGVlc2UgYW5kIGJ1dHRlcg== +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI= +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSBhbSBjb3csIEkgYW0gY293 +cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SGVhciBtZSBtb28sIG1vb29v +``` + +We can also replace the default `scala.concurrent.ExecutionContext.global` +executor with a single-threaded executor, if we want our Actor pipeline to +behave 100% deterministically: + +```scala +implicit val ac = new Context.Test( + scala.concurrent.ExecutionContext.fromExecutor( + java.util.concurrent.Executors.newSingleThreadExecutor() + ) +){ + override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = { + println(s"$a <- $msg") + super.reportRun(a, msg, token) + } +} +``` + +Any asynchronous Actor pipeline should be able to run no a +`newSingleThreadExecutor`. While it would be slower than running on the default +thread pool, it should make execution of your actors much more deterministic - +only one actor will be running at a time - and make it easier to track down +logical bugs without multithreaded parallelism getting in the way. \ No newline at end of file -- cgit v1.2.3