diff options
author | Li Haoyi <haoyi.sg@gmail.com> | 2019-09-16 22:32:39 +0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-09-16 22:32:39 +0800 |
commit | d22a0e75bded29a28529b87ed5ef9db2f6ae86c9 (patch) | |
tree | ed8248ed943327f6011309a10ef6b03a1025ab9f /cask/util/src/cask/util/WsClient.scala | |
parent | b83eec01c8db8a8aa499d6c498ff85987005fe83 (diff) | |
parent | bfe26d5a9705011359658c45b364e9b65ce697b5 (diff) | |
download | cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.gz cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.bz2 cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.zip |
Merge pull request #14 from lihaoyi-databricks/master
Provide a simple builtin websocket client in `cask.WsClient`
Diffstat (limited to 'cask/util/src/cask/util/WsClient.scala')
-rw-r--r-- | cask/util/src/cask/util/WsClient.scala | 55 |
1 files changed, 55 insertions, 0 deletions
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala new file mode 100644 index 0000000..b0f56d0 --- /dev/null +++ b/cask/util/src/cask/util/WsClient.scala @@ -0,0 +1,55 @@ +package cask.util + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Promise} + +class WsClient(impl: WebsocketBase) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger) + extends cask.util.BatchActor[Ws.Event]{ + + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(s) => impl.send(s) + case Ws.Binary(s) => impl.send(s) + case Ws.Close(_, _) => impl.close() + case Ws.ChannelClosed() => impl.close() + } + def close() = impl.close() +} + +object WsClient{ + def connect(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): WsClient = { + Await.result(connectAsync(url)(f), Duration.Inf) + } + def connectAsync(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.util.BatchActor[Ws.Event] { + def run(items: Seq[Ws.Event]) = items.collect(f) + } + val p = Promise[WsClient] + val impl = new WebsocketClientImpl(url) { + def onOpen() = { + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onMessage(message: String) = { + receiveActor.send(Ws.Text(message)) + } + def onMessage(message: Array[Byte]) = { + receiveActor.send(Ws.Binary(message)) + } + def onClose(code: Int, reason: String) = { + receiveActor.send(Ws.Close(code, reason)) + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onError(ex: Exception): Unit = { + receiveActor.send(Ws.Error(ex)) + } + } + + impl.connect() + p.future + } +}
\ No newline at end of file |