package cask.actor
import utest._
object JvmActorsTest extends TestSuite{
def tests = Tests{
os.remove.all(os.pwd / "out" / "scratch")
test("lock"){
val rotateSize = 50
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
var logSize = 0
def logLine(s: String): Unit = synchronized{
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
logSize = 0
os.move(logPath, oldPath, replaceExisting = true)
}
os.write.append(logPath, s + "\n", createFolders = true)
}
logLine("I am cow")
logLine("hear me moo")
logLine("I weight twice as much as you")
logLine("And I look good on the barbecue")
logLine("Yoghurt curds cream cheese and butter")
logLine("Comes from liquids from my udder")
logLine("I am cow, I am cow")
logLine("Hear me moo, moooo")
os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder"
os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo"
}
test("actor"){
class Logger(log: os.Path, old: os.Path, rotateSize: Int)
(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = {
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
logSize = s.length
os.move(log, old, replaceExisting = true)
}
os.write.append(log, s + "\n", createFolders = true)
}
private var logSize = 0
}
implicit val ac = new Context.Test()
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
val logger = new Logger(logPath, oldPath, rotateSize = 50)
logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
// Logger hasn't finished yet, running in the background
ac.waitForInactivity()
// Now logger has finished
os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
test("pipeline"){
class Writer(log: os.Path, old: os.Path, rotateSize: Int)
(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = {
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
logSize = s.length
os.move(log, old, replaceExisting = true)
}
os.write.append(log, s + "\n", createFolders = true)
}
private var logSize = 0
}
class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
}
implicit val ac = new Context.Test()
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
val writer = new Writer(logPath, oldPath, rotateSize = 50)
val logger = new Logger(writer)
logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
ac.waitForInactivity()
os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"
def decodeFile(p: os.Path) = {
os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
}
decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
test("batch"){
sealed trait Msg
case class Text(value: String) extends Msg
case class Rotate() extends Msg
class Writer(log: os.Path, old: os.Path)
(implicit ac: Context) extends BatchActor[Msg]{
def runBatch(msgs: Seq[Msg]): Unit = {
msgs.lastIndexOf(Rotate()) match{
case -1 => os.write.append(log, groupMsgs(msgs), createFolders = true)
case rotateIndex =>
val prevRotateIndex = msgs.lastIndexOf(Rotate(), rotateIndex - 1)
if (prevRotateIndex != -1) os.remove.all(log)
os.write.append(log, groupMsgs(msgs.slice(prevRotateIndex, rotateIndex)), createFolders = true)
os.move(log, old, replaceExisting = true)
os.write.over(log, groupMsgs(msgs.drop(rotateIndex)), createFolders = true)
}
}
def groupMsgs(msgs: Seq[Msg]) = msgs.collect{case Text(value) => value}.mkString("\n") + "\n"
}
class Logger(dest: Actor[Msg], rotateSize: Int)
(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = {
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
logSize = s.length
dest.send(Rotate())
}
dest.send(Text(s))
}
private var logSize = 0
}
implicit val ac = new Context.Test()
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
val writer = new Writer(logPath, oldPath)
val logger = new Logger(writer, rotateSize = 50)
logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
// Logger hasn't finished yet, running in the background
ac.waitForInactivity()
// Now logger has finished
os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
test("debounce"){
sealed trait Msg
case class Debounced() extends Msg
case class Text(value: String) extends Msg
class Logger(log: os.Path, debounceTime: java.time.Duration)
(implicit ac: Context) extends StateMachineActor[Msg]{
def initialState = Idle()
case class Idle() extends State({
case Text(value) =>
ac.scheduleMsg(this, Debounced(), debounceTime)
Buffering(Vector(value))
})
case class Buffering(buffer: Vector[String]) extends State({
case Text(value) => Buffering(buffer :+ value)
case Debounced() =>
os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true)
Idle()
})
}
implicit val ac = new Context.Test()
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val logger = new Logger(logPath, java.time.Duration.ofMillis(50))
logger.send(Text("I am cow"))
logger.send(Text("hear me moo"))
Thread.sleep(100)
logger.send(Text("I weight twice as much as you"))
logger.send(Text("And I look good on the barbecue"))
Thread.sleep(100)
logger.send(Text("Yoghurt curds cream cheese and butter"))
logger.send(Text("Comes from liquids from my udder"))
logger.send(Text("I am cow, I am cow"))
logger.send(Text("Hear me moo, moooo"))
ac.waitForInactivity()
os.read.lines(logPath) ==> Seq(
"I am cow hear me moo",
"I weight twice as much as you And I look good on the barbecue",
"Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo",
)
}
test("log"){
sealed trait Msg
case class Debounced() extends Msg
case class Text(value: String) extends Msg
class Logger(log: os.Path, debounceTime: java.time.Duration)
(implicit ac: Context) extends StateMachineActor[Msg]{
def initialState = Idle()
case class Idle() extends State({
case Text(value) =>
ac.scheduleMsg(this, Debounced(), debounceTime)
Buffering(Vector(value))
})
case class Buffering(buffer: Vector[String]) extends State({
case Text(value) => Buffering(buffer :+ value)
case Debounced() =>
os.write.append(log, buffer.mkString(" ") + "\n", createFolders = true)
Idle()
})
override def run(msg: Msg): Unit = {
println(s"$state + $msg -> ")
super.run(msg)
println(state)
}
}
implicit val ac = new Context.Test()
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val logger = new Logger(logPath, java.time.Duration.ofMillis(50))
logger.send(Text("I am cow"))
// Idle() + Text(I am cow) ->
// Buffering(Vector(I am cow))
logger.send(Text("hear me moo"))
// Buffering(Vector(I am cow)) + Text(hear me moo) ->
// Buffering(Vector(I am cow, hear me moo))
Thread.sleep(100)
// Buffering(Vector(I am cow, hear me moo)) + Debounced() ->
// Idle()
logger.send(Text("I weight twice as much as you"))
// Idle() + Text(I weight twice as much as you) ->
// Buffering(Vector(I weight twice as much as you))
logger.send(Text("And I look good on the barbecue"))
// Buffering(Vector(I weight twice as much as you)) + Text(And I look good on the barbecue) ->
// Buffering(Vector(I weight twice as much as you, And I look good on the barbecue))
Thread.sleep(100)
// Buffering(Vector(I weight twice as much as you, And I look good on the barbecue)) + Debounced() ->
// Idle()
logger.send(Text("Yoghurt curds cream cheese and butter"))
// Idle() + Text(Yoghurt curds cream cheese and butter) ->
// Buffering(Vector(Yoghurt curds cream cheese and butter))
logger.send(Text("Comes from liquids from my udder"))
// Buffering(Vector(Yoghurt curds cream cheese and butter)) +
// Text(Comes from liquids from my udder) -> Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder))
logger.send(Text("I am cow, I am cow"))
// Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder)) + Text(I am cow, I am cow) ->
// Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow))
logger.send(Text("Hear me moo, moooo"))
// Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow)) + Text(Hear me moo, moooo) ->
// Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow, Hear me moo, moooo))
ac.waitForInactivity()
// Buffering(Vector(Yoghurt curds cream cheese and butter, Comes from liquids from my udder, I am cow, I am cow, Hear me moo, moooo)) + Debounced() ->
// Idle()
os.read.lines(logPath) ==> Seq(
"I am cow hear me moo",
"I weight twice as much as you And I look good on the barbecue",
"Yoghurt curds cream cheese and butter Comes from liquids from my udder I am cow, I am cow Hear me moo, moooo",
)
}
test("pipelineLog"){
class Writer(log: os.Path, old: os.Path, rotateSize: Int)
(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = {
val newLogSize = logSize + s.length + 1
if (newLogSize <= rotateSize) logSize = newLogSize
else {
logSize = s.length
os.move(log, old, replaceExisting = true)
}
os.write.append(log, s + "\n", createFolders = true)
}
private var logSize = 0
}
class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
}
implicit val ac = new Context.Test(
scala.concurrent.ExecutionContext.fromExecutor(
java.util.concurrent.Executors.newSingleThreadExecutor()
)
){
override def reportRun(a: Actor[_], msg: Any, token: Context.Token): Unit = {
println(s"$a <- $msg")
super.reportRun(a, msg, token)
}
}
val logPath = os.pwd / "out" / "scratch" / "log.txt"
val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
val writer = new Writer(logPath, oldPath, rotateSize = 50)
val logger = new Logger(writer)
logger.send("I am cow")
logger.send("hear me moo")
logger.send("I weight twice as much as you")
logger.send("And I look good on the barbecue")
logger.send("Yoghurt curds cream cheese and butter")
logger.send("Comes from liquids from my udder")
logger.send("I am cow, I am cow")
logger.send("Hear me moo, moooo")
ac.waitForInactivity()
os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"
def decodeFile(p: os.Path) = {
os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
}
decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
}
}
}