summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2019-11-03 17:33:31 +0800
committerLi Haoyi <haoyi.sg@gmail.com>2019-11-03 18:33:31 +0800
commitc95bf293dbe23fa6c5fd9e23b35a4e4ce34da415 (patch)
tree48006bcf854ef635f64da57dd23f59027af652ce
parent12a91e2b6c78cd347996663f56eadb9616834823 (diff)
downloadcask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.tar.gz
cask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.tar.bz2
cask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.zip
Flesh out `BatchActor.scala` into its own module, `cask.Actor`. Add the first unit test for an asynchronous logging actor
-rw-r--r--build.sc50
-rw-r--r--cask/actor/src/Actors.scala78
-rw-r--r--cask/actor/src/Context.scala190
-rw-r--r--cask/actor/test/src-jvm/JvmActorsTest.scala124
-rw-r--r--cask/actor/test/src/ActorsTest.scala17
-rw-r--r--cask/src/cask/endpoints/WebSocketEndpoint.scala18
-rw-r--r--cask/src/cask/main/Routes.scala5
-rw-r--r--cask/src/cask/package.scala2
-rw-r--r--cask/util/src/cask/util/BatchActor.scala46
-rw-r--r--cask/util/src/cask/util/WsClient.scala14
-rw-r--r--example/websockets/app/test/src/ExampleTests.scala5
-rw-r--r--example/websockets2/app/test/src/ExampleTests.scala4
-rw-r--r--example/websockets3/app/test/src/ExampleTests.scala6
-rw-r--r--example/websockets4/app/test/src/ExampleTests.scala5
14 files changed, 482 insertions, 82 deletions
diff --git a/build.sc b/build.sc
index 47ac6bc..b28417c 100644
--- a/build.sc
+++ b/build.sc
@@ -67,19 +67,21 @@ object cask extends CaskModule {
millSourcePath / s"src-$platformSegment"
)
def ivyDeps = Agg(
- ivy"com.lihaoyi::sourcecode:0.1.7",
+ ivy"com.lihaoyi::sourcecode:0.1.8",
ivy"com.lihaoyi::pprint:0.5.5"
)
}
object js extends UtilModule with ScalaJSModule{
+ def moduleDeps = Seq(actor.js)
def platformSegment = "js"
- def scalaJSVersion = "0.6.28"
+ def scalaJSVersion = "0.6.29"
def ivyDeps = super.ivyDeps() ++ Agg(
ivy"org.scala-js::scalajs-dom::0.9.7"
)
}
object jvm extends UtilModule{
+ def moduleDeps = Seq(actor.jvm)
def platformSegment = "jvm"
def ivyDeps = super.ivyDeps() ++ Agg(
ivy"org.java-websocket:Java-WebSocket:1.4.0"
@@ -87,14 +89,52 @@ object cask extends CaskModule {
}
}
+ object actor extends Module {
+ trait ActorModule extends CaskModule {
+ def artifactName = "cask-actor"
+ def platformSegment: String
+ def millSourcePath = super.millSourcePath / os.up
+
+ def sources = T.sources(
+ millSourcePath / "src",
+ millSourcePath / s"src-$platformSegment"
+ )
+
+ def ivyDeps = Agg(ivy"com.lihaoyi::sourcecode::0.1.8")
+
+ trait ActorTestModule extends Tests {
+ def sources = T.sources(
+ millSourcePath / "src",
+ millSourcePath / s"src-$platformSegment"
+ )
+ def testFrameworks = Seq("utest.runner.Framework")
+ def ivyDeps = Agg(ivy"com.lihaoyi::utest::0.7.1")
+ }
+ }
+
+ object js extends ActorModule with ScalaJSModule{
+ def platformSegment = "js"
+ def scalaJSVersion = "0.6.29"
+
+ object test extends ActorTestModule with Tests
+ }
+ object jvm extends ActorModule{
+ def platformSegment = "jvm"
+
+ object test extends ActorTestModule with Tests{
+ def ivyDeps = super.ivyDeps() ++ Agg(
+ ivy"com.lihaoyi::os-lib:0.4.2"
+ )
+ }
+ }
+ }
+
object test extends Tests{
def testFrameworks = Seq("utest.runner.Framework")
def ivyDeps = Agg(
ivy"com.lihaoyi::utest::0.7.1",
- ivy"com.lihaoyi::requests::0.2.0",
-// ivy"org.xerial:sqlite-jdbc:3.18.0",
-// ivy"io.getquill::quill-jdbc:2.6.0"
+ ivy"com.lihaoyi::requests::0.2.0"
)
}
}
diff --git a/cask/actor/src/Actors.scala b/cask/actor/src/Actors.scala
new file mode 100644
index 0000000..69a5289
--- /dev/null
+++ b/cask/actor/src/Actors.scala
@@ -0,0 +1,78 @@
+package cask.actor
+import collection.mutable
+
+abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{
+ private val queue = new mutable.Queue[(T, Context.Token)]()
+
+ private var scheduled = false
+
+ def send(t: T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit = synchronized{
+ val token = ac.reportSchedule(this, t, fileName, line)
+ queue.enqueue((t, token))
+ if (!scheduled){
+ scheduled = true
+ ac.execute(() => runWithItems())
+ }
+ }
+ def sendAsync(f: scala.concurrent.Future[T])
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line) = {
+ f.onComplete{
+ case scala.util.Success(v) => this.send(v)
+ case scala.util.Failure(e) => ac.reportFailure(e)
+ }
+ }
+
+ def runBatch0(msgs: Seq[(T, Context.Token)]): Unit
+ private[this] def runWithItems(): Unit = {
+ val msgs = synchronized(queue.dequeueAll(_ => true))
+
+ runBatch0(msgs)
+
+ synchronized{
+ if (queue.nonEmpty) ac.execute(() => runWithItems())
+ else{
+ assert(scheduled)
+ scheduled = false
+ }
+ }
+ }
+}
+
+abstract class BatchActor[T]()(implicit ac: Context) extends BaseActor[T]{
+ def runBatch(msgs: Seq[T]): Unit
+ def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = {
+ try {
+ msgs.foreach{case (m, token) => ac.reportRun(this, m, token)}
+ runBatch(msgs.map(_._1))
+ }
+ catch{case e: Throwable => ac.reportFailure(e)}
+ finally msgs.foreach{case (m, token) => ac.reportComplete(token)}
+
+ }
+}
+
+abstract class SimpleActor[T]()(implicit ac: Context) extends BaseActor[T]{
+ def run(msg: T): Unit
+ override def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = {
+ for((msg, token) <- msgs) {
+ try {
+ ac.reportRun(this, msg, token)
+ run(msg)
+ }
+ catch{case e: Throwable => ac.reportFailure(e)}
+ finally ac.reportComplete(token)
+ }
+ }
+}
+
+abstract class StateMachineActor[T]()(implicit ac: Context) extends SimpleActor[T]() {
+ class State(val run: T => State)
+ protected[this] def initialState: State
+ protected[this] var state: State = initialState
+ def run(msg: T): Unit = {
+ state = state.run(msg)
+ }
+} \ No newline at end of file
diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala
new file mode 100644
index 0000000..6b56f2e
--- /dev/null
+++ b/cask/actor/src/Context.scala
@@ -0,0 +1,190 @@
+package cask.actor
+import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise}
+import scala.util.Try
+
+/**
+ * An extended `scala.concurrent.ExecutionContext`; provides the ability to
+ * schedule messages to be sent later, and hooks to track the current number of
+ * outstanding tasks or log the actor message sends for debugging purporses
+ */
+trait Context extends ExecutionContext {
+ def reportSchedule(): Context.Token = new Context.Token.Simple()
+
+ def reportSchedule(fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ new Context.Token.Future(fileName, line)
+ }
+
+ def reportSchedule(a: Actor[_],
+ msg: Any,
+ fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ new Context.Token.Send(a, msg, fileName, line)
+ }
+
+ def reportRun(a: Actor[_],
+ msg: Any,
+ token: Context.Token): Unit = ()
+
+ def reportComplete(token: Context.Token): Unit = ()
+
+ def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+
+ def future[T](t: => T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Future[T]
+
+ def execute(runnable: Runnable): Unit
+}
+
+object Context{
+ trait Token
+ object Token{
+ class Simple extends Token(){
+ override def toString = "token@" + Integer.toHexString(hashCode())
+ }
+
+ class Future(val fileName: sourcecode.FileName,
+ val line: sourcecode.Line) extends Token(){
+ override def toString =
+ "token@" + Integer.toHexString(hashCode()) + "@" +
+ fileName.value + ":" + line.value
+ }
+
+ class Send(val a: Actor[_],
+ val msg: Any,
+ val fileName: sourcecode.FileName,
+ val line: sourcecode.Line) extends Token(){
+ override def toString =
+ "token@" + Integer.toHexString(hashCode()) + "@" +
+ fileName.value + ":" + line.value
+ }
+ }
+
+ class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl {
+ def executionContext = ec
+ def reportFailure(t: Throwable) = logEx(t)
+ }
+
+ object Simple{
+ implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace())
+ }
+
+ class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global,
+ logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl {
+ private[this] val active = collection.mutable.Set.empty[Context.Token]
+ private[this] var promise = concurrent.Promise.successful[Unit](())
+
+ def executionContext = ec
+
+ def reportFailure(t: Throwable) = logEx(t)
+
+ def handleReportSchedule(token: Context.Token) = synchronized{
+ if (active.isEmpty) {
+ assert(promise.isCompleted)
+ promise = concurrent.Promise[Unit]
+ }
+ active.add(token)
+ token
+ }
+ override def reportSchedule() = {
+ handleReportSchedule(super.reportSchedule())
+ }
+ override def reportSchedule(fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ handleReportSchedule(super.reportSchedule(fileName, line))
+ }
+
+ override def reportSchedule(a: Actor[_],
+ msg: Any,
+ fileName: sourcecode.FileName,
+ line: sourcecode.Line): Context.Token = {
+ handleReportSchedule(super.reportSchedule(a, msg, fileName, line))
+ }
+
+ override def reportComplete(token: Context.Token) = this.synchronized{
+ assert(active.remove(token))
+
+ if (active.isEmpty) promise.success(())
+ }
+
+ def waitForInactivity(timeout: Option[java.time.Duration] = None) = {
+ Await.result(
+ this.synchronized(promise).future,
+ timeout match{
+ case None => scala.concurrent.duration.Duration.Inf
+ case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos)
+ }
+ )
+ }
+ }
+
+ trait Impl extends Context {
+ def executionContext: ExecutionContext
+
+ def execute(runnable: Runnable): Unit = {
+ val token = reportSchedule()
+ executionContext.execute(new Runnable {
+ def run(): Unit = {
+ try runnable.run()
+ finally reportComplete(token)
+ }
+ })
+ }
+
+ def future[T](t: => T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Future[T] = {
+ val token = reportSchedule(fileName, line)
+ val p = Promise[T]
+ executionContext.execute(new Runnable {
+ def run(): Unit = {
+ p.complete(scala.util.Try(t))
+ reportComplete(token)
+ }
+ })
+ p.future
+ }
+
+ lazy val scheduler = Executors.newSingleThreadScheduledExecutor(
+ new ThreadFactory {
+ def newThread(r: Runnable): Thread = {
+ val t = new Thread(r, "ActorContext-Scheduler-Thread")
+ t.setDaemon(true)
+ t
+ }
+ }
+ )
+
+ def scheduleMsg[T](a: Actor[T],
+ msg: T, delay: java.time.Duration)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line) = {
+ val token = reportSchedule(a, msg, fileName, line)
+ scheduler.schedule[Unit](
+ () => {
+ a.send(msg)(fileName, line)
+ reportComplete(token)
+ },
+ delay.toMillis,
+ TimeUnit.MILLISECONDS
+ )
+ }
+ }
+
+}
+
+trait Actor[T]{
+ def send(t: T)
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+
+ def sendAsync(f: scala.concurrent.Future[T])
+ (implicit fileName: sourcecode.FileName,
+ line: sourcecode.Line): Unit
+}
diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala
new file mode 100644
index 0000000..9cfb0d5
--- /dev/null
+++ b/cask/actor/test/src-jvm/JvmActorsTest.scala
@@ -0,0 +1,124 @@
+package cask.actor
+
+import utest._
+object JvmActorsTest extends TestSuite{
+ def tests = Tests{
+ os.remove.all(os.pwd / "out" / "scratch")
+ test("lock"){
+ val rotateSize = 50
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+ val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+ var logSize = 0
+
+ def logLine(s: String): Unit = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = 0
+ os.move(logPath, oldPath, replaceExisting = true)
+ }
+
+ os.write.append(logPath, s + "\n", createFolders = true)
+ }
+
+ logLine("I am cow")
+ logLine("hear me moo")
+ logLine("I weight twice as much as you")
+ logLine("And I look good on the barbecue")
+ logLine("Yoghurt curds cream cheese and butter")
+ logLine("Comes from liquids from my udder")
+ logLine("I am cow, I am cow")
+ logLine("Hear me moo, moooo")
+
+ os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder"
+ os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo"
+ }
+
+ test("actor"){
+ class Logger(log: os.Path, old: os.Path, rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ os.move(log, old, replaceExisting = true)
+ }
+ os.write.append(log, s + "\n", createFolders = true)
+ }
+ private var logSize = 0
+ }
+
+ implicit val ac = new Context.Test()
+
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+ val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+ val logger = new Logger(logPath, oldPath, rotateSize = 50)
+
+ logger.send("I am cow")
+ logger.send("hear me moo")
+ logger.send("I weight twice as much as you")
+ logger.send("And I look good on the barbecue")
+ logger.send("Yoghurt curds cream cheese and butter")
+ logger.send("Comes from liquids from my udder")
+ logger.send("I am cow, I am cow")
+ logger.send("Hear me moo, moooo")
+
+ ac.waitForInactivity()
+
+ os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder")
+ os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+ }
+
+ test("pipeline"){
+ class Writer(log: os.Path, old: os.Path, rotateSize: Int)
+ (implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = {
+ val newLogSize = logSize + s.length + 1
+ if (newLogSize <= rotateSize) logSize = newLogSize
+ else {
+ logSize = s.length
+ os.move(log, old, replaceExisting = true)
+ }
+ os.write.append(log, s + "\n", createFolders = true)
+ }
+ private var logSize = 0
+ }
+
+ class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{
+ def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes))
+ }
+
+ implicit val ac = new Context.Test()
+
+ val logPath = os.pwd / "out" / "scratch" / "log.txt"
+ val oldPath = os.pwd / "out" / "scratch" / "log-old.txt"
+
+ val writer = new Writer(logPath, oldPath, rotateSize = 50)
+ val logger = new Logger(writer)
+
+ logger.send("I am cow")
+ logger.send("hear me moo")
+ logger.send("I weight twice as much as you")
+ logger.send("And I look good on the barbecue")
+ logger.send("Yoghurt curds cream cheese and butter")
+ logger.send("Comes from liquids from my udder")
+ logger.send("I am cow, I am cow")
+ logger.send("Hear me moo, moooo")
+
+ ac.waitForInactivity()
+
+ os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n"
+ os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n"
+
+ def decodeFile(p: os.Path) = {
+ os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s)))
+ }
+
+ decodeFile(oldPath) ==> Seq("Comes from liquids from my udder")
+ decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo")
+ }
+ }
+} \ No newline at end of file
diff --git a/cask/actor/test/src/ActorsTest.scala b/cask/actor/test/src/ActorsTest.scala
new file mode 100644
index 0000000..2c81e5b
--- /dev/null
+++ b/cask/actor/test/src/ActorsTest.scala
@@ -0,0 +1,17 @@
+package cask.actor
+import utest._
+object ActorsTest extends TestSuite{
+ def tests = Tests{
+ test("hello"){
+ import Context.Simple.global
+
+ sealed trait Msg
+
+ object logger extends SimpleActor[Msg]{
+ def run(msg: Msg) = {
+
+ }
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala
index 905c5f1..fcb40ec 100644
--- a/cask/src/cask/endpoints/WebSocketEndpoint.scala
+++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala
@@ -32,8 +32,8 @@ class websocket(val path: String, override val subpath: Boolean = false)
def wrapPathSegment(s: String): Seq[String] = Seq(s)
}
-case class WsHandler(f: WsChannelActor => cask.util.BatchActor[Ws.Event])
- (implicit ec: ExecutionContext, log: Logger)
+case class WsHandler(f: WsChannelActor => cask.actor.Actor[Ws.Event])
+ (implicit ac: cask.actor.Context, log: Logger)
extends WebsocketResult with WebSocketConnectionCallback {
def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = {
channel.suspendReceives()
@@ -75,9 +75,9 @@ extends WebsocketResult with WebSocketConnectionCallback {
}
class WsChannelActor(channel: WebSocketChannel)
- (implicit ec: ExecutionContext, log: Logger)
-extends cask.util.BatchActor[Ws.Event]{
- def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ (implicit ac: cask.actor.Context, log: Logger)
+extends cask.actor.SimpleActor[Ws.Event]{
+ def run(item: Ws.Event): Unit = item match{
case Ws.Text(value) => WebSockets.sendTextBlocking(value, channel)
case Ws.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel)
case Ws.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel)
@@ -87,10 +87,10 @@ extends cask.util.BatchActor[Ws.Event]{
}
case class WsActor(handle: PartialFunction[Ws.Event, Unit])
- (implicit ec: ExecutionContext, log: Logger)
-extends cask.util.BatchActor[Ws.Event]{
- def run(items: Seq[Ws.Event]): Unit = {
- items.foreach(handle.applyOrElse(_, (x: Ws.Event) => ()))
+ (implicit ac: cask.actor.Context, log: Logger)
+extends cask.actor.SimpleActor[Ws.Event]{
+ def run(item: Ws.Event): Unit = {
+ handle.lift(item)
}
}
diff --git a/cask/src/cask/main/Routes.scala b/cask/src/cask/main/Routes.scala
index 1b83be3..68e3af4 100644
--- a/cask/src/cask/main/Routes.scala
+++ b/cask/src/cask/main/Routes.scala
@@ -7,7 +7,10 @@ import language.experimental.macros
trait Routes{
def decorators = Seq.empty[cask.router.Decorator[_, _, _]]
- implicit def executionContext = concurrent.ExecutionContext.Implicits.global
+ implicit val actorContext = new cask.actor.Context.Simple(
+ concurrent.ExecutionContext.Implicits.global,
+ log.exception
+ )
private[this] var metadata0: RoutesEndpointsMetadata[this.type] = null
def caskMetadata =
if (metadata0 != null) metadata0
diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala
index 7c1d61c..ebba984 100644
--- a/cask/src/cask/package.scala
+++ b/cask/src/cask/package.scala
@@ -55,6 +55,6 @@ package object cask {
type Logger = util.Logger
val Logger = util.Logger
- type BatchActor[T] = util.BatchActor[T]
+ type BatchActor[T] = actor.BatchActor[T]
}
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala
deleted file mode 100644
index 4985fc3..0000000
--- a/cask/util/src/cask/util/BatchActor.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package cask.util
-
-import scala.collection.mutable
-import scala.concurrent.ExecutionContext
-
-/**
- * A simple asynchronous actor, allowing safe concurrent asynchronous processing
- * of queued items. `run` handles items in batches, to allow for batch
- * processing optimizations to be used where relevant.
- */
-abstract class BatchActor[T]()(implicit ec: ExecutionContext,
- log: Logger) {
- def run(items: Seq[T]): Unit
-
- private val queue = new mutable.Queue[T]()
- private var scheduled = false
- def send(t: T): Unit = synchronized{
- queue.enqueue(t)
- if (!scheduled){
- scheduled = true
- ec.execute(() => runWithItems())
- }
- }
-
- private[this] def runWithItems(): Unit = {
- val items = synchronized(queue.dequeueAll(_ => true))
- try run(items)
- catch{case e: Throwable => log.exception(e)}
- synchronized{
- if (queue.nonEmpty) ec.execute(() => runWithItems())
- else{
- assert(scheduled)
- scheduled = false
- }
- }
- }
-}
-
-abstract class StateMachineActor[T]()
- (implicit ec: ExecutionContext,
- log: Logger) extends BatchActor[T](){
- class State(val run: T => State)
- protected[this] def initialState: State
- protected[this] var state: State = initialState
- def run(items: Seq[T]): Unit = items.foreach{i => state = state.run(i)}
-}
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala
index fbde444..28277bb 100644
--- a/cask/util/src/cask/util/WsClient.scala
+++ b/cask/util/src/cask/util/WsClient.scala
@@ -4,10 +4,10 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Promise}
class WsClient(impl: WebsocketBase)
- (implicit ec: ExecutionContext, log: Logger)
- extends cask.util.BatchActor[Ws.Event]{
+ (implicit ac: cask.actor.Context, log: Logger)
+ extends cask.actor.SimpleActor[Ws.Event]{
- def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ def run(item: Ws.Event): Unit = item match{
case Ws.Text(s) => impl.send(s)
case Ws.Binary(s) => impl.send(s)
case Ws.Close(_, _) => impl.close()
@@ -18,14 +18,14 @@ class WsClient(impl: WebsocketBase)
object WsClient{
def connect(url: String)
(f: PartialFunction[cask.util.Ws.Event, Unit])
- (implicit ec: ExecutionContext, log: Logger): WsClient = {
+ (implicit ac: cask.actor.Context, log: Logger): WsClient = {
Await.result(connectAsync(url)(f), Duration.Inf)
}
def connectAsync(url: String)
(f: PartialFunction[cask.util.Ws.Event, Unit])
- (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = {
- object receiveActor extends cask.util.BatchActor[Ws.Event] {
- def run(items: Seq[Ws.Event]) = items.foreach(x => f.applyOrElse(x, (_: Ws.Event) => ()))
+ (implicit ac: cask.actor.Context, log: Logger): scala.concurrent.Future[WsClient] = {
+ object receiveActor extends cask.actor.SimpleActor[Ws.Event] {
+ def run(item: Ws.Event) = f.lift(item)
}
val p = Promise[WsClient]
val impl = new WebsocketClientImpl(url) {
diff --git a/example/websockets/app/test/src/ExampleTests.scala b/example/websockets/app/test/src/ExampleTests.scala
index 4ab6688..9671198 100644
--- a/example/websockets/app/test/src/ExampleTests.scala
+++ b/example/websockets/app/test/src/ExampleTests.scala
@@ -1,13 +1,12 @@
+
package app
import java.util.concurrent.atomic.AtomicInteger
-
+import cask.actor.Context.Simple.global
import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}
import utest._
-import concurrent.ExecutionContext.Implicits.global
import cask.Logger.Console.globalLogger
-
object ExampleTests extends TestSuite{
diff --git a/example/websockets2/app/test/src/ExampleTests.scala b/example/websockets2/app/test/src/ExampleTests.scala
index 4d70f28..6112b10 100644
--- a/example/websockets2/app/test/src/ExampleTests.scala
+++ b/example/websockets2/app/test/src/ExampleTests.scala
@@ -1,10 +1,10 @@
package app
import java.util.concurrent.atomic.AtomicInteger
-
+import cask.actor.Context.Simple.global
import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}
import utest._
-import concurrent.ExecutionContext.Implicits.global
+
import cask.Logger.Console.globalLogger
object ExampleTests extends TestSuite{
diff --git a/example/websockets3/app/test/src/ExampleTests.scala b/example/websockets3/app/test/src/ExampleTests.scala
index 00665d5..0d42983 100644
--- a/example/websockets3/app/test/src/ExampleTests.scala
+++ b/example/websockets3/app/test/src/ExampleTests.scala
@@ -1,11 +1,9 @@
package app
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}
import utest._
-import concurrent.ExecutionContext.Implicits.global
import cask.Logger.Console.globalLogger
+import cask.actor.Context.Simple.global
+
object ExampleTests extends TestSuite{
diff --git a/example/websockets4/app/test/src/ExampleTests.scala b/example/websockets4/app/test/src/ExampleTests.scala
index a2c793b..95f0fba 100644
--- a/example/websockets4/app/test/src/ExampleTests.scala
+++ b/example/websockets4/app/test/src/ExampleTests.scala
@@ -1,11 +1,8 @@
package app
-import java.util.concurrent.atomic.AtomicInteger
-
-import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler}
import utest._
-import concurrent.ExecutionContext.Implicits.global
import cask.Logger.Console.globalLogger
+import cask.actor.Context.Simple.global
object ExampleTests extends TestSuite{