From aaca9ae2a09364fd8ae5bf85e080734cec351657 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Mon, 4 Nov 2019 07:44:27 +0800 Subject: add BatchActor unit test and docs --- cask/actor/test/src-jvm/JvmActorsTest.scala | 59 ++++++++++++++++ docs/pages/4 - Cask Actors.md | 105 ++++++++++++++++++++++++++-- 2 files changed, 160 insertions(+), 4 deletions(-) diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala index 45c8546..63da92c 100644 --- a/cask/actor/test/src-jvm/JvmActorsTest.scala +++ b/cask/actor/test/src-jvm/JvmActorsTest.scala @@ -123,6 +123,65 @@ object JvmActorsTest extends TestSuite{ decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") } + test("batch"){ + sealed trait Msg + case class Text(value: String) extends Msg + case class Rotate() extends Msg + class Writer(log: os.Path, old: os.Path) + (implicit ac: Context) extends BatchActor[Msg]{ + def runBatch(msgs: Seq[Msg]): Unit = { + msgs.lastIndexOf(Rotate()) match{ + case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true) + case rotateIndex => + val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1) + if (prevRotateIndex != -1) os.remove.all(log) + os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true) + os.move(log, old, replaceExisting = true) + os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true) + } + } + def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n" + } + + class Logger(dest: Actor[Msg], rotateSize: Int) + (implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = s.length + dest.send(Rotate()) + } + dest.send(Text(s)) + } + private var logSize = 0 + } + + implicit val ac = new Context.Test() + + val logPath = os.pwd / "out" / "scratch" / "log.txt" + val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + + val writer = new Writer(logPath, oldPath) + val logger = new Logger(writer, rotateSize = 50) + + logger.send("I am cow") + logger.send("hear me moo") + logger.send("I weight twice as much as you") + logger.send("And I look good on the barbecue") + logger.send("Yoghurt curds cream cheese and butter") + logger.send("Comes from liquids from my udder") + logger.send("I am cow, I am cow") + logger.send("Hear me moo, moooo") + + // Logger hasn't finished yet, running in the background + ac.waitForInactivity() + // Now logger has finished + + os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder") + os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") + } + test("debounce"){ sealed trait Msg case class Debounced() extends Msg diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md index 5c7cd27..1b899e4 100644 --- a/docs/pages/4 - Cask Actors.md +++ b/docs/pages/4 - Cask Actors.md @@ -272,11 +272,104 @@ You can imagine adding additional stages to this actor pipeline, to perform other sorts of processing, and have those additional stages running in parallel as well. +### Batch Logging using BatchActor + +Sometimes it is more efficient for an Actor to handle all incoming messages at +once. You may be working with a HTTP API that lets you send one batch request +rather than a hundred small ones, or with a database that lets you send one +batch query to settle all incoming messages. In these situations, you can use a +`BatchActor`. + +This example again shows a logging pipeline, but instead of the two stages being +"encoding" and "writing to disk", our two stages are "handling log rotating" and +"batch writing": + +```scala +sealed trait Msg +case class Text(value: String) extends Msg +case class Rotate() extends Msg +class Writer(log: os.Path, old: os.Path) + (implicit ac: Context) extends BatchActor[Msg]{ + def runBatch(msgs: Seq[Msg]): Unit = { + msgs.lastIndexOf(Rotate()) match{ + case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true) + case rotateIndex => + val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1) + if (prevRotateIndex != -1) os.remove.all(log) + os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true) + os.move(log, old, replaceExisting = true) + os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true) + } + } + def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n" +} + +class Logger(dest: Actor[Msg], rotateSize: Int) + (implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = s.length + dest.send(Rotate()) + } + dest.send(Text(s)) + } + private var logSize = 0 +} + +implicit val ac = new Context.Test() + +val logPath = os.pwd / "out" / "scratch" / "log.txt" +val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + +val writer = new Writer(logPath, oldPath) +val logger = new Logger(writer, rotateSize = 50) + +logger.send("I am cow") +logger.send("hear me moo") +logger.send("I weight twice as much as you") +logger.send("And I look good on the barbecue") +logger.send("Yoghurt curds cream cheese and butter") +logger.send("Comes from liquids from my udder") +logger.send("I am cow, I am cow") +logger.send("Hear me moo, moooo") + +ac.waitForInactivity() + +os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder") +os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") +``` + +Here the `Logger` actor takes incoming log lines and decides when it needs to +trigger a log rotation, while sending both the log lines and rotation commands +as `Text` and `Rotate` commands to the `Writer` batch actor which handles +batches of these messages via its `runBatch` method. `Writer` filters through +the list of incoming messages to decide what it needs to do: either there are +zero `Rotate` commands and it simply appends all incoming `Text`s to the log +file, or there are one-or-more `Rotate` commands it needs to do a log rotation, +writing the batched messages once to the log file pre- and post-rotation. + +Using a `BatchActor` here helps reduce the number of writes to the filesystem: +no matter how many messages get queued up, our batch actor only makes two +writes. Furthermore, if there are more than two `Rotate` commands in the same +batch, earlier `Text` log lines can be discarded without being written at all! +Together this can greatly improve the performance of working with external APIs. + +Note that when extending `BatchActor`, it is up to the implementer to ensure +that the `BatchActor`s `runBatch` method has the same visible effect as if they +had run a single `run` method on each message individually. Violating that +assumption may lead to weird bugs, where the actor behaves differently depending +on how the messages are batched (which is nondeterministic, and may depend on +thread scheduling and other performance related details). + ### Debounced Logging using State Machines -The last common API we will look at is using `StateMachineActor`. In this -example, we use `StateMachineActor` to define a `Logger` actor with two states -`Idle` and `Buffering`: +The last common API we will look at is using `StateMachineActor`. We will define +an actor that debounces writes to disk, ensuring they do not happen any more +frequently than once every 50 milliseconds. This is a common pattern when +working with an external API that you do not want to overload with large numbers +of API calls. ```scala sealed trait Msg @@ -325,12 +418,16 @@ os.read.lines(logPath) ==> Seq( ) ``` +In this example, we use `StateMachineActor` to define a `Logger` actor with two +states `Idle` and `Buffering`. + This actor starts out with its `initalState = Idle()`. When it receives a `Text` message, it schedules a `Flush` message to be sent 50 milliseconds in the future, and transitions into the `Buffering` state. While in `Buffering`, any additional `Text` messages are simply accumulated onto the buffer, until the `Flush` is received again and all the buffered messages are flushed to disk. -Each group of messages is written as a single line, separated by newlines. +Each group of messages is written as a single line, separated by newlines (just +so we can see the effect of the batching in the output) You can see that we send the text messages to the `logger` in three groups separated by 100 millisecond waits, and as a result the final log file ends up -- cgit v1.2.3