author     Li Haoyi <haoyi.sg@gmail.com>   2019-11-03 19:07:33 +0800
committer  Li Haoyi <haoyi.sg@gmail.com>   2019-11-03 19:07:33 +0800
commit     57c550990c66845307f77c9cdb44c13d06a7a5c1 (patch)
tree       e8934d2c53e6ee2f7f8f1549f5c8841ec2dd66ec
parent     c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415 (diff)
Basic readme for Cask actors
-rw-r--r--  cask/actor/test/src-jvm/JvmActorsTest.scala |   4
-rw-r--r--  docs/pages/4 - Cask Actors.md               | 200
2 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala
index 9cfb0d5..61c8e74 100644
--- a/cask/actor/test/src-jvm/JvmActorsTest.scala
+++ b/cask/actor/test/src-jvm/JvmActorsTest.scala
@@ -11,7 +11,7 @@ object JvmActorsTest extends TestSuite{
var logSize = 0
- def logLine(s: String): Unit = {
+ def logLine(s: String): Unit = synchronized{
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
@@ -66,7 +66,9 @@ object JvmActorsTest extends TestSuite{
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
+ // Logger hasn't finished yet, running in the background
ac.waitForInactivity()
+ // Now logger has finished
os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
diff --git a/docs/pages/4 - Cask Actors.md b/docs/pages/4 - Cask Actors.md
new file mode 100644
index 0000000..440200b
--- /dev/null
+++ b/docs/pages/4 - Cask Actors.md
@@ -0,0 +1,200 @@
+Cask ships with a lightweight Actor library, making it very easy for you to
+define asynchronous pipelines. Cask uses these actors to model [websocket server
+and client connections](http://www.lihaoyi.com/cask/#websockets), but you can
+also use them for your own purposes, even outside a web application via the
+standalone `cask-actor` artifact:
+
+
+```scala
+// Mill
+ivy"com.lihaoyi::cask-actor:0.2.9"
+
+// SBT
+"com.lihaoyi" %% "cask-actor" % "0.2.9"
+```
+
+## A Logger Actor
+
+Here is a small demonstration of using a `cask.actor.SimpleActor` to perform
+asynchronous logging to disk:
+
+```scala
+import cask.actor.{SimpleActor, Context}
+class Logger(log: os.Path, old: os.Path, rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ os.move(log, old, replaceExisting = true)
+ }
+ os.write.append(log, s + "\n", createFolders = true)
+ }
+ private var logSize = 0
+}
+
+implicit val ac = new Context.Test()
+
+val logPath = os.pwd / "out" / "scratch" / "log.txt"
+val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+val logger = new Logger(logPath, oldPath, rotateSize = 50)
+
+logger.send("I am cow")
+logger.send("hear me moo")
+logger.send("I weight twice as much as you")
+logger.send("And I look good on the barbecue")
+logger.send("Yoghurt curds cream cheese and butter")
+logger.send("Comes from liquids from my udder")
+logger.send("I am cow, I am cow")
+logger.send("Hear me moo, moooo")
+
+// Logger hasn't finished yet, running in the background
+ac.waitForInactivity()
+// Now logger has finished
+
+os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
+os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+```
+
+All Cask actors require a `cask.actor.Context`, which is an extended
+`scala.concurrent.ExecutionContext`. Here we are using `Context.Test`, which
+also provides a handy `waitForInactivity()` method that blocks until all
+asynchronous actor processing has completed.
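+
+Since a `Context` is itself an `ExecutionContext`, it can also be passed
+anywhere an ordinary `ExecutionContext` is expected. Here is a minimal sketch
+(the `Future` below is purely illustrative and not part of the logging
+example):
+
+```scala
+import scala.concurrent.Future
+
+// `ac` is the Context.Test defined above; because it extends
+// scala.concurrent.ExecutionContext, it can schedule plain Futures as well
+val f = Future { "hello from the actor context" }(ac)
+```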
+
+In the above example, we define a single `Logger` actor class and instantiate
+it once as `val logger`. We can then send as many messages as we want via
+`logger.send`: while the processing of a message may take some time (here we
+are both writing to disk and performing
+[log-rotation](https://en.wikipedia.org/wiki/Log_rotation) to keep the logfile
+from growing in size forever), the fact that it runs in a separate actor means
+the processing happens in the background without slowing down the main logic of
+your program. This is ideal for scenarios where the dataflow is one way: e.g.
+when logging, you only write logs, and never need to wait for the results of
+processing them.
+
+Note that `logger.send` is thread-safe: multiple threads can be sending logging
+messages to the `logger` at once, and the `.send` method will make sure the
+messages are properly queued up and executed. At no point will a thread calling
+`.send` end up blocking another thread from executing.
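+
+As a rough, illustrative sketch of what this means in practice (the worker
+threads below are hypothetical and not part of the original example), several
+threads can fire off `.send` calls concurrently without any extra locking:
+
+```scala
+// Reuses the `logger` and `ac` defined above. Each .send merely enqueues its
+// message, so none of these threads ever blocks on the actual disk writes
+val threads = for (i <- 1 to 3) yield new Thread(() =>
+  for (j <- 1 to 10) logger.send(s"message $j from thread $i")
+)
+
+threads.foreach(_.start())
+threads.foreach(_.join())  // all messages are now safely enqueued...
+ac.waitForInactivity()     // ...and by now they have all been processed
+```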
+
+## Strawman: Synchronized Logging
+
+To further illustrate the use case for actors, let us consider the earlier
+example, but using a `synchronized` method instead of a `cask.actor.SimpleActor`
+to perform the logging:
+
+```scala
+val rotateSize = 50
+val logPath = os.pwd / "out" / "scratch" / "log.txt"
+val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+var logSize = 0
+
+def logLine(s: String): Unit = synchronized{
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = 0
+ os.move(logPath, oldPath, replaceExisting = true)
+ }
+
+ os.write.append(logPath, s + "\n", createFolders = true)
+}
+
+logLine("I am cow")
+logLine("hear me moo")
+logLine("I weight twice as much as you")
+logLine("And I look good on the barbecue")
+logLine("Yoghurt curds cream cheese and butter")
+logLine("Comes from liquids from my udder")
+logLine("I am cow, I am cow")
+logLine("Hear me moo, moooo")
+
+os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder"
+os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo"
+```
+
+This is similar to the earlier Actor example, but with two main caveats:
+
+- Your program stops when it calls `logLine`, and only resumes once the call
+  completes. Thus calls to `logLine` can end up slowing down your program, even
+  though your program doesn't actually need the result of `logLine` in order to
+  make progress.
+
+- Since `logLine` ends up managing some global mutable state (writing to and
+ rotating log files) we need to make it `synchronized`. That means that if
+ multiple threads in your program are calling `logLine`, it is possible that
+ some threads will be blocked waiting for other threads to complete their
+ `logLine` calls.
+
+Using Cask Actors to perform logging avoids both these issues: calls to
+`logger.send` happen in the background without slowing down your main program,
+and multiple threads can call `logger.send` without being blocked by each other.
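+
+As an illustrative sketch of the difference (the timings are hypothetical and
+will vary by machine and filesystem), compare how long each call keeps the
+caller waiting:
+
+```scala
+// logLine only returns once the file append (and any rotation) has completed
+val t1 = System.nanoTime()
+logLine("this call blocks until the write to disk is done")
+val blockingMicros = (System.nanoTime() - t1) / 1000
+
+// logger.send returns as soon as the message has been enqueued; the actual
+// write happens later on the actor's background execution context
+val t2 = System.nanoTime()
+logger.send("this call returns almost immediately")
+val enqueueMicros = (System.nanoTime() - t2) / 1000
+
+println(s"logLine took ${blockingMicros}us, logger.send took ${enqueueMicros}us")
+ac.waitForInactivity() // wait for the background write to finish
+```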
+
+## Actor Pipelines
+
+Another advantage of Actors is that you can get pipelined parallelism when
+processing data. In the following example, we define two actor classes `Writer`
+and `Logger`, and two actors `val writer` and `val logger`. `Writer` handles the
+same writing-strings-to-disk-and-rotating-log-files logic we saw earlier, while
+`Logger` adds another step of encoding the data (here just using Base64) before
+it gets written to disk:
+
+```scala
+import cask.actor.{Actor, Context, SimpleActor}
+
+class Writer(log: os.Path, old: os.Path, rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ os.move(log, old, replaceExisting = true)
+ }
+ os.write.append(log, s + "\n", createFolders = true)
+ }
+ private var logSize = 0
+}
+
+class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
+}
+
+implicit val ac = new Context.Test()
+
+val logPath = os.pwd / "out" / "scratch" / "log.txt"
+val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+val writer = new Writer(logPath, oldPath, rotateSize = 50)
+val logger = new Logger(writer)
+
+logger.send("I am cow")
+logger.send("hear me moo")
+logger.send("I weight twice as much as you")
+logger.send("And I look good on the barbecue")
+logger.send("Yoghurt curds cream cheese and butter")
+logger.send("Comes from liquids from my udder")
+logger.send("I am cow, I am cow")
+logger.send("Hear me moo, moooo")
+
+ac.waitForInactivity()
+
+os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
+os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"
+
+def decodeFile(p: os.Path) = {
+ os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
+}
+
+decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
+decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+```
+
+Although we have added another Base64 encoding step to the logging process, this
+new step lives in a separate actor from the original write-to-disk step, and
+both of these can run in parallel as well as in parallel with the main logic. By
+constructing our data processing flows using Actors, we can take advantage of
+pipeline parallelism to distribute the processing over multiple threads and CPU
+cores, so adding steps to the pipeline slows down neither the pipeline itself
+nor the execution of the main program.
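+
+For example, we could insert yet another stage into the pipeline without
+touching the existing ones. The `Sanitizer` actor below is a hypothetical
+illustration (not part of Cask itself) that upper-cases each line before
+handing it on to the Base64-encoding `logger`:
+
+```scala
+// Hypothetical extra stage: runs in its own actor and forwards to `logger`,
+// which in turn forwards the encoded string to `writer`
+class Sanitizer(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
+  def run(s: String) = dest.send(s.toUpperCase)
+}
+
+val sanitizer = new Sanitizer(logger)
+
+sanitizer.send("i am cow")  // flows through sanitizer -> logger -> writer
+ac.waitForInactivity()
+```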