summaryrefslogtreecommitdiff
path: root/cask/util/src
diff options
context:
space:
mode:
Diffstat (limited to 'cask/util/src')
-rw-r--r--cask/util/src/cask/util/BatchActor.scala9
-rw-r--r--cask/util/src/cask/util/Logger.scala3
-rw-r--r--cask/util/src/cask/util/WebsocketBase.scala14
-rw-r--r--cask/util/src/cask/util/Ws.scala24
-rw-r--r--cask/util/src/cask/util/WsClient.scala55
5 files changed, 100 insertions, 5 deletions
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala
index bee4386..26f1c14 100644
--- a/cask/util/src/cask/util/BatchActor.scala
+++ b/cask/util/src/cask/util/BatchActor.scala
@@ -4,7 +4,7 @@ import scala.collection.mutable
import scala.concurrent.ExecutionContext
/**
- * A simple asynchrous actor, allowing safe concurrent asynchronous processing
+ * A simple asynchronous actor, allowing safe concurrent asynchronous processing
* of queued items. `run` handles items in batches, to allow for batch
* processing optimizations to be used where relevant.
*/
@@ -14,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
private val queue = new mutable.Queue[T]()
private var scheduled = false
- def send(t: => T): Unit = synchronized{
+ def send(t: T): Unit = synchronized{
queue.enqueue(t)
if (!scheduled){
scheduled = true
@@ -22,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
}
}
- def runWithItems(): Unit = {
+ private[this] def runWithItems(): Unit = {
val items = synchronized(queue.dequeueAll(_ => true))
try run(items)
catch{case e: Throwable => log.exception(e)}
@@ -33,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
scheduled = false
}
}
-
}
-}
+} \ No newline at end of file
diff --git a/cask/util/src/cask/util/Logger.scala b/cask/util/src/cask/util/Logger.scala
index 8dc3156..2afbb48 100644
--- a/cask/util/src/cask/util/Logger.scala
+++ b/cask/util/src/cask/util/Logger.scala
@@ -8,6 +8,9 @@ trait Logger {
def debug(t: sourcecode.Text[Any])(implicit f: sourcecode.File, line: sourcecode.Line): Unit
}
object Logger{
+ object Console {
+ implicit object globalLogger extends Console()
+ }
class Console() extends Logger{
def exception(t: Throwable): Unit = t.printStackTrace()
diff --git a/cask/util/src/cask/util/WebsocketBase.scala b/cask/util/src/cask/util/WebsocketBase.scala
new file mode 100644
index 0000000..bcfdae5
--- /dev/null
+++ b/cask/util/src/cask/util/WebsocketBase.scala
@@ -0,0 +1,14 @@
+package cask.util
+
+abstract class WebsocketBase{
+ def connect(): Unit
+ def onOpen(): Unit
+ def onMessage(message: String): Unit
+ def onMessage(message: Array[Byte]): Unit
+ def send(message: String): Boolean
+ def send(message: Array[Byte]): Boolean
+ def onClose(code: Int, reason: String): Unit
+ def close(): Unit
+ def isClosed(): Boolean
+ def onError(ex: Exception): Unit
+} \ No newline at end of file
diff --git a/cask/util/src/cask/util/Ws.scala b/cask/util/src/cask/util/Ws.scala
new file mode 100644
index 0000000..52e7260
--- /dev/null
+++ b/cask/util/src/cask/util/Ws.scala
@@ -0,0 +1,24 @@
+package cask.util
+
+object Ws{
+ trait Event
+ case class Text(value: String) extends Event
+ case class Binary(value: Array[Byte]) extends Event
+ case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event
+ case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event
+ case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event
+ case class Error(e: Throwable) extends Event
+ case class ChannelClosed() extends Event
+ object Close{
+ // Taken from io.undertow.websockets.core.CloseMessage.*
+ val NormalClosure = 1000
+ val GoingAway = 1001
+ val WrongCode = 1002
+ val ProtocolError = 1003
+ val MsgContainsInvalidData = 1007
+ val MsgViolatesPolicy = 1008
+ val MsgTooBig = 1009
+ val MissingExtensions = 1010
+ val UnexpectedError = 1011
+ }
+}
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala
new file mode 100644
index 0000000..b0f56d0
--- /dev/null
+++ b/cask/util/src/cask/util/WsClient.scala
@@ -0,0 +1,55 @@
+package cask.util
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Promise}
+
+class WsClient(impl: WebsocketBase)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger)
+ extends cask.util.BatchActor[Ws.Event]{
+
+ def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ case Ws.Text(s) => impl.send(s)
+ case Ws.Binary(s) => impl.send(s)
+ case Ws.Close(_, _) => impl.close()
+ case Ws.ChannelClosed() => impl.close()
+ }
+ def close() = impl.close()
+}
+
+object WsClient{
+ def connect(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): WsClient = {
+ Await.result(connectAsync(url)(f), Duration.Inf)
+ }
+ def connectAsync(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = {
+ object receiveActor extends cask.util.BatchActor[Ws.Event] {
+ def run(items: Seq[Ws.Event]) = items.collect(f)
+ }
+ val p = Promise[WsClient]
+ val impl = new WebsocketClientImpl(url) {
+ def onOpen() = {
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onMessage(message: String) = {
+ receiveActor.send(Ws.Text(message))
+ }
+ def onMessage(message: Array[Byte]) = {
+ receiveActor.send(Ws.Binary(message))
+ }
+ def onClose(code: Int, reason: String) = {
+ receiveActor.send(Ws.Close(code, reason))
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onError(ex: Exception): Unit = {
+ receiveActor.send(Ws.Error(ex))
+ }
+ }
+
+ impl.connect()
+ p.future
+ }
+} \ No newline at end of file