diff options
author | Li Haoyi <haoyi.li@databricks.com> | 2019-09-16 22:31:03 +0800 |
---|---|---|
committer | Li Haoyi <haoyi.li@databricks.com> | 2019-09-16 22:31:03 +0800 |
commit | bfe26d5a9705011359658c45b364e9b65ce697b5 (patch) | |
tree | 656565b3d147b1837d4cfc1d949e7af8c250f16c /cask/util/src/cask/util/WsClient.scala | |
parent | 84ea971b1261919aca7b31635ddc7d0dca830fea (diff) | |
download | cask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.gz cask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.bz2 cask-bfe26d5a9705011359658c45b364e9b65ce697b5.zip |
Provide a simple builtin websocket client in `cask.WsClient`
Harmonize the actor-based APIs of `cask.WsClient`/`cask.WsHandler`/`cask.WsActor`, letting them share the same set of `cask.Ws` events
The default implementation of `cask.WsClient` on the JVM spawns one thread per connection, and doesn't really scale to large numbers of connections. For now we just continue using AsyncHttpClient in the load tests. Wrapping AsyncHttpClient in a nice API is TBD
Diffstat (limited to 'cask/util/src/cask/util/WsClient.scala')
-rw-r--r-- | cask/util/src/cask/util/WsClient.scala | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala new file mode 100644 index 0000000..b0f56d0 --- /dev/null +++ b/cask/util/src/cask/util/WsClient.scala @@ -0,0 +1,55 @@ +package cask.util + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Promise} + +class WsClient(impl: WebsocketBase) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger) + extends cask.util.BatchActor[Ws.Event]{ + + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(s) => impl.send(s) + case Ws.Binary(s) => impl.send(s) + case Ws.Close(_, _) => impl.close() + case Ws.ChannelClosed() => impl.close() + } + def close() = impl.close() +} + +object WsClient{ + def connect(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): WsClient = { + Await.result(connectAsync(url)(f), Duration.Inf) + } + def connectAsync(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.util.BatchActor[Ws.Event] { + def run(items: Seq[Ws.Event]) = items.collect(f) + } + val p = Promise[WsClient] + val impl = new WebsocketClientImpl(url) { + def onOpen() = { + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onMessage(message: String) = { + receiveActor.send(Ws.Text(message)) + } + def onMessage(message: Array[Byte]) = { + receiveActor.send(Ws.Binary(message)) + } + def onClose(code: Int, reason: String) = { + receiveActor.send(Ws.Close(code, reason)) + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onError(ex: Exception): Unit = { + receiveActor.send(Ws.Error(ex)) + } + } + + impl.connect() + p.future + } +}
\ No newline at end of file |