summaryrefslogtreecommitdiff
path: root/cask
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2019-09-16 22:32:39 +0800
committerGitHub <noreply@github.com>2019-09-16 22:32:39 +0800
commitd22a0e75bded29a28529b87ed5ef9db2f6ae86c9 (patch)
treeed8248ed943327f6011309a10ef6b03a1025ab9f /cask
parentb83eec01c8db8a8aa499d6c498ff85987005fe83 (diff)
parentbfe26d5a9705011359658c45b364e9b65ce697b5 (diff)
downloadcask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.gz
cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.bz2
cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.zip
Merge pull request #14 from lihaoyi-databricks/master
Provide a simple builtin websocket client in `cask.WsClient`
Diffstat (limited to 'cask')
-rw-r--r--cask/src/cask/endpoints/WebSocketEndpoint.scala60
-rw-r--r--cask/src/cask/package.scala3
-rw-r--r--cask/util/src-js/cask/util/Scheduler.scala6
-rw-r--r--cask/util/src-js/cask/util/WebsocketClientImpl.scala33
-rw-r--r--cask/util/src-jvm/cask/util/Scheduler.scala16
-rw-r--r--cask/util/src-jvm/cask/util/WebsocketClientImpl.scala39
-rw-r--r--cask/util/src/cask/util/BatchActor.scala9
-rw-r--r--cask/util/src/cask/util/Logger.scala3
-rw-r--r--cask/util/src/cask/util/WebsocketBase.scala14
-rw-r--r--cask/util/src/cask/util/Ws.scala24
-rw-r--r--cask/util/src/cask/util/WsClient.scala55
11 files changed, 220 insertions, 42 deletions
diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala
index 6ca5def..994f015 100644
--- a/cask/src/cask/endpoints/WebSocketEndpoint.scala
+++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala
@@ -4,7 +4,7 @@ import java.nio.ByteBuffer
import cask.model.Request
import cask.router.Result
-import cask.util.Logger
+import cask.util.{Logger, Ws}
import io.undertow.websockets.WebSocketConnectionCallback
import io.undertow.websockets.core.{AbstractReceiveListener, BufferedBinaryMessage, BufferedTextMessage, CloseMessage, WebSocketChannel, WebSockets}
import io.undertow.websockets.spi.WebSocketHttpExchange
@@ -32,36 +32,41 @@ class websocket(val path: String, override val subpath: Boolean = false)
def wrapPathSegment(s: String): Seq[String] = Seq(s)
}
-case class WsHandler(f: WsChannelActor => cask.util.BatchActor[WsActor.Event])
+case class WsHandler(f: WsChannelActor => cask.util.BatchActor[Ws.Event])
(implicit ec: ExecutionContext, log: Logger)
extends WebsocketResult with WebSocketConnectionCallback {
def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = {
+ channel.suspendReceives()
val actor = f(new WsChannelActor(channel))
+ // Somehow browsers closing tabs and Java processes being killed appear
+ // as different events here; the former goes to AbstractReceiveListener#onClose,
+ // while the latter to ChannelListener#handleEvent. Make sure we handle both cases.
+ channel.addCloseTask(channel => actor.send(Ws.ChannelClosed()))
channel.getReceiveSetter.set(
new AbstractReceiveListener() {
override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = {
- actor.send(WsActor.Text(message.getData))
+ actor.send(Ws.Text(message.getData))
}
override def onFullBinaryMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = {
- actor.send(WsActor.Binary(
+ actor.send(Ws.Binary(
WebSockets.mergeBuffers(message.getData.getResource:_*).array()
))
}
override def onFullPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = {
- actor.send(WsActor.Ping(
+ actor.send(Ws.Ping(
WebSockets.mergeBuffers(message.getData.getResource:_*).array()
))
}
override def onFullPongMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = {
- actor.send(WsActor.Pong(
+ actor.send(Ws.Pong(
WebSockets.mergeBuffers(message.getData.getResource:_*).array()
))
}
override def onCloseMessage(cm: CloseMessage, channel: WebSocketChannel) = {
- actor.send(WsActor.Close(cm.getCode, cm.getReason))
+ actor.send(Ws.Close(cm.getCode, cm.getReason))
}
}
)
@@ -71,40 +76,21 @@ extends WebsocketResult with WebSocketConnectionCallback {
class WsChannelActor(channel: WebSocketChannel)
(implicit ec: ExecutionContext, log: Logger)
-extends cask.util.BatchActor[WsActor.Event]{
- def run(items: Seq[WsActor.Event]): Unit = items.foreach{
- case WsActor.Text(value) => WebSockets.sendTextBlocking(value, channel)
- case WsActor.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel)
- case WsActor.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel)
- case WsActor.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel)
- case WsActor.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel)
+extends cask.util.BatchActor[Ws.Event]{
+ def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ case Ws.Text(value) => WebSockets.sendTextBlocking(value, channel)
+ case Ws.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel)
+ case Ws.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel)
+ case Ws.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel)
+ case Ws.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel)
}
}
-case class WsActor(handle: PartialFunction[WsActor.Event, Unit])
+case class WsActor(handle: PartialFunction[Ws.Event, Unit])
(implicit ec: ExecutionContext, log: Logger)
-extends cask.util.BatchActor[WsActor.Event]{
- def run(items: Seq[WsActor.Event]): Unit = {
- items.foreach(handle.applyOrElse(_, (x: WsActor.Event) => ()))
+extends cask.util.BatchActor[Ws.Event]{
+ def run(items: Seq[Ws.Event]): Unit = {
+ items.foreach(handle.applyOrElse(_, (x: Ws.Event) => ()))
}
}
-object WsActor{
- trait Event
- case class Text(value: String) extends Event
- case class Binary(value: Array[Byte]) extends Event
- case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event
- case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event
- case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event
- object Close{
- val NormalClosure = CloseMessage.NORMAL_CLOSURE
- val GoingAway = CloseMessage.GOING_AWAY
- val WrongCode = CloseMessage.WRONG_CODE
- val ProtocolError = CloseMessage.PROTOCOL_ERROR
- val MsgContainsInvalidData = CloseMessage.MSG_CONTAINS_INVALID_DATA
- val MsgViolatesPolicy = CloseMessage.MSG_VIOLATES_POLICY
- val MsgTooBig = CloseMessage.MSG_TOO_BIG
- val MissingExtensions = CloseMessage.MISSING_EXTENSIONS
- val UnexpectedError = CloseMessage.UNEXPECTED_ERROR
- }
-}
diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala
index d34fe26..7c1d61c 100644
--- a/cask/src/cask/package.scala
+++ b/cask/src/cask/package.scala
@@ -47,6 +47,9 @@ package object cask {
type WsActor = cask.endpoints.WsActor
val WsActor = cask.endpoints.WsActor
type WsChannelActor = cask.endpoints.WsChannelActor
+ type WsClient = cask.util.WsClient
+ val WsClient = cask.util.WsClient
+ val Ws = cask.util.Ws
// util
type Logger = util.Logger
diff --git a/cask/util/src-js/cask/util/Scheduler.scala b/cask/util/src-js/cask/util/Scheduler.scala
new file mode 100644
index 0000000..dec3450
--- /dev/null
+++ b/cask/util/src-js/cask/util/Scheduler.scala
@@ -0,0 +1,6 @@
+package cask.util
+object Scheduler{
+ def schedule(millis: Long)(body: => Unit) = {
+ scala.scalajs.js.timers.setTimeout(millis)(body)
+ }
+}
diff --git a/cask/util/src-js/cask/util/WebsocketClientImpl.scala b/cask/util/src-js/cask/util/WebsocketClientImpl.scala
new file mode 100644
index 0000000..f129893
--- /dev/null
+++ b/cask/util/src-js/cask/util/WebsocketClientImpl.scala
@@ -0,0 +1,33 @@
+package cask.util
+
+import org.scalajs.dom
+
+abstract class WebsocketClientImpl(url: String) extends WebsocketBase{
+ var websocket: dom.WebSocket = null
+ var closed = false
+ def connect(): Unit = {
+ websocket = new dom.WebSocket(url)
+
+ websocket.onopen = (e: dom.Event) => onOpen()
+ websocket.onmessage = (e: dom.MessageEvent) => onMessage(e.data.asInstanceOf[String])
+ websocket.onclose = (e: dom.CloseEvent) => {
+ closed = true
+ onClose(e.code, e.reason)
+ }
+ websocket.onerror = (e: dom.Event) => onError(new Exception(e.toString))
+ }
+ def onOpen(): Unit
+
+ def send(value: String) = try {
+ websocket.send(value)
+ true
+ } catch{case e: scala.scalajs.js.JavaScriptException => false}
+
+
+ def send(value: Array[Byte]) = ???
+ def onError(ex: Exception): Unit
+ def onMessage(value: String): Unit
+ def onClose(code: Int, reason: String): Unit
+ def close(): Unit = websocket.close()
+ def isClosed() = closed
+} \ No newline at end of file
diff --git a/cask/util/src-jvm/cask/util/Scheduler.scala b/cask/util/src-jvm/cask/util/Scheduler.scala
new file mode 100644
index 0000000..74db37e
--- /dev/null
+++ b/cask/util/src-jvm/cask/util/Scheduler.scala
@@ -0,0 +1,16 @@
+package cask.util
+
+import java.util.concurrent.{Executors, TimeUnit}
+
+object Scheduler{
+ val scheduler = Executors.newSingleThreadScheduledExecutor()
+ def schedule(millis: Long)(body: => Unit) = {
+ scheduler.schedule(
+ new Runnable {
+ def run(): Unit = body
+ },
+ millis,
+ TimeUnit.MILLISECONDS
+ )
+ }
+} \ No newline at end of file
diff --git a/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala
new file mode 100644
index 0000000..5570356
--- /dev/null
+++ b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala
@@ -0,0 +1,39 @@
+package cask.util
+import org.java_websocket.client.WebSocketClient
+import org.java_websocket.handshake.ServerHandshake
+
+abstract class WebsocketClientImpl(url: String) extends WebsocketBase{
+ var websocket: Client = null
+
+ def connect(): Unit = {
+ websocket = new Client()
+ websocket.connect()
+ }
+ def onOpen(): Unit
+ def onMessage(message: String): Unit
+ def send(message: String) = try{
+ websocket.send(message)
+ true
+ }catch{
+ case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false
+ }
+ def send(message: Array[Byte]) = try{
+ websocket.send(message)
+ true
+ }catch{
+ case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false
+ }
+ def onClose(code: Int, reason: String): Unit
+ def onError(ex: Exception): Unit
+ def close(): Unit = websocket.close()
+ def isClosed() = websocket.isClosed()
+ class Client() extends WebSocketClient(new java.net.URI(url)){
+ def onOpen(handshakedata: ServerHandshake) = {
+ WebsocketClientImpl.this.onOpen()
+ }
+ def onMessage(message: String) = WebsocketClientImpl.this.onMessage(message)
+ def onClose(code: Int, reason: String, remote: Boolean) = WebsocketClientImpl.this.onClose(code, reason)
+ def onError(ex: Exception) = WebsocketClientImpl.this.onError(ex)
+
+ }
+} \ No newline at end of file
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala
index bee4386..26f1c14 100644
--- a/cask/util/src/cask/util/BatchActor.scala
+++ b/cask/util/src/cask/util/BatchActor.scala
@@ -4,7 +4,7 @@ import scala.collection.mutable
import scala.concurrent.ExecutionContext
/**
- * A simple asynchrous actor, allowing safe concurrent asynchronous processing
+ * A simple asynchronous actor, allowing safe concurrent asynchronous processing
* of queued items. `run` handles items in batches, to allow for batch
* processing optimizations to be used where relevant.
*/
@@ -14,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
private val queue = new mutable.Queue[T]()
private var scheduled = false
- def send(t: => T): Unit = synchronized{
+ def send(t: T): Unit = synchronized{
queue.enqueue(t)
if (!scheduled){
scheduled = true
@@ -22,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
}
}
- def runWithItems(): Unit = {
+ private[this] def runWithItems(): Unit = {
val items = synchronized(queue.dequeueAll(_ => true))
try run(items)
catch{case e: Throwable => log.exception(e)}
@@ -33,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
scheduled = false
}
}
-
}
-}
+} \ No newline at end of file
diff --git a/cask/util/src/cask/util/Logger.scala b/cask/util/src/cask/util/Logger.scala
index 8dc3156..2afbb48 100644
--- a/cask/util/src/cask/util/Logger.scala
+++ b/cask/util/src/cask/util/Logger.scala
@@ -8,6 +8,9 @@ trait Logger {
def debug(t: sourcecode.Text[Any])(implicit f: sourcecode.File, line: sourcecode.Line): Unit
}
object Logger{
+ object Console {
+ implicit object globalLogger extends Console()
+ }
class Console() extends Logger{
def exception(t: Throwable): Unit = t.printStackTrace()
diff --git a/cask/util/src/cask/util/WebsocketBase.scala b/cask/util/src/cask/util/WebsocketBase.scala
new file mode 100644
index 0000000..bcfdae5
--- /dev/null
+++ b/cask/util/src/cask/util/WebsocketBase.scala
@@ -0,0 +1,14 @@
+package cask.util
+
+abstract class WebsocketBase{
+ def connect(): Unit
+ def onOpen(): Unit
+ def onMessage(message: String): Unit
+ def onMessage(message: Array[Byte]): Unit
+ def send(message: String): Boolean
+ def send(message: Array[Byte]): Boolean
+ def onClose(code: Int, reason: String): Unit
+ def close(): Unit
+ def isClosed(): Boolean
+ def onError(ex: Exception): Unit
+} \ No newline at end of file
diff --git a/cask/util/src/cask/util/Ws.scala b/cask/util/src/cask/util/Ws.scala
new file mode 100644
index 0000000..52e7260
--- /dev/null
+++ b/cask/util/src/cask/util/Ws.scala
@@ -0,0 +1,24 @@
+package cask.util
+
+object Ws{
+ trait Event
+ case class Text(value: String) extends Event
+ case class Binary(value: Array[Byte]) extends Event
+ case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event
+ case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event
+ case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event
+ case class Error(e: Throwable) extends Event
+ case class ChannelClosed() extends Event
+ object Close{
+ // Taken from io.undertow.websockets.core.CloseMessage.*
+ val NormalClosure = 1000
+ val GoingAway = 1001
+ val WrongCode = 1002
+ val ProtocolError = 1003
+ val MsgContainsInvalidData = 1007
+ val MsgViolatesPolicy = 1008
+ val MsgTooBig = 1009
+ val MissingExtensions = 1010
+ val UnexpectedError = 1011
+ }
+}
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala
new file mode 100644
index 0000000..b0f56d0
--- /dev/null
+++ b/cask/util/src/cask/util/WsClient.scala
@@ -0,0 +1,55 @@
+package cask.util
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Promise}
+
+class WsClient(impl: WebsocketBase)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger)
+ extends cask.util.BatchActor[Ws.Event]{
+
+ def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ case Ws.Text(s) => impl.send(s)
+ case Ws.Binary(s) => impl.send(s)
+ case Ws.Close(_, _) => impl.close()
+ case Ws.ChannelClosed() => impl.close()
+ }
+ def close() = impl.close()
+}
+
+object WsClient{
+ def connect(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): WsClient = {
+ Await.result(connectAsync(url)(f), Duration.Inf)
+ }
+ def connectAsync(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = {
+ object receiveActor extends cask.util.BatchActor[Ws.Event] {
+ def run(items: Seq[Ws.Event]) = items.collect(f)
+ }
+ val p = Promise[WsClient]
+ val impl = new WebsocketClientImpl(url) {
+ def onOpen() = {
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onMessage(message: String) = {
+ receiveActor.send(Ws.Text(message))
+ }
+ def onMessage(message: Array[Byte]) = {
+ receiveActor.send(Ws.Binary(message))
+ }
+ def onClose(code: Int, reason: String) = {
+ receiveActor.send(Ws.Close(code, reason))
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onError(ex: Exception): Unit = {
+ receiveActor.send(Ws.Error(ex))
+ }
+ }
+
+ impl.connect()
+ p.future
+ }
+} \ No newline at end of file