summaryrefslogtreecommitdiff
path: root/cask/util
diff options
context:
space:
mode:
Diffstat (limited to 'cask/util')
-rw-r--r--cask/util/src/cask/util/BatchActor.scala46
-rw-r--r--cask/util/src/cask/util/WsClient.scala14
2 files changed, 7 insertions, 53 deletions
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala
deleted file mode 100644
index 4985fc3..0000000
--- a/cask/util/src/cask/util/BatchActor.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package cask.util
-
-import scala.collection.mutable
-import scala.concurrent.ExecutionContext
-
-/**
- * A simple asynchronous actor, allowing safe concurrent asynchronous processing
- * of queued items. `run` handles items in batches, to allow for batch
- * processing optimizations to be used where relevant.
- */
-abstract class BatchActor[T]()(implicit ec: ExecutionContext,
- log: Logger) {
- def run(items: Seq[T]): Unit
-
- private val queue = new mutable.Queue[T]()
- private var scheduled = false
- def send(t: T): Unit = synchronized{
- queue.enqueue(t)
- if (!scheduled){
- scheduled = true
- ec.execute(() => runWithItems())
- }
- }
-
- private[this] def runWithItems(): Unit = {
- val items = synchronized(queue.dequeueAll(_ => true))
- try run(items)
- catch{case e: Throwable => log.exception(e)}
- synchronized{
- if (queue.nonEmpty) ec.execute(() => runWithItems())
- else{
- assert(scheduled)
- scheduled = false
- }
- }
- }
-}
-
-abstract class StateMachineActor[T]()
- (implicit ec: ExecutionContext,
- log: Logger) extends BatchActor[T](){
- class State(val run: T => State)
- protected[this] def initialState: State
- protected[this] var state: State = initialState
- def run(items: Seq[T]): Unit = items.foreach{i => state = state.run(i)}
-}
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala
index fbde444..28277bb 100644
--- a/cask/util/src/cask/util/WsClient.scala
+++ b/cask/util/src/cask/util/WsClient.scala
@@ -4,10 +4,10 @@ import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Promise}
class WsClient(impl: WebsocketBase)
- (implicit ec: ExecutionContext, log: Logger)
- extends cask.util.BatchActor[Ws.Event]{
+ (implicit ac: cask.actor.Context, log: Logger)
+ extends cask.actor.SimpleActor[Ws.Event]{
- def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ def run(item: Ws.Event): Unit = item match{
case Ws.Text(s) => impl.send(s)
case Ws.Binary(s) => impl.send(s)
case Ws.Close(_, _) => impl.close()
@@ -18,14 +18,14 @@ class WsClient(impl: WebsocketBase)
object WsClient{
def connect(url: String)
(f: PartialFunction[cask.util.Ws.Event, Unit])
- (implicit ec: ExecutionContext, log: Logger): WsClient = {
+ (implicit ac: cask.actor.Context, log: Logger): WsClient = {
Await.result(connectAsync(url)(f), Duration.Inf)
}
def connectAsync(url: String)
(f: PartialFunction[cask.util.Ws.Event, Unit])
- (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = {
- object receiveActor extends cask.util.BatchActor[Ws.Event] {
- def run(items: Seq[Ws.Event]) = items.foreach(x => f.applyOrElse(x, (_: Ws.Event) => ()))
+ (implicit ac: cask.actor.Context, log: Logger): scala.concurrent.Future[WsClient] = {
+ object receiveActor extends cask.actor.SimpleActor[Ws.Event] {
+ def run(item: Ws.Event) = f.lift(item)
}
val p = Promise[WsClient]
val impl = new WebsocketClientImpl(url) {