author Li Haoyi <haoyi.sg@gmail.com> 2019-11-04 07:44:27 +0800
committer Li Haoyi <haoyi.sg@gmail.com> 2019-11-04 07:44:27 +0800
commit aaca9ae2a09364fd8ae5bf85e080734cec351657 (patch)
tree 32d2ec9e989f53ad8dec4a64fab141933f58d973
parent b9716a7da6c3f3587ddd9918fd5048903ec78296 (diff)
add BatchActor unit test and docs
-rw-r--r-- cask/actor/test/src-jvm/JvmActorsTest.scala | 59
-rw-r--r-- docs/pages/4 - Cask Actors.md | 105
2 files changed, 160 insertions, 4 deletions
diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala
index 45c8546..63da92c 100644
--- a/cask/actor/test/src-jvm/JvmActorsTest.scala
+++ b/cask/actor/test/src-jvm/JvmActorsTest.scala
@@ -123,6 +123,65 @@ object JvmActorsTest extends TestSuite{
decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
+ test("batch"){
+ sealed trait Msg
+ case class Text(value: String) extends Msg
+ case class Rotate() extends Msg
+ class Writer(log: os.Path, old: os.Path)
+ (implicit ac: Context) extends BatchActor[Msg]{
+ def runBatch(msgs: Seq[Msg]): Unit = {
+ msgs.lastIndexOf(Rotate()) match{
+ case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true)
+ case rotateIndex =>
+ val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1)
+ if (prevRotateIndex != -1) os.remove.all(log)
+ os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true)
+ os.move(log, old, replaceExisting = true)
+ os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true)
+ }
+ }
+ def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n"
+ }
+
+ class Logger(dest: Actor[Msg], rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ dest.send(Rotate())
+ }
+ dest.send(Text(s))
+ }
+ private var logSize = 0
+ }
+
+ implicit val ac = new Context.Test()
+
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+ val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+ val writer = new Writer(logPath, oldPath)
+ val logger = new Logger(writer, rotateSize = 50)
+
+ logger.send("I am cow")
+ logger.send("hear me moo")
+ logger.send("I weight twice as much as you")
+ logger.send("And I look good on the barbecue")
+ logger.send("Yoghurt curds cream cheese and butter")
+ logger.send("Comes from liquids from my udder")
+ logger.send("I am cow, I am cow")
+ logger.send("Hear me moo, moooo")
+
+ // Logger hasn't finished yet, running in the background
+ ac.waitForInactivity()
+ // Now logger has finished
+
+ os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
+ os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+ }
+
test("debounce"){
sealed trait Msg
case class Debounced() extends Msg
diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md
index 5c7cd27..1b899e4 100644
--- a/docs/pages/4 - Cask Actors.md
+++ b/docs/pages/4 - Cask Actors.md
@@ -272,11 +272,104 @@ You can imagine adding additional stages to this actor pipeline, to perform
other sorts of processing, and have those additional stages running in parallel
as well.
+### Batch Logging using BatchActor
+
+Sometimes it is more efficient for an Actor to handle all incoming messages at
+once. You may be working with an HTTP API that lets you send one batch request
+rather than a hundred small ones, or with a database that lets you send one
+batch query to settle all incoming messages. In these situations, you can use a
+`BatchActor`.
+
+This example again shows a logging pipeline, but instead of the two stages being
+"encoding" and "writing to disk", our two stages are "handling log rotating" and
+"batch writing":
+
+```scala
+sealed trait Msg
+case class Text(value: String) extends Msg
+case class Rotate() extends Msg
+class Writer(log: os.Path, old: os.Path)
+ (implicit ac: Context) extends BatchActor[Msg]{
+ def runBatch(msgs: Seq[Msg]): Unit = {
+ msgs.lastIndexOf(Rotate()) match{
+ case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true)
+ case rotateIndex =>
+ val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1)
+ if (prevRotateIndex != -1) os.remove.all(log)
+ os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true)
+ os.move(log, old, replaceExisting = true)
+ os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true)
+ }
+ }
+ def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n"
+}
+
+class Logger(dest: Actor[Msg], rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ dest.send(Rotate())
+ }
+ dest.send(Text(s))
+ }
+ private var logSize = 0
+}
+
+implicit val ac = new Context.Test()
+
+val logPath = os.pwd / "out" / "scratch" / "log.txt"
+val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+val writer = new Writer(logPath, oldPath)
+val logger = new Logger(writer, rotateSize = 50)
+
+logger.send("I am cow")
+logger.send("hear me moo")
+logger.send("I weight twice as much as you")
+logger.send("And I look good on the barbecue")
+logger.send("Yoghurt curds cream cheese and butter")
+logger.send("Comes from liquids from my udder")
+logger.send("I am cow, I am cow")
+logger.send("Hear me moo, moooo")
+
+ac.waitForInactivity()
+
+os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
+os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+```
+
+Here the `Logger` actor takes incoming log lines and decides when it needs to
+trigger a log rotation. It sends both the log lines and the rotation commands,
+as `Text` and `Rotate` messages, to the `Writer` batch actor, which handles
+batches of these messages via its `runBatch` method. `Writer` scans the batch
+of incoming messages to decide what it needs to do: either there are zero
+`Rotate` commands and it simply appends all incoming `Text`s to the log file,
+or there are one or more `Rotate` commands and it needs to do a log rotation,
+writing the batched messages once to the log file pre- and post-rotation.
+
+Using a `BatchActor` here helps reduce the number of writes to the filesystem:
+no matter how many messages get queued up, our batch actor makes at most two
+writes per batch. If there are two or more `Rotate` commands in the same batch,
+earlier `Text` log lines can be discarded without being written at all! Together
+this can greatly improve the performance of working with external APIs.
+
+Note that when extending `BatchActor`, it is up to the implementer to ensure
+that the `BatchActor`'s `runBatch` method has the same visible effect as if it
+had run a single `run` method on each message individually. Violating that
+assumption may lead to weird bugs, where the actor behaves differently depending
+on how the messages are batched (which is nondeterministic, and may depend on
+thread scheduling and other performance-related details).
+
### Debounced Logging using State Machines
-The last common API we will look at is using `StateMachineActor`. In this
-example, we use `StateMachineActor` to define a `Logger` actor with two states
-`Idle` and `Buffering`:
+The last common API we will look at is using `StateMachineActor`. We will define
+an actor that debounces writes to disk, ensuring they do not happen any more
+frequently than once every 50 milliseconds. This is a common pattern when
+working with an external API that you do not want to overload with large numbers
+of API calls.
```scala
sealed trait Msg
@@ -325,12 +418,16 @@ os.read.lines(logPath) ==> Seq(
)
```
+In this example, we use `StateMachineActor` to define a `Logger` actor with two
+states, `Idle` and `Buffering`.
+
This actor starts out with its `initialState = Idle()`. When it receives a `Text`
message, it schedules a `Flush` message to be sent 50 milliseconds in the
future, and transitions into the `Buffering` state. While in `Buffering`, any
additional `Text` messages are simply accumulated onto the buffer, until the
`Flush` is received again and all the buffered messages are flushed to disk.
-Each group of messages is written as a single line, separated by newlines.
+Each group of messages is written as a single line, separated by newlines (just
+so we can see the effect of the batching in the output).
You can see that we send the text messages to the `logger` in three groups
separated by 100 millisecond waits, and as a result the final log file ends up