From c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sun, 3 Nov 2019 17:33:31 +0800 Subject: Flesh out `BatchActor.scala` into its own module, `cask.Actor`. Add the first unit test for an asynchronous logging actor --- cask/util/src/cask/util/BatchActor.scala | 46 -------------------------------- cask/util/src/cask/util/WsClient.scala | 14 +++++----- 2 files changed, 7 insertions(+), 53 deletions(-) delete mode 100644 cask/util/src/cask/util/BatchActor.scala (limited to 'cask/util') diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala deleted file mode 100644 index 4985fc3..0000000 --- a/cask/util/src/cask/util/BatchActor.scala +++ /dev/null @@ -1,46 +0,0 @@ -package cask.util - -import scala.collection.mutable -import scala.concurrent.ExecutionContext - -/** - * A simple asynchronous actor, allowing safe concurrent asynchronous processing - * of queued items. `run` handles items in batches, to allow for batch - * processing optimizations to be used where relevant. - */ -abstract class BatchActor[T]()(implicit ec: ExecutionContext, - log: Logger) { - def run(items: Seq[T]): Unit - - private val queue = new mutable.Queue[T]() - private var scheduled = false - def send(t: T): Unit = synchronized{ - queue.enqueue(t) - if (!scheduled){ - scheduled = true - ec.execute(() => runWithItems()) - } - } - - private[this] def runWithItems(): Unit = { - val items = synchronized(queue.dequeueAll(_ => true)) - try run(items) - catch{case e: Throwable => log.exception(e)} - synchronized{ - if (queue.nonEmpty) ec.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - } -} - -abstract class StateMachineActor[T]() - (implicit ec: ExecutionContext, - log: Logger) extends BatchActor[T](){ - class State(val run: T => State) - protected[this] def initialState: State - protected[this] var state: State = initialState - def run(items: Seq[T]): Unit = items.foreach{i => state = state.run(i)} -} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala index fbde444..28277bb 100644 --- a/cask/util/src/cask/util/WsClient.scala +++ b/cask/util/src/cask/util/WsClient.scala @@ -4,10 +4,10 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Promise} class WsClient(impl: WebsocketBase) - (implicit ec: ExecutionContext, log: Logger) - extends cask.util.BatchActor[Ws.Event]{ + (implicit ac: cask.actor.Context, log: Logger) + extends cask.actor.SimpleActor[Ws.Event]{ - def run(items: Seq[Ws.Event]): Unit = items.foreach{ + def run(item: Ws.Event): Unit = item match{ case Ws.Text(s) => impl.send(s) case Ws.Binary(s) => impl.send(s) case Ws.Close(_, _) => impl.close() @@ -18,14 +18,14 @@ class WsClient(impl: WebsocketBase) object WsClient{ def connect(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): WsClient = { + (implicit ac: cask.actor.Context, log: Logger): WsClient = { Await.result(connectAsync(url)(f), Duration.Inf) } def connectAsync(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { - object receiveActor extends cask.util.BatchActor[Ws.Event] { - def run(items: Seq[Ws.Event]) = items.foreach(x => f.applyOrElse(x, (_: Ws.Event) => ())) + (implicit ac: cask.actor.Context, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.actor.SimpleActor[Ws.Event] { + def run(item: Ws.Event) = f.lift(item) } val p = Promise[WsClient] val impl = new WebsocketClientImpl(url) { -- cgit v1.2.3