summaryrefslogtreecommitdiff
path: root/cask/util/src/cask/util/WsClient.scala
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.sg@gmail.com>2019-09-16 22:32:39 +0800
committerGitHub <noreply@github.com>2019-09-16 22:32:39 +0800
commitd22a0e75bded29a28529b87ed5ef9db2f6ae86c9 (patch)
treeed8248ed943327f6011309a10ef6b03a1025ab9f /cask/util/src/cask/util/WsClient.scala
parentb83eec01c8db8a8aa499d6c498ff85987005fe83 (diff)
parentbfe26d5a9705011359658c45b364e9b65ce697b5 (diff)
downloadcask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.gz
cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.tar.bz2
cask-d22a0e75bded29a28529b87ed5ef9db2f6ae86c9.zip
Merge pull request #14 from lihaoyi-databricks/master
Provide a simple builtin websocket client in `cask.WsClient`
Diffstat (limited to 'cask/util/src/cask/util/WsClient.scala')
-rw-r--r--cask/util/src/cask/util/WsClient.scala55
1 files changed, 55 insertions, 0 deletions
diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala
new file mode 100644
index 0000000..b0f56d0
--- /dev/null
+++ b/cask/util/src/cask/util/WsClient.scala
@@ -0,0 +1,55 @@
+package cask.util
+
+import scala.concurrent.duration.Duration
+import scala.concurrent.{Await, ExecutionContext, Promise}
+
+class WsClient(impl: WebsocketBase)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger)
+ extends cask.util.BatchActor[Ws.Event]{
+
+ def run(items: Seq[Ws.Event]): Unit = items.foreach{
+ case Ws.Text(s) => impl.send(s)
+ case Ws.Binary(s) => impl.send(s)
+ case Ws.Close(_, _) => impl.close()
+ case Ws.ChannelClosed() => impl.close()
+ }
+ def close() = impl.close()
+}
+
+object WsClient{
+ def connect(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): WsClient = {
+ Await.result(connectAsync(url)(f), Duration.Inf)
+ }
+ def connectAsync(url: String)
+ (f: PartialFunction[cask.util.Ws.Event, Unit])
+ (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = {
+ object receiveActor extends cask.util.BatchActor[Ws.Event] {
+ def run(items: Seq[Ws.Event]) = items.collect(f)
+ }
+ val p = Promise[WsClient]
+ val impl = new WebsocketClientImpl(url) {
+ def onOpen() = {
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onMessage(message: String) = {
+ receiveActor.send(Ws.Text(message))
+ }
+ def onMessage(message: Array[Byte]) = {
+ receiveActor.send(Ws.Binary(message))
+ }
+ def onClose(code: Int, reason: String) = {
+ receiveActor.send(Ws.Close(code, reason))
+ if (!p.isCompleted) p.success(new WsClient(this)(f))
+ }
+ def onError(ex: Exception): Unit = {
+ receiveActor.send(Ws.Error(ex))
+ }
+ }
+
+ impl.connect()
+ p.future
+ }
+} \ No newline at end of file