From bfe26d5a9705011359658c45b364e9b65ce697b5 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Mon, 16 Sep 2019 22:31:03 +0800 Subject: Provide a simple builtin websocket client in `cask.WsClient` Harmonize the actor-based APIs of `cask.WsClient`/`cask.WsHandler`/`cask.WsActor`, letting them share the same set of `cask.Ws` events The default implementation of `cask.WsClient` on the JVM spawns one thread per connection, and doesn't really scale to large numbers of connections. For now we just continue using AsyncHttpClient in the load tests. Wrapping AsyncHttpClient in a nice API is TBD --- cask/src/cask/endpoints/WebSocketEndpoint.scala | 60 +++++++++------------- cask/src/cask/main/Routes.scala | 2 +- cask/src/cask/package.scala | 3 ++ cask/util/src-js/cask/util/Scheduler.scala | 6 +++ .../src-js/cask/util/WebsocketClientImpl.scala | 33 ++++++++++++ cask/util/src-jvm/cask/util/Scheduler.scala | 16 ++++++ .../src-jvm/cask/util/WebsocketClientImpl.scala | 39 ++++++++++++++ cask/util/src/cask/util/BatchActor.scala | 11 ++-- cask/util/src/cask/util/Logger.scala | 3 ++ cask/util/src/cask/util/WebsocketBase.scala | 14 +++++ cask/util/src/cask/util/Ws.scala | 24 +++++++++ cask/util/src/cask/util/WsClient.scala | 55 ++++++++++++++++++++ 12 files changed, 221 insertions(+), 45 deletions(-) create mode 100644 cask/util/src-js/cask/util/Scheduler.scala create mode 100644 cask/util/src-js/cask/util/WebsocketClientImpl.scala create mode 100644 cask/util/src-jvm/cask/util/Scheduler.scala create mode 100644 cask/util/src-jvm/cask/util/WebsocketClientImpl.scala create mode 100644 cask/util/src/cask/util/WebsocketBase.scala create mode 100644 cask/util/src/cask/util/Ws.scala create mode 100644 cask/util/src/cask/util/WsClient.scala (limited to 'cask') diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala index 6ca5def..994f015 100644 --- a/cask/src/cask/endpoints/WebSocketEndpoint.scala +++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala @@ -4,7 +4,7 @@ import java.nio.ByteBuffer import cask.model.Request import cask.router.Result -import cask.util.Logger +import cask.util.{Logger, Ws} import io.undertow.websockets.WebSocketConnectionCallback import io.undertow.websockets.core.{AbstractReceiveListener, BufferedBinaryMessage, BufferedTextMessage, CloseMessage, WebSocketChannel, WebSockets} import io.undertow.websockets.spi.WebSocketHttpExchange @@ -32,36 +32,41 @@ class websocket(val path: String, override val subpath: Boolean = false) def wrapPathSegment(s: String): Seq[String] = Seq(s) } -case class WsHandler(f: WsChannelActor => cask.util.BatchActor[WsActor.Event]) +case class WsHandler(f: WsChannelActor => cask.util.BatchActor[Ws.Event]) (implicit ec: ExecutionContext, log: Logger) extends WebsocketResult with WebSocketConnectionCallback { def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { + channel.suspendReceives() val actor = f(new WsChannelActor(channel)) + // Somehow browsers closing tabs and Java processes being killed appear + // as different events here; the former goes to AbstractReceiveListener#onClose, + // while the latter to ChannelListener#handleEvent. Make sure we handle both cases. + channel.addCloseTask(channel => actor.send(Ws.ChannelClosed())) channel.getReceiveSetter.set( new AbstractReceiveListener() { override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = { - actor.send(WsActor.Text(message.getData)) + actor.send(Ws.Text(message.getData)) } override def onFullBinaryMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Binary( + actor.send(Ws.Binary( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onFullPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Ping( + actor.send(Ws.Ping( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onFullPongMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Pong( + actor.send(Ws.Pong( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onCloseMessage(cm: CloseMessage, channel: WebSocketChannel) = { - actor.send(WsActor.Close(cm.getCode, cm.getReason)) + actor.send(Ws.Close(cm.getCode, cm.getReason)) } } ) @@ -71,40 +76,21 @@ extends WebsocketResult with WebSocketConnectionCallback { class WsChannelActor(channel: WebSocketChannel) (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[WsActor.Event]{ - def run(items: Seq[WsActor.Event]): Unit = items.foreach{ - case WsActor.Text(value) => WebSockets.sendTextBlocking(value, channel) - case WsActor.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel) +extends cask.util.BatchActor[Ws.Event]{ + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(value) => WebSockets.sendTextBlocking(value, channel) + case Ws.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) + case Ws.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) + case Ws.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) + case Ws.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel) } } -case class WsActor(handle: PartialFunction[WsActor.Event, Unit]) +case class WsActor(handle: PartialFunction[Ws.Event, Unit]) (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[WsActor.Event]{ - def run(items: Seq[WsActor.Event]): Unit = { - items.foreach(handle.applyOrElse(_, (x: WsActor.Event) => ())) +extends cask.util.BatchActor[Ws.Event]{ + def run(items: Seq[Ws.Event]): Unit = { + items.foreach(handle.applyOrElse(_, (x: Ws.Event) => ())) } } -object WsActor{ - trait Event - case class Text(value: String) extends Event - case class Binary(value: Array[Byte]) extends Event - case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event - case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event - case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event - object Close{ - val NormalClosure = CloseMessage.NORMAL_CLOSURE - val GoingAway = CloseMessage.GOING_AWAY - val WrongCode = CloseMessage.WRONG_CODE - val ProtocolError = CloseMessage.PROTOCOL_ERROR - val MsgContainsInvalidData = CloseMessage.MSG_CONTAINS_INVALID_DATA - val MsgViolatesPolicy = CloseMessage.MSG_VIOLATES_POLICY - val MsgTooBig = CloseMessage.MSG_TOO_BIG - val MissingExtensions = CloseMessage.MISSING_EXTENSIONS - val UnexpectedError = CloseMessage.UNEXPECTED_ERROR - } -} diff --git a/cask/src/cask/main/Routes.scala b/cask/src/cask/main/Routes.scala index 98c5b78..f93e641 100644 --- a/cask/src/cask/main/Routes.scala +++ b/cask/src/cask/main/Routes.scala @@ -17,5 +17,5 @@ trait Routes{ metadata0 = routes } - def log: cask.util.Logger + implicit def log: cask.util.Logger } diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala index d34fe26..7c1d61c 100644 --- a/cask/src/cask/package.scala +++ b/cask/src/cask/package.scala @@ -47,6 +47,9 @@ package object cask { type WsActor = cask.endpoints.WsActor val WsActor = cask.endpoints.WsActor type WsChannelActor = cask.endpoints.WsChannelActor + type WsClient = cask.util.WsClient + val WsClient = cask.util.WsClient + val Ws = cask.util.Ws // util type Logger = util.Logger diff --git a/cask/util/src-js/cask/util/Scheduler.scala b/cask/util/src-js/cask/util/Scheduler.scala new file mode 100644 index 0000000..dec3450 --- /dev/null +++ b/cask/util/src-js/cask/util/Scheduler.scala @@ -0,0 +1,6 @@ +package cask.util +object Scheduler{ + def schedule(millis: Long)(body: => Unit) = { + scala.scalajs.js.timers.setTimeout(millis)(body) + } +} diff --git a/cask/util/src-js/cask/util/WebsocketClientImpl.scala b/cask/util/src-js/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..f129893 --- /dev/null +++ b/cask/util/src-js/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,33 @@ +package cask.util + +import org.scalajs.dom + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: dom.WebSocket = null + var closed = false + def connect(): Unit = { + websocket = new dom.WebSocket(url) + + websocket.onopen = (e: dom.Event) => onOpen() + websocket.onmessage = (e: dom.MessageEvent) => onMessage(e.data.asInstanceOf[String]) + websocket.onclose = (e: dom.CloseEvent) => { + closed = true + onClose(e.code, e.reason) + } + websocket.onerror = (e: dom.Event) => onError(new Exception(e.toString)) + } + def onOpen(): Unit + + def send(value: String) = try { + websocket.send(value) + true + } catch{case e: scala.scalajs.js.JavaScriptException => false} + + + def send(value: Array[Byte]) = ??? + def onError(ex: Exception): Unit + def onMessage(value: String): Unit + def onClose(code: Int, reason: String): Unit + def close(): Unit = websocket.close() + def isClosed() = closed +} \ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/Scheduler.scala b/cask/util/src-jvm/cask/util/Scheduler.scala new file mode 100644 index 0000000..74db37e --- /dev/null +++ b/cask/util/src-jvm/cask/util/Scheduler.scala @@ -0,0 +1,16 @@ +package cask.util + +import java.util.concurrent.{Executors, TimeUnit} + +object Scheduler{ + val scheduler = Executors.newSingleThreadScheduledExecutor() + def schedule(millis: Long)(body: => Unit) = { + scheduler.schedule( + new Runnable { + def run(): Unit = body + }, + millis, + TimeUnit.MILLISECONDS + ) + } +} \ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..5570356 --- /dev/null +++ b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,39 @@ +package cask.util +import org.java_websocket.client.WebSocketClient +import org.java_websocket.handshake.ServerHandshake + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: Client = null + + def connect(): Unit = { + websocket = new Client() + websocket.connect() + } + def onOpen(): Unit + def onMessage(message: String): Unit + def send(message: String) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def send(message: Array[Byte]) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def onClose(code: Int, reason: String): Unit + def onError(ex: Exception): Unit + def close(): Unit = websocket.close() + def isClosed() = websocket.isClosed() + class Client() extends WebSocketClient(new java.net.URI(url)){ + def onOpen(handshakedata: ServerHandshake) = { + WebsocketClientImpl.this.onOpen() + } + def onMessage(message: String) = WebsocketClientImpl.this.onMessage(message) + def onClose(code: Int, reason: String, remote: Boolean) = WebsocketClientImpl.this.onClose(code, reason) + def onError(ex: Exception) = WebsocketClientImpl.this.onError(ex) + + } +} \ No newline at end of file diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala index 137b852..26f1c14 100644 --- a/cask/util/src/cask/util/BatchActor.scala +++ b/cask/util/src/cask/util/BatchActor.scala @@ -1,12 +1,10 @@ package cask.util -import cask.util.Logger - import scala.collection.mutable import scala.concurrent.ExecutionContext /** - * A simple asynchrous actor, allowing safe concurrent asynchronous processing + * A simple asynchronous actor, allowing safe concurrent asynchronous processing * of queued items. `run` handles items in batches, to allow for batch * processing optimizations to be used where relevant. */ @@ -16,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, private val queue = new mutable.Queue[T]() private var scheduled = false - def send(t: => T): Unit = synchronized{ + def send(t: T): Unit = synchronized{ queue.enqueue(t) if (!scheduled){ scheduled = true @@ -24,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, } } - def runWithItems(): Unit = { + private[this] def runWithItems(): Unit = { val items = synchronized(queue.dequeueAll(_ => true)) try run(items) catch{case e: Throwable => log.exception(e)} @@ -35,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, scheduled = false } } - } -} +} \ No newline at end of file diff --git a/cask/util/src/cask/util/Logger.scala b/cask/util/src/cask/util/Logger.scala index 8dc3156..2afbb48 100644 --- a/cask/util/src/cask/util/Logger.scala +++ b/cask/util/src/cask/util/Logger.scala @@ -8,6 +8,9 @@ trait Logger { def debug(t: sourcecode.Text[Any])(implicit f: sourcecode.File, line: sourcecode.Line): Unit } object Logger{ + object Console { + implicit object globalLogger extends Console() + } class Console() extends Logger{ def exception(t: Throwable): Unit = t.printStackTrace() diff --git a/cask/util/src/cask/util/WebsocketBase.scala b/cask/util/src/cask/util/WebsocketBase.scala new file mode 100644 index 0000000..bcfdae5 --- /dev/null +++ b/cask/util/src/cask/util/WebsocketBase.scala @@ -0,0 +1,14 @@ +package cask.util + +abstract class WebsocketBase{ + def connect(): Unit + def onOpen(): Unit + def onMessage(message: String): Unit + def onMessage(message: Array[Byte]): Unit + def send(message: String): Boolean + def send(message: Array[Byte]): Boolean + def onClose(code: Int, reason: String): Unit + def close(): Unit + def isClosed(): Boolean + def onError(ex: Exception): Unit +} \ No newline at end of file diff --git a/cask/util/src/cask/util/Ws.scala b/cask/util/src/cask/util/Ws.scala new file mode 100644 index 0000000..52e7260 --- /dev/null +++ b/cask/util/src/cask/util/Ws.scala @@ -0,0 +1,24 @@ +package cask.util + +object Ws{ + trait Event + case class Text(value: String) extends Event + case class Binary(value: Array[Byte]) extends Event + case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event + case class Error(e: Throwable) extends Event + case class ChannelClosed() extends Event + object Close{ + // Taken from io.undertow.websockets.core.CloseMessage.* + val NormalClosure = 1000 + val GoingAway = 1001 + val WrongCode = 1002 + val ProtocolError = 1003 + val MsgContainsInvalidData = 1007 + val MsgViolatesPolicy = 1008 + val MsgTooBig = 1009 + val MissingExtensions = 1010 + val UnexpectedError = 1011 + } +} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala new file mode 100644 index 0000000..b0f56d0 --- /dev/null +++ b/cask/util/src/cask/util/WsClient.scala @@ -0,0 +1,55 @@ +package cask.util + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Promise} + +class WsClient(impl: WebsocketBase) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger) + extends cask.util.BatchActor[Ws.Event]{ + + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(s) => impl.send(s) + case Ws.Binary(s) => impl.send(s) + case Ws.Close(_, _) => impl.close() + case Ws.ChannelClosed() => impl.close() + } + def close() = impl.close() +} + +object WsClient{ + def connect(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): WsClient = { + Await.result(connectAsync(url)(f), Duration.Inf) + } + def connectAsync(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.util.BatchActor[Ws.Event] { + def run(items: Seq[Ws.Event]) = items.collect(f) + } + val p = Promise[WsClient] + val impl = new WebsocketClientImpl(url) { + def onOpen() = { + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onMessage(message: String) = { + receiveActor.send(Ws.Text(message)) + } + def onMessage(message: Array[Byte]) = { + receiveActor.send(Ws.Binary(message)) + } + def onClose(code: Int, reason: String) = { + receiveActor.send(Ws.Close(code, reason)) + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onError(ex: Exception): Unit = { + receiveActor.send(Ws.Error(ex)) + } + } + + impl.connect() + p.future + } +} \ No newline at end of file -- cgit v1.2.3