From bfe26d5a9705011359658c45b364e9b65ce697b5 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Mon, 16 Sep 2019 22:31:03 +0800 Subject: Provide a simple builtin websocket client in `cask.WsClient` Harmonize the actor-based APIs of `cask.WsClient`/`cask.WsHandler`/`cask.WsActor`, letting them share the same set of `cask.Ws` events The default implementation of `cask.WsClient` on the JVM spawns one thread per connection, and doesn't really scale to large numbers of connections. For now we just continue using AsyncHttpClient in the load tests. Wrapping AsyncHttpClient in a nice API is TBD --- example/websockets/app/src/Websockets.scala | 6 +-- example/websockets/app/test/src/ExampleTests.scala | 56 +++++++--------------- 2 files changed, 20 insertions(+), 42 deletions(-) (limited to 'example/websockets') diff --git a/example/websockets/app/src/Websockets.scala b/example/websockets/app/src/Websockets.scala index fe32c5d..997c1ce 100644 --- a/example/websockets/app/src/Websockets.scala +++ b/example/websockets/app/src/Websockets.scala @@ -6,9 +6,9 @@ object Websockets extends cask.MainRoutes{ if (userName != "haoyi") cask.Response("", statusCode = 403) else cask.WsHandler { channel => cask.WsActor { - case cask.WsActor.Text("") => channel.send(cask.WsActor.Close()) - case cask.WsActor.Text(data) => - channel.send(cask.WsActor.Text(userName + " " + data)) + case cask.Ws.Text("") => channel.send(cask.Ws.Close()) + case cask.Ws.Text(data) => + channel.send(cask.Ws.Text(userName + " " + data)) } } } diff --git a/example/websockets/app/test/src/ExampleTests.scala b/example/websockets/app/test/src/ExampleTests.scala index d4d96da..e8889b0 100644 --- a/example/websockets/app/test/src/ExampleTests.scala +++ b/example/websockets/app/test/src/ExampleTests.scala @@ -4,6 +4,9 @@ import java.util.concurrent.atomic.AtomicInteger import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ +import concurrent.ExecutionContext.Implicits.global +import cask.Logger.Console.globalLogger + object ExampleTests extends TestSuite{ @@ -19,57 +22,32 @@ object ExampleTests extends TestSuite{ finally server.stop() res } - val tests = Tests{ test("Websockets") - withServer(Websockets){ host => @volatile var out = List.empty[String] - val client = org.asynchttpclient.Dsl.asyncHttpClient(); - try{ - - // 4. open websocket - val ws: WebSocket = client.prepareGet("ws://localhost:8080/connect/haoyi") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) { - out = payload :: out - } - - def onOpen(websocket: WebSocket) = () + // 4. open websocket - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = () - }).build() - ).get() + val ws = cask.WsClient.connect("ws://localhost:8080/connect/haoyi"){ + case cask.Ws.Text(s) => out = s :: out + } + try { // 5. send messages - ws.sendTextFrame("hello") - ws.sendTextFrame("world") - ws.sendTextFrame("") + ws.send(cask.Ws.Text("hello")) + ws.send(cask.Ws.Text("world")) + ws.send(cask.Ws.Text("")) Thread.sleep(100) out ==> List("haoyi world", "haoyi hello") var error: String = "" - val cli2 = client.prepareGet("ws://localhost:8080/connect/nobody") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - def onOpen(websocket: WebSocket) = () - - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = { - error = t.toString - } - }).build() - ).get() + val ws2 = cask.WsClient.connect("ws://localhost:8080/connect/nobody") { + case cask.Ws.Text(s) => out = s :: out + case cask.Ws.Error(t) => error += t.toString + case cask.Ws.Close(code, reason) => error += reason + } assert(error.contains("403")) - - } finally{ - client.close() - } + }finally ws.close() } test("Websockets2000") - withServer(Websockets){ host => -- cgit v1.2.3