From 9e58e95add96a075d2cb70aa477441261f481ebd Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sat, 14 Sep 2019 22:37:49 +0800 Subject: First pass at providing a convenient API for handling websockets --- example/websockets/app/src/Websockets.scala | 23 +--- example/websockets2/app/src/Websockets2.scala | 29 +++++ .../websockets2/app/test/src/ExampleTests.scala | 123 +++++++++++++++++++++ example/websockets2/build.sc | 17 +++ 4 files changed, 175 insertions(+), 17 deletions(-) create mode 100644 example/websockets2/app/src/Websockets2.scala create mode 100644 example/websockets2/app/test/src/ExampleTests.scala create mode 100644 example/websockets2/build.sc (limited to 'example') diff --git a/example/websockets/app/src/Websockets.scala b/example/websockets/app/src/Websockets.scala index a6ceb73..6cada0f 100644 --- a/example/websockets/app/src/Websockets.scala +++ b/example/websockets/app/src/Websockets.scala @@ -1,26 +1,15 @@ package app -import io.undertow.websockets.WebSocketConnectionCallback -import io.undertow.websockets.core.{AbstractReceiveListener, BufferedTextMessage, WebSocketChannel, WebSockets} -import io.undertow.websockets.spi.WebSocketHttpExchange - +import concurrent.ExecutionContext.Implicits.global object Websockets extends cask.MainRoutes{ @cask.websocket("/connect/:userName") def showUserProfile(userName: String): cask.WebsocketResult = { if (userName != "haoyi") cask.Response("", statusCode = 403) - else new WebSocketConnectionCallback() { - override def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { - channel.getReceiveSetter.set( - new AbstractReceiveListener() { - override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = { - message.getData match{ - case "" => channel.close() - case data => WebSockets.sendTextBlocking(userName + " " + data, channel) - } - } - } - ) - channel.resumeReceives() + else cask.WsHandler { channel => + cask.WsActor { + case cask.WsActor.Text("") => channel.send(cask.WsActor.Close()) + case cask.WsActor.Text(data) => + channel.send(cask.WsActor.Text(userName + " " + data)) } } } diff --git a/example/websockets2/app/src/Websockets2.scala b/example/websockets2/app/src/Websockets2.scala new file mode 100644 index 0000000..b78bee3 --- /dev/null +++ b/example/websockets2/app/src/Websockets2.scala @@ -0,0 +1,29 @@ +package app + +import io.undertow.websockets.WebSocketConnectionCallback +import io.undertow.websockets.core.{AbstractReceiveListener, BufferedTextMessage, WebSocketChannel, WebSockets} +import io.undertow.websockets.spi.WebSocketHttpExchange + +object Websockets2 extends cask.MainRoutes{ + @cask.websocket("/connect/:userName") + def showUserProfile(userName: String): cask.WebsocketResult = { + if (userName != "haoyi") cask.Response("", statusCode = 403) + else new WebSocketConnectionCallback() { + override def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { + channel.getReceiveSetter.set( + new AbstractReceiveListener() { + override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = { + message.getData match{ + case "" => channel.close() + case data => WebSockets.sendTextBlocking(userName + " " + data, channel) + } + } + } + ) + channel.resumeReceives() + } + } + } + + initialize() +} diff --git a/example/websockets2/app/test/src/ExampleTests.scala b/example/websockets2/app/test/src/ExampleTests.scala new file mode 100644 index 0000000..27bff5e --- /dev/null +++ b/example/websockets2/app/test/src/ExampleTests.scala @@ -0,0 +1,123 @@ +package app + +import java.util.concurrent.atomic.AtomicInteger + +import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} +import utest._ + +object ExampleTests extends TestSuite{ + + + def withServer[T](example: cask.main.BaseMain)(f: String => T): T = { + val server = io.undertow.Undertow.builder + .addHttpListener(8080, "localhost") + .setHandler(example.defaultHandler) + .build + server.start() + val res = + try f("http://localhost:8080") + finally server.stop() + res + } + + val tests = Tests{ + test("Websockets") - withServer(Websockets2){ host => + @volatile var out = List.empty[String] + val client = org.asynchttpclient.Dsl.asyncHttpClient(); + try{ + + // 4. open websocket + val ws: WebSocket = client.prepareGet("ws://localhost:8080/connect/haoyi") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) { + out = payload :: out + } + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = () + + def onError(t: Throwable) = () + }).build() + ).get() + + // 5. send messages + ws.sendTextFrame("hello") + ws.sendTextFrame("world") + ws.sendTextFrame("") + Thread.sleep(100) + out ==> List("haoyi world", "haoyi hello") + + var error: String = "" + val cli2 = client.prepareGet("ws://localhost:8080/connect/nobody") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = () + + def onError(t: Throwable) = { + error = t.toString + } + }).build() + ).get() + + assert(error.contains("403")) + + } finally{ + client.close() + } + } + + test("Websockets2000") - withServer(Websockets2){ host => + @volatile var out = List.empty[String] + val closed = new AtomicInteger(0) + val client = org.asynchttpclient.Dsl.asyncHttpClient(); + val ws = Seq.fill(2000)(client.prepareGet("ws://localhost:8080/connect/haoyi") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) = { + ExampleTests.synchronized { + out = payload :: out + } + } + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = { + closed.incrementAndGet() + } + + def onError(t: Throwable) = () + }).build() + ).get()) + + try{ + // 5. send messages + ws.foreach(_.sendTextFrame("hello")) + + Thread.sleep(1500) + out.length ==> 2000 + + ws.foreach(_.sendTextFrame("world")) + + Thread.sleep(1500) + out.length ==> 4000 + closed.get() ==> 0 + + ws.foreach(_.sendTextFrame("")) + + Thread.sleep(1500) + closed.get() ==> 2000 + + }finally{ + client.close() + } + } + + } +} diff --git a/example/websockets2/build.sc b/example/websockets2/build.sc new file mode 100644 index 0000000..197e285 --- /dev/null +++ b/example/websockets2/build.sc @@ -0,0 +1,17 @@ +import mill._, scalalib._ + + +trait AppModule extends ScalaModule{ + def scalaVersion = "2.13.0" + def ivyDeps = Agg[Dep]( + ) + object test extends Tests{ + def testFrameworks = Seq("utest.runner.Framework") + + def ivyDeps = Agg( + ivy"com.lihaoyi::utest::0.7.1", + ivy"com.lihaoyi::requests::0.2.0", + ivy"org.asynchttpclient:async-http-client:2.5.2" + ) + } +} \ No newline at end of file -- cgit v1.2.3