diff options
Diffstat (limited to 'cask/util')
-rw-r--r-- | cask/util/src-js/cask/util/Scheduler.scala | 6 | ||||
-rw-r--r-- | cask/util/src-js/cask/util/WebsocketClientImpl.scala | 33 | ||||
-rw-r--r-- | cask/util/src-jvm/cask/util/Scheduler.scala | 16 | ||||
-rw-r--r-- | cask/util/src-jvm/cask/util/WebsocketClientImpl.scala | 39 | ||||
-rw-r--r-- | cask/util/src/cask/util/BatchActor.scala | 11 | ||||
-rw-r--r-- | cask/util/src/cask/util/Logger.scala | 3 | ||||
-rw-r--r-- | cask/util/src/cask/util/WebsocketBase.scala | 14 | ||||
-rw-r--r-- | cask/util/src/cask/util/Ws.scala | 24 | ||||
-rw-r--r-- | cask/util/src/cask/util/WsClient.scala | 55 |
9 files changed, 194 insertions, 7 deletions
diff --git a/cask/util/src-js/cask/util/Scheduler.scala b/cask/util/src-js/cask/util/Scheduler.scala new file mode 100644 index 0000000..dec3450 --- /dev/null +++ b/cask/util/src-js/cask/util/Scheduler.scala @@ -0,0 +1,6 @@ +package cask.util +object Scheduler{ + def schedule(millis: Long)(body: => Unit) = { + scala.scalajs.js.timers.setTimeout(millis)(body) + } +} diff --git a/cask/util/src-js/cask/util/WebsocketClientImpl.scala b/cask/util/src-js/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..f129893 --- /dev/null +++ b/cask/util/src-js/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,33 @@ +package cask.util + +import org.scalajs.dom + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: dom.WebSocket = null + var closed = false + def connect(): Unit = { + websocket = new dom.WebSocket(url) + + websocket.onopen = (e: dom.Event) => onOpen() + websocket.onmessage = (e: dom.MessageEvent) => onMessage(e.data.asInstanceOf[String]) + websocket.onclose = (e: dom.CloseEvent) => { + closed = true + onClose(e.code, e.reason) + } + websocket.onerror = (e: dom.Event) => onError(new Exception(e.toString)) + } + def onOpen(): Unit + + def send(value: String) = try { + websocket.send(value) + true + } catch{case e: scala.scalajs.js.JavaScriptException => false} + + + def send(value: Array[Byte]) = ??? + def onError(ex: Exception): Unit + def onMessage(value: String): Unit + def onClose(code: Int, reason: String): Unit + def close(): Unit = websocket.close() + def isClosed() = closed +}
\ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/Scheduler.scala b/cask/util/src-jvm/cask/util/Scheduler.scala new file mode 100644 index 0000000..74db37e --- /dev/null +++ b/cask/util/src-jvm/cask/util/Scheduler.scala @@ -0,0 +1,16 @@ +package cask.util + +import java.util.concurrent.{Executors, TimeUnit} + +object Scheduler{ + val scheduler = Executors.newSingleThreadScheduledExecutor() + def schedule(millis: Long)(body: => Unit) = { + scheduler.schedule( + new Runnable { + def run(): Unit = body + }, + millis, + TimeUnit.MILLISECONDS + ) + } +}
\ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..5570356 --- /dev/null +++ b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,39 @@ +package cask.util +import org.java_websocket.client.WebSocketClient +import org.java_websocket.handshake.ServerHandshake + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: Client = null + + def connect(): Unit = { + websocket = new Client() + websocket.connect() + } + def onOpen(): Unit + def onMessage(message: String): Unit + def send(message: String) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def send(message: Array[Byte]) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def onClose(code: Int, reason: String): Unit + def onError(ex: Exception): Unit + def close(): Unit = websocket.close() + def isClosed() = websocket.isClosed() + class Client() extends WebSocketClient(new java.net.URI(url)){ + def onOpen(handshakedata: ServerHandshake) = { + WebsocketClientImpl.this.onOpen() + } + def onMessage(message: String) = WebsocketClientImpl.this.onMessage(message) + def onClose(code: Int, reason: String, remote: Boolean) = WebsocketClientImpl.this.onClose(code, reason) + def onError(ex: Exception) = WebsocketClientImpl.this.onError(ex) + + } +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala index 137b852..26f1c14 100644 --- a/cask/util/src/cask/util/BatchActor.scala +++ b/cask/util/src/cask/util/BatchActor.scala @@ -1,12 +1,10 @@ package cask.util -import cask.util.Logger - import scala.collection.mutable import scala.concurrent.ExecutionContext /** - * A simple asynchrous actor, allowing safe concurrent asynchronous processing + * A simple asynchronous actor, allowing safe concurrent asynchronous processing * of queued items. `run` handles items in batches, to allow for batch * processing optimizations to be used where relevant. */ @@ -16,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, private val queue = new mutable.Queue[T]() private var scheduled = false - def send(t: => T): Unit = synchronized{ + def send(t: T): Unit = synchronized{ queue.enqueue(t) if (!scheduled){ scheduled = true @@ -24,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, } } - def runWithItems(): Unit = { + private[this] def runWithItems(): Unit = { val items = synchronized(queue.dequeueAll(_ => true)) try run(items) catch{case e: Throwable => log.exception(e)} @@ -35,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, scheduled = false } } - } -} +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/Logger.scala b/cask/util/src/cask/util/Logger.scala index 8dc3156..2afbb48 100644 --- a/cask/util/src/cask/util/Logger.scala +++ b/cask/util/src/cask/util/Logger.scala @@ -8,6 +8,9 @@ trait Logger { def debug(t: sourcecode.Text[Any])(implicit f: sourcecode.File, line: sourcecode.Line): Unit } object Logger{ + object Console { + implicit object globalLogger extends Console() + } class Console() extends Logger{ def exception(t: Throwable): Unit = t.printStackTrace() diff --git a/cask/util/src/cask/util/WebsocketBase.scala b/cask/util/src/cask/util/WebsocketBase.scala new file mode 100644 index 0000000..bcfdae5 --- /dev/null +++ b/cask/util/src/cask/util/WebsocketBase.scala @@ -0,0 +1,14 @@ +package cask.util + +abstract class WebsocketBase{ + def connect(): Unit + def onOpen(): Unit + def onMessage(message: String): Unit + def onMessage(message: Array[Byte]): Unit + def send(message: String): Boolean + def send(message: Array[Byte]): Boolean + def onClose(code: Int, reason: String): Unit + def close(): Unit + def isClosed(): Boolean + def onError(ex: Exception): Unit +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/Ws.scala b/cask/util/src/cask/util/Ws.scala new file mode 100644 index 0000000..52e7260 --- /dev/null +++ b/cask/util/src/cask/util/Ws.scala @@ -0,0 +1,24 @@ +package cask.util + +object Ws{ + trait Event + case class Text(value: String) extends Event + case class Binary(value: Array[Byte]) extends Event + case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event + case class Error(e: Throwable) extends Event + case class ChannelClosed() extends Event + object Close{ + // Taken from io.undertow.websockets.core.CloseMessage.* + val NormalClosure = 1000 + val GoingAway = 1001 + val WrongCode = 1002 + val ProtocolError = 1003 + val MsgContainsInvalidData = 1007 + val MsgViolatesPolicy = 1008 + val MsgTooBig = 1009 + val MissingExtensions = 1010 + val UnexpectedError = 1011 + } +} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala new file mode 100644 index 0000000..b0f56d0 --- /dev/null +++ b/cask/util/src/cask/util/WsClient.scala @@ -0,0 +1,55 @@ +package cask.util + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Promise} + +class WsClient(impl: WebsocketBase) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger) + extends cask.util.BatchActor[Ws.Event]{ + + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(s) => impl.send(s) + case Ws.Binary(s) => impl.send(s) + case Ws.Close(_, _) => impl.close() + case Ws.ChannelClosed() => impl.close() + } + def close() = impl.close() +} + +object WsClient{ + def connect(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): WsClient = { + Await.result(connectAsync(url)(f), Duration.Inf) + } + def connectAsync(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.util.BatchActor[Ws.Event] { + def run(items: Seq[Ws.Event]) = items.collect(f) + } + val p = Promise[WsClient] + val impl = new WebsocketClientImpl(url) { + def onOpen() = { + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onMessage(message: String) = { + receiveActor.send(Ws.Text(message)) + } + def onMessage(message: Array[Byte]) = { + receiveActor.send(Ws.Binary(message)) + } + def onClose(code: Int, reason: String) = { + receiveActor.send(Ws.Close(code, reason)) + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onError(ex: Exception): Unit = { + receiveActor.send(Ws.Error(ex)) + } + } + + impl.connect() + p.future + } +}
\ No newline at end of file |