diff options
Diffstat (limited to 'example/websockets2/app')
-rw-r--r-- | example/websockets2/app/src/Websockets2.scala | 29 | ||||
-rw-r--r-- | example/websockets2/app/test/src/ExampleTests.scala | 123 |
2 files changed, 152 insertions, 0 deletions
diff --git a/example/websockets2/app/src/Websockets2.scala b/example/websockets2/app/src/Websockets2.scala new file mode 100644 index 0000000..b78bee3 --- /dev/null +++ b/example/websockets2/app/src/Websockets2.scala @@ -0,0 +1,29 @@ +package app + +import io.undertow.websockets.WebSocketConnectionCallback +import io.undertow.websockets.core.{AbstractReceiveListener, BufferedTextMessage, WebSocketChannel, WebSockets} +import io.undertow.websockets.spi.WebSocketHttpExchange + +object Websockets2 extends cask.MainRoutes{ + @cask.websocket("/connect/:userName") + def showUserProfile(userName: String): cask.WebsocketResult = { + if (userName != "haoyi") cask.Response("", statusCode = 403) + else new WebSocketConnectionCallback() { + override def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { + channel.getReceiveSetter.set( + new AbstractReceiveListener() { + override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = { + message.getData match{ + case "" => channel.close() + case data => WebSockets.sendTextBlocking(userName + " " + data, channel) + } + } + } + ) + channel.resumeReceives() + } + } + } + + initialize() +} diff --git a/example/websockets2/app/test/src/ExampleTests.scala b/example/websockets2/app/test/src/ExampleTests.scala new file mode 100644 index 0000000..27bff5e --- /dev/null +++ b/example/websockets2/app/test/src/ExampleTests.scala @@ -0,0 +1,123 @@ +package app + +import java.util.concurrent.atomic.AtomicInteger + +import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} +import utest._ + +object ExampleTests extends TestSuite{ + + + def withServer[T](example: cask.main.BaseMain)(f: String => T): T = { + val server = io.undertow.Undertow.builder + .addHttpListener(8080, "localhost") + .setHandler(example.defaultHandler) + .build + server.start() + val res = + try f("http://localhost:8080") + finally server.stop() + res + } + + val tests = Tests{ + test("Websockets") - withServer(Websockets2){ host => + @volatile var out = List.empty[String] + val client = org.asynchttpclient.Dsl.asyncHttpClient(); + try{ + + // 4. open websocket + val ws: WebSocket = client.prepareGet("ws://localhost:8080/connect/haoyi") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) { + out = payload :: out + } + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = () + + def onError(t: Throwable) = () + }).build() + ).get() + + // 5. send messages + ws.sendTextFrame("hello") + ws.sendTextFrame("world") + ws.sendTextFrame("") + Thread.sleep(100) + out ==> List("haoyi world", "haoyi hello") + + var error: String = "" + val cli2 = client.prepareGet("ws://localhost:8080/connect/nobody") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = () + + def onError(t: Throwable) = { + error = t.toString + } + }).build() + ).get() + + assert(error.contains("403")) + + } finally{ + client.close() + } + } + + test("Websockets2000") - withServer(Websockets2){ host => + @volatile var out = List.empty[String] + val closed = new AtomicInteger(0) + val client = org.asynchttpclient.Dsl.asyncHttpClient(); + val ws = Seq.fill(2000)(client.prepareGet("ws://localhost:8080/connect/haoyi") + .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( + new WebSocketListener() { + + override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) = { + ExampleTests.synchronized { + out = payload :: out + } + } + + def onOpen(websocket: WebSocket) = () + + def onClose(websocket: WebSocket, code: Int, reason: String) = { + closed.incrementAndGet() + } + + def onError(t: Throwable) = () + }).build() + ).get()) + + try{ + // 5. send messages + ws.foreach(_.sendTextFrame("hello")) + + Thread.sleep(1500) + out.length ==> 2000 + + ws.foreach(_.sendTextFrame("world")) + + Thread.sleep(1500) + out.length ==> 4000 + closed.get() ==> 0 + + ws.foreach(_.sendTextFrame("")) + + Thread.sleep(1500) + closed.get() ==> 2000 + + }finally{ + client.close() + } + } + + } +} |