diff options
author | Li Haoyi <haoyi.sg@gmail.com> | 2019-11-03 17:33:31 +0800 |
---|---|---|
committer | Li Haoyi <haoyi.sg@gmail.com> | 2019-11-03 18:33:31 +0800 |
commit | c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415 (patch) | |
tree | 48006bcf854ef635f64da57dd23f59027af652ce /cask/util | |
parent | 12a91e2b6c78cd347996663f56eadb9616834823 (diff) | |
download | cask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.tar.gz cask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.tar.bz2 cask-c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415.zip |
Flesh out `BatchActor.scala` into its own module, `cask.Actor`. Add the first unit test for an asynchronous logging actor
Diffstat (limited to 'cask/util')
-rw-r--r-- | cask/util/src/cask/util/BatchActor.scala | 46 | ||||
-rw-r--r-- | cask/util/src/cask/util/WsClient.scala | 14 |
2 files changed, 7 insertions, 53 deletions
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala deleted file mode 100644 index 4985fc3..0000000 --- a/cask/util/src/cask/util/BatchActor.scala +++ /dev/null @@ -1,46 +0,0 @@ -package cask.util - -import scala.collection.mutable -import scala.concurrent.ExecutionContext - -/** - * A simple asynchronous actor, allowing safe concurrent asynchronous processing - * of queued items. `run` handles items in batches, to allow for batch - * processing optimizations to be used where relevant. - */ -abstract class BatchActor[T]()(implicit ec: ExecutionContext, - log: Logger) { - def run(items: Seq[T]): Unit - - private val queue = new mutable.Queue[T]() - private var scheduled = false - def send(t: T): Unit = synchronized{ - queue.enqueue(t) - if (!scheduled){ - scheduled = true - ec.execute(() => runWithItems()) - } - } - - private[this] def runWithItems(): Unit = { - val items = synchronized(queue.dequeueAll(_ => true)) - try run(items) - catch{case e: Throwable => log.exception(e)} - synchronized{ - if (queue.nonEmpty) ec.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - } -} - -abstract class StateMachineActor[T]() - (implicit ec: ExecutionContext, - log: Logger) extends BatchActor[T](){ - class State(val run: T => State) - protected[this] def initialState: State - protected[this] var state: State = initialState - def run(items: Seq[T]): Unit = items.foreach{i => state = state.run(i)} -} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala index fbde444..28277bb 100644 --- a/cask/util/src/cask/util/WsClient.scala +++ b/cask/util/src/cask/util/WsClient.scala @@ -4,10 +4,10 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Promise} class WsClient(impl: WebsocketBase) - (implicit ec: ExecutionContext, log: Logger) - extends cask.util.BatchActor[Ws.Event]{ + (implicit ac: cask.actor.Context, log: Logger) + extends cask.actor.SimpleActor[Ws.Event]{ - def run(items: Seq[Ws.Event]): Unit = items.foreach{ + def run(item: Ws.Event): Unit = item match{ case Ws.Text(s) => impl.send(s) case Ws.Binary(s) => impl.send(s) case Ws.Close(_, _) => impl.close() @@ -18,14 +18,14 @@ class WsClient(impl: WebsocketBase) object WsClient{ def connect(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): WsClient = { + (implicit ac: cask.actor.Context, log: Logger): WsClient = { Await.result(connectAsync(url)(f), Duration.Inf) } def connectAsync(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { - object receiveActor extends cask.util.BatchActor[Ws.Event] { - def run(items: Seq[Ws.Event]) = items.foreach(x => f.applyOrElse(x, (_: Ws.Event) => ())) + (implicit ac: cask.actor.Context, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.actor.SimpleActor[Ws.Event] { + def run(item: Ws.Event) = f.lift(item) } val p = Promise[WsClient] val impl = new WebsocketClientImpl(url) { |