summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2019-11-03 22:56:30 +0800
committerLi Haoyi <haoyi.sg@gmail.com>2019-11-03 22:56:30 +0800
commit0c58de44a581acedc624ec118c8a2feb57334ad4 (patch)
treead69495ecfe5afcf8e3dd15fc3c464d1d10196dc
parent27b5d45d919cbc8d37fcc0194bb9bd39603a532e (diff)
downloadcask-0c58de44a581acedc624ec118c8a2feb57334ad4.tar.gz
cask-0c58de44a581acedc624ec118c8a2feb57334ad4.tar.bz2
cask-0c58de44a581acedc624ec118c8a2feb57334ad4.zip
add debouncing state machine to docs and unit tests
-rw-r--r--cask/actor/test/src-jvm/JvmActorsTest.scala48
-rw-r--r--docs/pages/4 - Cask Actors.md119
2 files changed, 166 insertions, 1 deletions
diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala
index 61c8e74..dc92555 100644
--- a/cask/actor/test/src-jvm/JvmActorsTest.scala
+++ b/cask/actor/test/src-jvm/JvmActorsTest.scala
@@ -122,5 +122,53 @@ object JvmActorsTest extends TestSuite{
decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
+
+ test("debounce"){
+ sealed trait Msg
+ case class Debounced() extends Msg
+ case class Text(value: String) extends Msg
+
+ class Logger(log: os.Path, debounceTime: java.time.Duration)
+ (implicit ac: Context) extends StateMachineActor[Msg]{
+ def initialState = Idle()
+ case class Idle() extends State({
+ case Text(value) =>
+ ac.scheduleMsg(this, Debounced(), debounceTime)
+ Buffering(Vector(value))
+ })
+ case class Buffering(buffer: Vector[String]) extends State({
+ case Text(value) => Buffering(buffer :+ value)
+ case Debounced() =>
+ os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true)
+ Idle()
+ })
+ }
+
+ implicit val ac = new Context.Test()
+
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+
+ val logger = new Logger(logPath, java.time.Duration.ofMillis(50))
+
+ logger.send(Text("I am cow"))
+ logger.send(Text("hear me moo"))
+ Thread.sleep(100)
+ logger.send(Text("I weight twice as much as you"))
+ logger.send(Text("And I look good on the barbecue"))
+ Thread.sleep(100)
+ logger.send(Text("Yoghurt curds cream cheese and butter"))
+ logger.send(Text("Comes from liquids from my udder"))
+ logger.send(Text("I am cow, I am cow"))
+ logger.send(Text("Hear me moo, moooo"))
+
+ ac.waitForInactivity()
+
+ os.read.lines(logPath) ==> Seq(
+ "I am cow hear me moo",
+ "I weight twice as much as you And I look good on the barbecue",
+ "Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo",
+ )
+ }
+
}
} \ No newline at end of file
diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md
index fdff1dd..ecfaedb 100644
--- a/docs/pages/4 - Cask Actors.md
+++ b/docs/pages/4 - Cask Actors.md
@@ -18,6 +18,52 @@ like Akka: Cask Actors do not support any sort of distribution or clustering,
and run entirely within a single process. Cask Actors are garbage collectible,
and you do not need to manually terminate them or manage their lifecycle.
+
+## Cask Actors
+
+
+At their core, Actors are simply objects who receive messages via a `send`
+method, and asynchronously process those messages one after the other:
+
+```scala
+trait Actor[T]{
+ def send(t: T): Unit
+
+ def sendAsync(f: scala.concurrent.Future[T]): Unit
+}
+```
+
+This processing happens in the background, and can take place without blocking.
+After a messsage is sent, the thread or actor that called `.send()` can
+immediately go on to do other things, even if the message hasn't been processed
+yet. Messages sent to an actor that is already busy will be queued up until the
+actor is free.
+
+Cask provides three primary classes you can inherit from to define actors:
+
+```scala
+abstract class SimpleActor[T]()(implicit ac: Context) extends Actor[T]{
+ def run(msg: T): Unit
+}
+
+abstract class BatchActor[T]()(implicit ac: Context) extends Actor[T]{
+ def runBatch(msgs: Seq[T]): Unit
+}
+
+abstract class StateMachineActor[T]()(implicit ac: Context) extends Actor[T]() {
+ class State(val run: T => State)
+ protected[this] def initialState: State
+}
+```
+
+`SimpleActor` works by providing a `run` function that will be run on each
+message. `BatchActor` allows you to provide a `runBatch` function that works on
+groups of messages at a time: this is useful when message processing can be
+batched together for better efficiency, e.g. making batched database queries
+instead of many individual. `StateMachineActor` allows you to define actors via
+a set of distinct states, each of which has a separate `run` callback that
+transitions the actor to a different state.
+
## Example: Asynchronous Logging using an Actor
Here is a small demonstration of using a `cask.actor.SimpleActor` to perform
@@ -209,4 +255,75 @@ down the execution of the main program.
You can imagine adding additional stages to this actor pipeline, to perform
other sorts of processing, and have those additional stages running in parallel
-as well. \ No newline at end of file
+as well.
+
+## Debounced Logging using State Machines
+
+The last common API we will look at is using `StateMachineActor`. In this
+example, we use `StateMachineActor` to define a `Logger` actor with two states
+`Idle` and `Buffering`:
+
+```scala
+sealed trait Msg
+case class Flush() extends Msg
+case class Text(value: String) extends Msg
+
+class Logger(log: os.Path, debounceTime: java.time.Duration)
+ (implicit ac: Context) extends StateMachineActor[Msg]{
+ def initialState = Idle()
+ case class Idle() extends State({
+ case Text(value) =>
+ ac.scheduleMsg(this, Flush(), debounceTime)
+ Buffering(Vector(value))
+ })
+ case class Buffering(buffer: Vector[String]) extends State({
+ case Text(value) => Buffering(buffer :+ value)
+ case Flush() =>
+ os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true)
+ Idle()
+ })
+}
+
+implicit val ac = new Context.Test()
+
+val logPath = os.pwd / "out" / "scratch" / "log.txt"
+
+val logger = new Logger(logPath, java.time.Duration.ofMillis(50))
+
+logger.send(Text("I am cow"))
+logger.send(Text("hear me moo"))
+Thread.sleep(100)
+logger.send(Text("I weight twice as much as you"))
+logger.send(Text("And I look good on the barbecue"))
+Thread.sleep(100)
+logger.send(Text("Yoghurt curds cream cheese and butter"))
+logger.send(Text("Comes from liquids from my udder"))
+logger.send(Text("I am cow, I am cow"))
+logger.send(Text("Hear me moo, moooo"))
+
+ac.waitForInactivity()
+
+os.read.lines(logPath) ==> Seq(
+ "I am cow hear me moo",
+ "I weight twice as much as you And I look good on the barbecue",
+ "Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo",
+)
+```
+
+This actor starts out with its `initalState = Idle()`. When it receives a `Text`
+message, it schedules a `Flush` message to be sent 50 milliseconds in the
+future, and transitions into the `Buffering` state. While in `Buffering`, any
+additional `Text` messages are simply accumulated onto the buffer, until the
+`Flush` is received again and all the buffered messages are flushed to disk.
+Each group of messages is written as a single line, separated by newlines.
+
+You can see that we send the text messages to the `logger` in three groups
+separated by 100 millisecond waits, and as a result the final log file ends up
+having three lines of logs each of which contains multiple messages buffered
+together.
+
+In general, `StateMachineActor` is very useful in cases where there are multiple
+distinct states which an Actor can be in, as it forces you explicitly define the
+states, the members of each state, as well as the state transitions that occur
+when each state receives each message. When the number of distinct states grows,
+`StateMachineActor` can be significantly easier to use than `SimpleActor`.