summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2019-11-03 23:59:18 +0800
committerLi Haoyi <haoyi.sg@gmail.com>2019-11-03 23:59:18 +0800
commit783944cb8750e3c26284aba80a8d48590dcba3b8 (patch)
tree81682d60fabb1e0fa1a65d89addd9b91b4274892
parenta1a5484405867069427ed1f9090c847ac7ddd4c6 (diff)
downloadcask-783944cb8750e3c26284aba80a8d48590dcba3b8.tar.gz
cask-783944cb8750e3c26284aba80a8d48590dcba3b8.tar.bz2
cask-783944cb8750e3c26284aba80a8d48590dcba3b8.zip
actor readme pipelineLog
-rw-r--r--cask/actor/test/src-jvm/JvmActorsTest.scala58
-rw-r--r--docs/pages/4 - Cask Actors.md64
2 files changed, 121 insertions, 1 deletions
diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala
index 066fd3e..45c8546 100644
--- a/cask/actor/test/src-jvm/JvmActorsTest.scala
+++ b/cask/actor/test/src-jvm/JvmActorsTest.scala
@@ -243,6 +243,64 @@ object JvmActorsTest extends TestSuite{
"Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo",
)
}
+ test("pipelineLog"){
+ class Writer(log: os.Path, old: os.Path, rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ os.move(log, old, replaceExisting = true)
+ }
+ os.write.append(log, s + "\n", createFolders = true)
+ }
+ private var logSize = 0
+ }
+
+ class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
+ }
+
+ implicit val ac = new Context.Test(
+ scala.concurrent.ExecutionContext.fromExecutor(
+ java.util.concurrent.Executors.newSingleThreadExecutor()
+ )
+ ){
+ override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = {
+ println(s"$a <- $msg")
+ super.reportRun(a, msg, token)
+ }
+ }
+
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+ val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+ val writer = new Writer(logPath, oldPath, rotateSize = 50)
+ val logger = new Logger(writer)
+
+ logger.send("I am cow")
+ logger.send("hear me moo")
+ logger.send("I weight twice as much as you")
+ logger.send("And I look good on the barbecue")
+ logger.send("Yoghurt curds cream cheese and butter")
+ logger.send("Comes from liquids from my udder")
+ logger.send("I am cow, I am cow")
+ logger.send("Hear me moo, moooo")
+
+ ac.waitForInactivity()
+
+ os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
+ os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"
+
+ def decodeFile(p: os.Path) = {
+ os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
+ }
+
+ decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
+ decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+ }
+
}
} \ No newline at end of file
diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md
index 6845d08..96f3412 100644
--- a/docs/pages/4 - Cask Actors.md
+++ b/docs/pages/4 - Cask Actors.md
@@ -417,4 +417,66 @@ override def run(msg: Msg): Unit = {
super.run(msg)
if (???) println(state)
}
-``` \ No newline at end of file
+```
+
+### Context Logging
+
+Apart from logging individual Actors, you can also insert logging into the
+`cask.actor.Context` to log certain state transitions or actions. For example,
+you can log every time a message is run on an actor by overriding the
+`reportRun` callback:
+
+```scala
+implicit val ac = new Context.Test(){
+ override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = {
+ println(s"$a <- $msg")
+ super.reportRun(a, msg, token)
+ }
+}
+```
+
+Running this on the
+[two-actor pipeline example](#parallelism-using-actor-pipelines) from earlier,
+it helps us visualize exactly what our actors are going:
+
+```text
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I am cow
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- hear me moo
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I weight twice as much as you
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSBhbSBjb3c=
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- And I look good on the barbecue
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Yoghurt curds cream cheese and butter
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Comes from liquids from my udder
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- I am cow, I am cow
+cask.actor.JvmActorsTest$Logger$5@4a903c98 <- Hear me moo, moooo
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- aGVhciBtZSBtb28=
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSB3ZWlnaHQgdHdpY2UgYXMgbXVjaCBhcyB5b3U=
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- QW5kIEkgbG9vayBnb29kIG9uIHRoZSBiYXJiZWN1ZQ==
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- WW9naHVydCBjdXJkcyBjcmVhbSBjaGVlc2UgYW5kIGJ1dHRlcg==
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SSBhbSBjb3csIEkgYW0gY293
+cask.actor.JvmActorsTest$Writer$2@3bb87fa0 <- SGVhciBtZSBtb28sIG1vb29v
+```
+
+We can also replace the default `scala.concurrent.ExecutionContext.global`
+executor with a single-threaded executor, if we want our Actor pipeline to
+behave 100% deterministically:
+
+```scala
+implicit val ac = new Context.Test(
+ scala.concurrent.ExecutionContext.fromExecutor(
+ java.util.concurrent.Executors.newSingleThreadExecutor()
+ )
+){
+ override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = {
+ println(s"$a <- $msg")
+ super.reportRun(a, msg, token)
+ }
+}
+```
+
+Any asynchronous Actor pipeline should be able to run no a
+`newSingleThreadExecutor`. While it would be slower than running on the default
+thread pool, it should make execution of your actors much more deterministic -
+only one actor will be running at a time - and make it easier to track down
+logical bugs without multithreaded parallelism getting in the way. \ No newline at end of file