Cask ships with a lightweight Actor library that makes it easy to define
asynchronous pipelines. Cask uses these actors to model [websocket server
and client connections](http://www.lihaoyi.com/cask/#websockets), but you can
also use them for your own purposes, even outside a web application, via the
standalone `cask-actor` artifact:


```scala
// Mill
ivy"com.lihaoyi::cask-actor:0.2.9"

// SBT
"com.lihaoyi" %% "cask-actor" % "0.2.9"
```

## A Logger Actor

Here is a small demonstration of using a `cask.actor.SimpleActor` to perform
asynchronous logging to disk:

```scala
import cask.actor.{SimpleActor, Context}
class Logger(log: os.Path, old: os.Path, rotateSize: Int)
            (implicit ac: Context) extends SimpleActor[String]{
  def run(s: String) = {
    val newLogSize = logSize + s.length + 1
    if (newLogSize <= rotateSize) logSize = newLogSize
    else {
      logSize = s.length
      os.move(log, old, replaceExisting = true)
    }
    os.write.append(log, s + "\n", createFolders = true)
  }
  private var logSize = 0
}

implicit val ac = new Context.Test()

val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath  = os.pwd / "out" / "scratch" / "log-old.txt"

val logger = new Logger(logPath, oldPath, rotateSize = 50)

logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")

// Logger hasn't finished yet, running in the background
ac.waitForInactivity()
// Now logger has finished

os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
```

All Cask actors require a `cask.actor.Context`, which is an extended
`scala.concurrent.ExecutionContext`. Here we are using `Context.Test`, which
additionally provides a `waitForInactivity()` method that blocks until all
asynchronous actor processing has completed.

In the above example, we define a single `Logger` actor class and instantiate
it once as `val logger`. We can now send as many messages as we want via
`logger.send`. Processing a message may take some time (here we are both
writing to disk and performing
[log-rotation](https://en.wikipedia.org/wiki/Log_rotation) to avoid the logfile
growing in size forever), but because it happens in a separate actor, the
processing runs in the background without slowing down the main logic of your
program. This is ideal for scenarios where the dataflow is one way: e.g. when
logging, you only write logs, and never need to wait for the results of
processing them.

Note that `logger.send` is thread-safe: multiple threads can be sending logging
messages to the `logger` at once, and the `.send` method will make sure the
messages are properly queued up and executed. At no point will a thread calling
`.send` end up blocking another thread from executing.
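
As a minimal sketch of this property, the following hypothetical `Counter` actor
(not part of Cask) receives messages from several threads at once. It assumes
that an actor handles its queued messages one at a time, so `count` is updated
without any explicit locking, and it uses `Context.Test` only so that
`waitForInactivity()` is available:

```scala
import cask.actor.{SimpleActor, Context}

// Hypothetical actor for illustration: counts the messages it receives
class Counter()(implicit ac: Context) extends SimpleActor[String]{
  var count = 0
  def run(s: String): Unit = count += 1
}

object SendFromManyThreads {
  // Sends `perThread` messages from each of `numThreads` threads to a single
  // actor, and returns how many messages the actor ended up processing
  def countMessages(numThreads: Int, perThread: Int): Int = {
    implicit val ac: Context.Test = new Context.Test()
    val counter = new Counter()

    val threads = for (i <- 0 until numThreads) yield new Thread(() => {
      for (j <- 0 until perThread) counter.send(s"thread $i message $j")
    })
    threads.foreach(_.start())
    threads.foreach(_.join())

    // All sends have returned; wait for the actor to drain its queue
    ac.waitForInactivity()
    counter.count
  }

  def main(args: Array[String]): Unit = {
    println(countMessages(numThreads = 4, perThread = 100))
  }
}
```

If every message is queued and processed as described, the printed count equals
the total number of sends (400 for the arguments above).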

## Strawman: Synchronized Logging

To illustrate further the use case of actors, let us consider the earlier
example but using a `synchronized` method instead of a `cask.actor.SimpleActor`
to perform the logging:

```scala
val rotateSize = 50
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"

var logSize = 0

def logLine(s: String): Unit = synchronized{
  val newLogSize = logSize + s.length + 1
  if (newLogSize <= rotateSize) logSize = newLogSize
  else {
    logSize = 0
    os.move(logPath, oldPath, replaceExisting = true)
  }

  os.write.append(logPath, s + "\n", createFolders = true)
}

logLine("I am cow")
logLine("hear me moo")
logLine("I weight twice as much as you")
logLine("And I look good on the barbecue")
logLine("Yoghurt curds cream cheese and butter")
logLine("Comes from liquids from my udder")
logLine("I am cow, I am cow")
logLine("Hear me moo, moooo")

os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder"
os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo"
```

This is similar to the earlier Actor example (the bookkeeping differs slightly:
`logSize` is reset to `0` rather than `s.length` on rotation, which is why the
rotated file here ends up holding two lines), but with two main caveats:

- Your program execution stops when calling `logLine`, until the call to
  `logLine` completes. Thus the calls to `logLine` can end up slowing down your
  program, even though your program really doesn't need the result of `logLine`
  in order to make progress.

- Since `logLine` ends up managing some global mutable state (writing to and
  rotating log files) we need to make it `synchronized`. That means that if
  multiple threads in your program are calling `logLine`, it is possible that
  some threads will be blocked waiting for other threads to complete their
  `logLine` calls.
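
The second caveat can be seen in isolation with a rough, standard-library-only
sketch. Here `slowLogLine` is a hypothetical stand-in for `logLine` in which a
`Thread.sleep` plays the role of the disk write; the names and timings are
assumptions chosen for illustration:

```scala
object SynchronizedContention {
  // Hypothetical stand-in for a slow write to disk
  def slowLogLine(s: String): Unit = synchronized {
    Thread.sleep(20)
  }

  // Calls `slowLogLine` from several threads at once and returns the elapsed
  // wall-clock time in milliseconds
  def elapsedMillis(numThreads: Int, perThread: Int): Long = {
    val start = System.nanoTime()
    val threads = for (i <- 0 until numThreads) yield new Thread(() => {
      for (j <- 0 until perThread) slowLogLine(s"thread $i line $j")
    })
    threads.foreach(_.start())
    threads.foreach(_.join())
    (System.nanoTime() - start) / 1000000
  }

  def main(args: Array[String]): Unit = {
    // 4 threads x 5 calls x 20ms each: the lock lets one call run at a time
    println(s"elapsed: ${elapsedMillis(4, 5)} ms")
  }
}
```

Because the lock admits only one caller at a time, the elapsed time is roughly
the sum of all 20 calls (about 400ms) rather than a single thread's share
(about 100ms): every thread spends most of its time waiting for the others.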

Using Cask Actors to perform logging avoids both these issues: calls to
`logger.send` happen in the background without slowing down your main program,
and multiple threads can call `logger.send` without being blocked by each other.

## Actor Pipelines

Another advantage of Actors is that you can get pipelined parallelism when
processing data. In the following example, we define two actor classes `Writer`
and `Logger`, and two actors `val writer` and `val logger`. `Writer` handles the
same writing-strings-to-disk-and-rotating-log-files logic we saw earlier, while
`Logger` adds another step of encoding the data (here just using Base64) before
it gets written to disk:

```scala
import cask.actor.{Actor, SimpleActor, Context}

class Writer(log: os.Path, old: os.Path, rotateSize: Int)
            (implicit ac: Context) extends SimpleActor[String]{
  def run(s: String) = {
    val newLogSize = logSize + s.length + 1
    if (newLogSize <= rotateSize) logSize = newLogSize
    else {
      logSize = s.length
      os.move(log, old, replaceExisting = true)
    }
    os.write.append(log, s + "\n", createFolders = true)
  }
  private var logSize = 0
}

class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
  def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
}

implicit val ac = new Context.Test()

val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath  = os.pwd / "out" / "scratch" / "log-old.txt"

val writer = new Writer(logPath, oldPath, rotateSize = 50)
val logger = new Logger(writer)

logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")

ac.waitForInactivity()

os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"

def decodeFile(p: os.Path) = {
  os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
}

decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
```

Although we have added another Base64 encoding step to the logging process, this
new step lives in a separate actor from the original write-to-disk step, so the
two steps can run in parallel with each other as well as with the main logic. By
constructing our data processing flows using Actors, we can take advantage of
pipeline parallelism to distribute the processing over multiple threads and CPU
cores, so adding a step to the pipeline need not reduce its throughput or slow
down the execution of the main program.
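
Because `Logger` only depends on a `dest: Actor[String]`, the downstream stage
can be swapped without changing the encoding step. As a rough sketch, the
following replaces `Writer` with a hypothetical in-memory `Collector`, which is
one way to exercise a pipeline stage without touching the disk. `Encoder`,
`Collector` and `PipelineSketch` are illustrative names, not part of Cask:

```scala
import cask.actor.{Actor, SimpleActor, Context}

// Same Base64 step as `Logger` above, under a hypothetical name
class Encoder(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
  def run(s: String): Unit =
    dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
}

// Hypothetical in-memory sink that stands in for `Writer`
class Collector()(implicit ac: Context) extends SimpleActor[String]{
  val received = collection.mutable.Buffer.empty[String]
  def run(s: String): Unit = received += s
}

object PipelineSketch {
  // Pushes `lines` through Encoder -> Collector and returns what arrived
  def encodeAll(lines: Seq[String]): Seq[String] = {
    implicit val ac: Context.Test = new Context.Test()
    val collector = new Collector()
    val encoder = new Encoder(collector)

    for (line <- lines) encoder.send(line)

    // Wait until both stages have finished processing
    ac.waitForInactivity()
    collector.received.toList
  }

  def main(args: Array[String]): Unit = {
    encodeAll(Seq("I am cow, I am cow", "Hear me moo, moooo")).foreach(println)
  }
}
```

Run with those two lines, this should print the same two Base64 strings that
the file-based example above expects to find in `log.txt`.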