From 60a23d3250db88f6147adf4e74f7497f870cd2ec Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sun, 15 Sep 2019 13:20:44 +0800 Subject: Move `internal.BatchActor` to `util.BatchActor` --- cask/src/cask/endpoints/WebSocketEndpoint.scala | 8 ++--- cask/src/cask/internal/BatchActor.scala | 40 ------------------------- cask/src/cask/package.scala | 3 ++ cask/src/cask/util/BatchActor.scala | 40 +++++++++++++++++++++++++ 4 files changed, 47 insertions(+), 44 deletions(-) delete mode 100644 cask/src/cask/internal/BatchActor.scala create mode 100644 cask/src/cask/util/BatchActor.scala (limited to 'cask') diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala index a836321..fae7fde 100644 --- a/cask/src/cask/endpoints/WebSocketEndpoint.scala +++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala @@ -2,7 +2,7 @@ package cask.endpoints import java.nio.ByteBuffer -import cask.internal.{BatchActor, Router} +import cask.internal.Router import cask.model.Request import cask.util.Logger import io.undertow.websockets.WebSocketConnectionCallback @@ -32,7 +32,7 @@ class websocket(val path: String, override val subpath: Boolean = false) def wrapPathSegment(s: String): Seq[String] = Seq(s) } -case class WsHandler(f: WsChannelActor => BatchActor[WsActor.Event]) +case class WsHandler(f: WsChannelActor => cask.util.BatchActor[WsActor.Event]) (implicit ec: ExecutionContext, log: Logger) extends WebsocketResult with WebSocketConnectionCallback { def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { @@ -71,7 +71,7 @@ extends WebsocketResult with WebSocketConnectionCallback { class WsChannelActor(channel: WebSocketChannel) (implicit ec: ExecutionContext, log: Logger) -extends BatchActor[WsActor.Event]{ +extends cask.util.BatchActor[WsActor.Event]{ def run(items: Seq[WsActor.Event]): Unit = items.foreach{ case WsActor.Text(value) => WebSockets.sendTextBlocking(value, channel) case WsActor.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) @@ -83,7 +83,7 @@ extends BatchActor[WsActor.Event]{ case class WsActor(handle: PartialFunction[WsActor.Event, Unit]) (implicit ec: ExecutionContext, log: Logger) -extends BatchActor[WsActor.Event]{ +extends cask.util.BatchActor[WsActor.Event]{ def run(items: Seq[WsActor.Event]): Unit = { items.foreach(handle.applyOrElse(_, (x: WsActor.Event) => ())) } diff --git a/cask/src/cask/internal/BatchActor.scala b/cask/src/cask/internal/BatchActor.scala deleted file mode 100644 index 60b5f57..0000000 --- a/cask/src/cask/internal/BatchActor.scala +++ /dev/null @@ -1,40 +0,0 @@ -package cask.internal - -import cask.util.Logger - -import scala.collection.mutable -import scala.concurrent.ExecutionContext - -/** - * A simple asynchrous actor, allowing safe concurrent asynchronous processing - * of queued items. `run` handles items in batches, to allow for batch - * processing optimizations to be used where relevant. - */ -abstract class BatchActor[T]()(implicit ec: ExecutionContext, - log: Logger) { - def run(items: Seq[T]): Unit - - private val queue = new mutable.Queue[T]() - private var scheduled = false - def send(t: => T): Unit = synchronized{ - queue.enqueue(t) - if (!scheduled){ - scheduled = true - ec.execute(() => runWithItems()) - } - } - - def runWithItems(): Unit = { - val items = synchronized(queue.dequeueAll(_ => true)) - try run(items) - catch{case e: Throwable => log.exception(e)} - synchronized{ - if (queue.nonEmpty) ec.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - - } -} diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala index 93c1161..d9e29ba 100644 --- a/cask/src/cask/package.scala +++ b/cask/src/cask/package.scala @@ -51,4 +51,7 @@ package object cask { // util type Logger = util.Logger val Logger = util.Logger + + type BatchActor[T] = util.BatchActor[T] + } diff --git a/cask/src/cask/util/BatchActor.scala b/cask/src/cask/util/BatchActor.scala new file mode 100644 index 0000000..137b852 --- /dev/null +++ b/cask/src/cask/util/BatchActor.scala @@ -0,0 +1,40 @@ +package cask.util + +import cask.util.Logger + +import scala.collection.mutable +import scala.concurrent.ExecutionContext + +/** + * A simple asynchrous actor, allowing safe concurrent asynchronous processing + * of queued items. `run` handles items in batches, to allow for batch + * processing optimizations to be used where relevant. + */ +abstract class BatchActor[T]()(implicit ec: ExecutionContext, + log: Logger) { + def run(items: Seq[T]): Unit + + private val queue = new mutable.Queue[T]() + private var scheduled = false + def send(t: => T): Unit = synchronized{ + queue.enqueue(t) + if (!scheduled){ + scheduled = true + ec.execute(() => runWithItems()) + } + } + + def runWithItems(): Unit = { + val items = synchronized(queue.dequeueAll(_ => true)) + try run(items) + catch{case e: Throwable => log.exception(e)} + synchronized{ + if (queue.nonEmpty) ec.execute(() => runWithItems()) + else{ + assert(scheduled) + scheduled = false + } + } + + } +} -- cgit v1.2.3