diff options
-rw-r--r-- | build.sc | 6 | ||||
-rw-r--r-- | cask/src/cask/endpoints/WebSocketEndpoint.scala | 60 | ||||
-rw-r--r-- | cask/src/cask/main/Routes.scala | 2 | ||||
-rw-r--r-- | cask/src/cask/package.scala | 3 | ||||
-rw-r--r-- | cask/util/src-js/cask/util/Scheduler.scala | 6 | ||||
-rw-r--r-- | cask/util/src-js/cask/util/WebsocketClientImpl.scala | 33 | ||||
-rw-r--r-- | cask/util/src-jvm/cask/util/Scheduler.scala | 16 | ||||
-rw-r--r-- | cask/util/src-jvm/cask/util/WebsocketClientImpl.scala | 39 | ||||
-rw-r--r-- | cask/util/src/cask/util/BatchActor.scala | 11 | ||||
-rw-r--r-- | cask/util/src/cask/util/Logger.scala | 3 | ||||
-rw-r--r-- | cask/util/src/cask/util/WebsocketBase.scala | 14 | ||||
-rw-r--r-- | cask/util/src/cask/util/Ws.scala | 24 | ||||
-rw-r--r-- | cask/util/src/cask/util/WsClient.scala | 55 | ||||
-rw-r--r-- | example/websockets/app/src/Websockets.scala | 6 | ||||
-rw-r--r-- | example/websockets/app/test/src/ExampleTests.scala | 56 | ||||
-rw-r--r-- | example/websockets2/app/test/src/ExampleTests.scala | 57 |
16 files changed, 263 insertions, 128 deletions
@@ -72,9 +72,15 @@ object cask extends CaskModule { object js extends UtilModule with ScalaJSModule{ def platformSegment = "js" def scalaJSVersion = "0.6.28" + def ivyDeps = super.ivyDeps() ++ Agg( + ivy"org.scala-js::scalajs-dom::0.9.7" + ) } object jvm extends UtilModule{ def platformSegment = "jvm" + def ivyDeps = super.ivyDeps() ++ Agg( + ivy"org.java-websocket:Java-WebSocket:1.4.0" + ) } } diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala index 6ca5def..994f015 100644 --- a/cask/src/cask/endpoints/WebSocketEndpoint.scala +++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala @@ -4,7 +4,7 @@ import java.nio.ByteBuffer import cask.model.Request import cask.router.Result -import cask.util.Logger +import cask.util.{Logger, Ws} import io.undertow.websockets.WebSocketConnectionCallback import io.undertow.websockets.core.{AbstractReceiveListener, BufferedBinaryMessage, BufferedTextMessage, CloseMessage, WebSocketChannel, WebSockets} import io.undertow.websockets.spi.WebSocketHttpExchange @@ -32,36 +32,41 @@ class websocket(val path: String, override val subpath: Boolean = false) def wrapPathSegment(s: String): Seq[String] = Seq(s) } -case class WsHandler(f: WsChannelActor => cask.util.BatchActor[WsActor.Event]) +case class WsHandler(f: WsChannelActor => cask.util.BatchActor[Ws.Event]) (implicit ec: ExecutionContext, log: Logger) extends WebsocketResult with WebSocketConnectionCallback { def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { + channel.suspendReceives() val actor = f(new WsChannelActor(channel)) + // Somehow browsers closing tabs and Java processes being killed appear + // as different events here; the former goes to AbstractReceiveListener#onClose, + // while the latter to ChannelListener#handleEvent. Make sure we handle both cases. + channel.addCloseTask(channel => actor.send(Ws.ChannelClosed())) channel.getReceiveSetter.set( new AbstractReceiveListener() { override def onFullTextMessage(channel: WebSocketChannel, message: BufferedTextMessage) = { - actor.send(WsActor.Text(message.getData)) + actor.send(Ws.Text(message.getData)) } override def onFullBinaryMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Binary( + actor.send(Ws.Binary( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onFullPingMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Ping( + actor.send(Ws.Ping( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onFullPongMessage(channel: WebSocketChannel, message: BufferedBinaryMessage): Unit = { - actor.send(WsActor.Pong( + actor.send(Ws.Pong( WebSockets.mergeBuffers(message.getData.getResource:_*).array() )) } override def onCloseMessage(cm: CloseMessage, channel: WebSocketChannel) = { - actor.send(WsActor.Close(cm.getCode, cm.getReason)) + actor.send(Ws.Close(cm.getCode, cm.getReason)) } } ) @@ -71,40 +76,21 @@ extends WebsocketResult with WebSocketConnectionCallback { class WsChannelActor(channel: WebSocketChannel) (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[WsActor.Event]{ - def run(items: Seq[WsActor.Event]): Unit = items.foreach{ - case WsActor.Text(value) => WebSockets.sendTextBlocking(value, channel) - case WsActor.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) - case WsActor.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel) +extends cask.util.BatchActor[Ws.Event]{ + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(value) => WebSockets.sendTextBlocking(value, channel) + case Ws.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) + case Ws.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) + case Ws.Pong(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) + case Ws.Close(code, reason) => WebSockets.sendCloseBlocking(code, reason, channel) } } -case class WsActor(handle: PartialFunction[WsActor.Event, Unit]) +case class WsActor(handle: PartialFunction[Ws.Event, Unit]) (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[WsActor.Event]{ - def run(items: Seq[WsActor.Event]): Unit = { - items.foreach(handle.applyOrElse(_, (x: WsActor.Event) => ())) +extends cask.util.BatchActor[Ws.Event]{ + def run(items: Seq[Ws.Event]): Unit = { + items.foreach(handle.applyOrElse(_, (x: Ws.Event) => ())) } } -object WsActor{ - trait Event - case class Text(value: String) extends Event - case class Binary(value: Array[Byte]) extends Event - case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event - case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event - case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event - object Close{ - val NormalClosure = CloseMessage.NORMAL_CLOSURE - val GoingAway = CloseMessage.GOING_AWAY - val WrongCode = CloseMessage.WRONG_CODE - val ProtocolError = CloseMessage.PROTOCOL_ERROR - val MsgContainsInvalidData = CloseMessage.MSG_CONTAINS_INVALID_DATA - val MsgViolatesPolicy = CloseMessage.MSG_VIOLATES_POLICY - val MsgTooBig = CloseMessage.MSG_TOO_BIG - val MissingExtensions = CloseMessage.MISSING_EXTENSIONS - val UnexpectedError = CloseMessage.UNEXPECTED_ERROR - } -} diff --git a/cask/src/cask/main/Routes.scala b/cask/src/cask/main/Routes.scala index 98c5b78..f93e641 100644 --- a/cask/src/cask/main/Routes.scala +++ b/cask/src/cask/main/Routes.scala @@ -17,5 +17,5 @@ trait Routes{ metadata0 = routes } - def log: cask.util.Logger + implicit def log: cask.util.Logger } diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala index d34fe26..7c1d61c 100644 --- a/cask/src/cask/package.scala +++ b/cask/src/cask/package.scala @@ -47,6 +47,9 @@ package object cask { type WsActor = cask.endpoints.WsActor val WsActor = cask.endpoints.WsActor type WsChannelActor = cask.endpoints.WsChannelActor + type WsClient = cask.util.WsClient + val WsClient = cask.util.WsClient + val Ws = cask.util.Ws // util type Logger = util.Logger diff --git a/cask/util/src-js/cask/util/Scheduler.scala b/cask/util/src-js/cask/util/Scheduler.scala new file mode 100644 index 0000000..dec3450 --- /dev/null +++ b/cask/util/src-js/cask/util/Scheduler.scala @@ -0,0 +1,6 @@ +package cask.util +object Scheduler{ + def schedule(millis: Long)(body: => Unit) = { + scala.scalajs.js.timers.setTimeout(millis)(body) + } +} diff --git a/cask/util/src-js/cask/util/WebsocketClientImpl.scala b/cask/util/src-js/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..f129893 --- /dev/null +++ b/cask/util/src-js/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,33 @@ +package cask.util + +import org.scalajs.dom + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: dom.WebSocket = null + var closed = false + def connect(): Unit = { + websocket = new dom.WebSocket(url) + + websocket.onopen = (e: dom.Event) => onOpen() + websocket.onmessage = (e: dom.MessageEvent) => onMessage(e.data.asInstanceOf[String]) + websocket.onclose = (e: dom.CloseEvent) => { + closed = true + onClose(e.code, e.reason) + } + websocket.onerror = (e: dom.Event) => onError(new Exception(e.toString)) + } + def onOpen(): Unit + + def send(value: String) = try { + websocket.send(value) + true + } catch{case e: scala.scalajs.js.JavaScriptException => false} + + + def send(value: Array[Byte]) = ??? + def onError(ex: Exception): Unit + def onMessage(value: String): Unit + def onClose(code: Int, reason: String): Unit + def close(): Unit = websocket.close() + def isClosed() = closed +}
\ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/Scheduler.scala b/cask/util/src-jvm/cask/util/Scheduler.scala new file mode 100644 index 0000000..74db37e --- /dev/null +++ b/cask/util/src-jvm/cask/util/Scheduler.scala @@ -0,0 +1,16 @@ +package cask.util + +import java.util.concurrent.{Executors, TimeUnit} + +object Scheduler{ + val scheduler = Executors.newSingleThreadScheduledExecutor() + def schedule(millis: Long)(body: => Unit) = { + scheduler.schedule( + new Runnable { + def run(): Unit = body + }, + millis, + TimeUnit.MILLISECONDS + ) + } +}
\ No newline at end of file diff --git a/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala new file mode 100644 index 0000000..5570356 --- /dev/null +++ b/cask/util/src-jvm/cask/util/WebsocketClientImpl.scala @@ -0,0 +1,39 @@ +package cask.util +import org.java_websocket.client.WebSocketClient +import org.java_websocket.handshake.ServerHandshake + +abstract class WebsocketClientImpl(url: String) extends WebsocketBase{ + var websocket: Client = null + + def connect(): Unit = { + websocket = new Client() + websocket.connect() + } + def onOpen(): Unit + def onMessage(message: String): Unit + def send(message: String) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def send(message: Array[Byte]) = try{ + websocket.send(message) + true + }catch{ + case e: org.java_websocket.exceptions.WebsocketNotConnectedException => false + } + def onClose(code: Int, reason: String): Unit + def onError(ex: Exception): Unit + def close(): Unit = websocket.close() + def isClosed() = websocket.isClosed() + class Client() extends WebSocketClient(new java.net.URI(url)){ + def onOpen(handshakedata: ServerHandshake) = { + WebsocketClientImpl.this.onOpen() + } + def onMessage(message: String) = WebsocketClientImpl.this.onMessage(message) + def onClose(code: Int, reason: String, remote: Boolean) = WebsocketClientImpl.this.onClose(code, reason) + def onError(ex: Exception) = WebsocketClientImpl.this.onError(ex) + + } +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala index 137b852..26f1c14 100644 --- a/cask/util/src/cask/util/BatchActor.scala +++ b/cask/util/src/cask/util/BatchActor.scala @@ -1,12 +1,10 @@ package cask.util -import cask.util.Logger - import scala.collection.mutable import scala.concurrent.ExecutionContext /** - * A simple asynchrous actor, allowing safe concurrent asynchronous processing + * A simple asynchronous actor, allowing safe concurrent asynchronous processing * of queued items. `run` handles items in batches, to allow for batch * processing optimizations to be used where relevant. */ @@ -16,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, private val queue = new mutable.Queue[T]() private var scheduled = false - def send(t: => T): Unit = synchronized{ + def send(t: T): Unit = synchronized{ queue.enqueue(t) if (!scheduled){ scheduled = true @@ -24,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, } } - def runWithItems(): Unit = { + private[this] def runWithItems(): Unit = { val items = synchronized(queue.dequeueAll(_ => true)) try run(items) catch{case e: Throwable => log.exception(e)} @@ -35,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, scheduled = false } } - } -} +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/Logger.scala b/cask/util/src/cask/util/Logger.scala index 8dc3156..2afbb48 100644 --- a/cask/util/src/cask/util/Logger.scala +++ b/cask/util/src/cask/util/Logger.scala @@ -8,6 +8,9 @@ trait Logger { def debug(t: sourcecode.Text[Any])(implicit f: sourcecode.File, line: sourcecode.Line): Unit } object Logger{ + object Console { + implicit object globalLogger extends Console() + } class Console() extends Logger{ def exception(t: Throwable): Unit = t.printStackTrace() diff --git a/cask/util/src/cask/util/WebsocketBase.scala b/cask/util/src/cask/util/WebsocketBase.scala new file mode 100644 index 0000000..bcfdae5 --- /dev/null +++ b/cask/util/src/cask/util/WebsocketBase.scala @@ -0,0 +1,14 @@ +package cask.util + +abstract class WebsocketBase{ + def connect(): Unit + def onOpen(): Unit + def onMessage(message: String): Unit + def onMessage(message: Array[Byte]): Unit + def send(message: String): Boolean + def send(message: Array[Byte]): Boolean + def onClose(code: Int, reason: String): Unit + def close(): Unit + def isClosed(): Boolean + def onError(ex: Exception): Unit +}
\ No newline at end of file diff --git a/cask/util/src/cask/util/Ws.scala b/cask/util/src/cask/util/Ws.scala new file mode 100644 index 0000000..52e7260 --- /dev/null +++ b/cask/util/src/cask/util/Ws.scala @@ -0,0 +1,24 @@ +package cask.util + +object Ws{ + trait Event + case class Text(value: String) extends Event + case class Binary(value: Array[Byte]) extends Event + case class Ping(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Pong(value: Array[Byte] = Array.empty[Byte]) extends Event + case class Close(code: Int = Close.NormalClosure, reason: String = "") extends Event + case class Error(e: Throwable) extends Event + case class ChannelClosed() extends Event + object Close{ + // Taken from io.undertow.websockets.core.CloseMessage.* + val NormalClosure = 1000 + val GoingAway = 1001 + val WrongCode = 1002 + val ProtocolError = 1003 + val MsgContainsInvalidData = 1007 + val MsgViolatesPolicy = 1008 + val MsgTooBig = 1009 + val MissingExtensions = 1010 + val UnexpectedError = 1011 + } +} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala new file mode 100644 index 0000000..b0f56d0 --- /dev/null +++ b/cask/util/src/cask/util/WsClient.scala @@ -0,0 +1,55 @@ +package cask.util + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, ExecutionContext, Promise} + +class WsClient(impl: WebsocketBase) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger) + extends cask.util.BatchActor[Ws.Event]{ + + def run(items: Seq[Ws.Event]): Unit = items.foreach{ + case Ws.Text(s) => impl.send(s) + case Ws.Binary(s) => impl.send(s) + case Ws.Close(_, _) => impl.close() + case Ws.ChannelClosed() => impl.close() + } + def close() = impl.close() +} + +object WsClient{ + def connect(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): WsClient = { + Await.result(connectAsync(url)(f), Duration.Inf) + } + def connectAsync(url: String) + (f: PartialFunction[cask.util.Ws.Event, Unit]) + (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.util.BatchActor[Ws.Event] { + def run(items: Seq[Ws.Event]) = items.collect(f) + } + val p = Promise[WsClient] + val impl = new WebsocketClientImpl(url) { + def onOpen() = { + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onMessage(message: String) = { + receiveActor.send(Ws.Text(message)) + } + def onMessage(message: Array[Byte]) = { + receiveActor.send(Ws.Binary(message)) + } + def onClose(code: Int, reason: String) = { + receiveActor.send(Ws.Close(code, reason)) + if (!p.isCompleted) p.success(new WsClient(this)(f)) + } + def onError(ex: Exception): Unit = { + receiveActor.send(Ws.Error(ex)) + } + } + + impl.connect() + p.future + } +}
\ No newline at end of file diff --git a/example/websockets/app/src/Websockets.scala b/example/websockets/app/src/Websockets.scala index fe32c5d..997c1ce 100644 --- a/example/websockets/app/src/Websockets.scala +++ b/example/websockets/app/src/Websockets.scala @@ -6,9 +6,9 @@ object Websockets extends cask.MainRoutes{ if (userName != "haoyi") cask.Response("", statusCode = 403) else cask.WsHandler { channel => cask.WsActor { - case cask.WsActor.Text("") => channel.send(cask.WsActor.Close()) - case cask.WsActor.Text(data) => - channel.send(cask.WsActor.Text(userName + " " + data)) + case cask.Ws.Text("") => channel.send(cask.Ws.Close()) + case cask.Ws.Text(data) => + channel.send(cask.Ws.Text(userName + " " + data)) } } } diff --git a/example/websockets/app/test/src/ExampleTests.scala b/example/websockets/app/test/src/ExampleTests.scala index d4d96da..e8889b0 100644 --- a/example/websockets/app/test/src/ExampleTests.scala +++ b/example/websockets/app/test/src/ExampleTests.scala @@ -4,6 +4,9 @@ import java.util.concurrent.atomic.AtomicInteger import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ +import concurrent.ExecutionContext.Implicits.global +import cask.Logger.Console.globalLogger + object ExampleTests extends TestSuite{ @@ -19,57 +22,32 @@ object ExampleTests extends TestSuite{ finally server.stop() res } - val tests = Tests{ test("Websockets") - withServer(Websockets){ host => @volatile var out = List.empty[String] - val client = org.asynchttpclient.Dsl.asyncHttpClient(); - try{ - - // 4. open websocket - val ws: WebSocket = client.prepareGet("ws://localhost:8080/connect/haoyi") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) { - out = payload :: out - } - - def onOpen(websocket: WebSocket) = () + // 4. open websocket - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = () - }).build() - ).get() + val ws = cask.WsClient.connect("ws://localhost:8080/connect/haoyi"){ + case cask.Ws.Text(s) => out = s :: out + } + try { // 5. send messages - ws.sendTextFrame("hello") - ws.sendTextFrame("world") - ws.sendTextFrame("") + ws.send(cask.Ws.Text("hello")) + ws.send(cask.Ws.Text("world")) + ws.send(cask.Ws.Text("")) Thread.sleep(100) out ==> List("haoyi world", "haoyi hello") var error: String = "" - val cli2 = client.prepareGet("ws://localhost:8080/connect/nobody") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - def onOpen(websocket: WebSocket) = () - - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = { - error = t.toString - } - }).build() - ).get() + val ws2 = cask.WsClient.connect("ws://localhost:8080/connect/nobody") { + case cask.Ws.Text(s) => out = s :: out + case cask.Ws.Error(t) => error += t.toString + case cask.Ws.Close(code, reason) => error += reason + } assert(error.contains("403")) - - } finally{ - client.close() - } + }finally ws.close() } test("Websockets2000") - withServer(Websockets){ host => diff --git a/example/websockets2/app/test/src/ExampleTests.scala b/example/websockets2/app/test/src/ExampleTests.scala index 96dfd7e..11796ed 100644 --- a/example/websockets2/app/test/src/ExampleTests.scala +++ b/example/websockets2/app/test/src/ExampleTests.scala @@ -4,10 +4,10 @@ import java.util.concurrent.atomic.AtomicInteger import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ - +import concurrent.ExecutionContext.Implicits.global +import cask.Logger.Console.globalLogger object ExampleTests extends TestSuite{ - def withServer[T](example: cask.main.Main)(f: String => T): T = { val server = io.undertow.Undertow.builder .addHttpListener(8080, "localhost") @@ -23,53 +23,28 @@ object ExampleTests extends TestSuite{ val tests = Tests{ test("Websockets") - withServer(Websockets2){ host => @volatile var out = List.empty[String] - val client = org.asynchttpclient.Dsl.asyncHttpClient(); - try{ - - // 4. open websocket - val ws: WebSocket = client.prepareGet("ws://localhost:8080/connect/haoyi") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - override def onTextFrame(payload: String, finalFragment: Boolean, rsv: Int) { - out = payload :: out - } - - def onOpen(websocket: WebSocket) = () - - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = () - }).build() - ).get() + // 4. open websocket + val ws = cask.WsClient.connect("ws://localhost:8080/connect/haoyi"){ + case cask.Ws.Text(s) => out = s :: out + } + try { // 5. send messages - ws.sendTextFrame("hello") - ws.sendTextFrame("world") - ws.sendTextFrame("") + ws.send(cask.Ws.Text("hello")) + ws.send(cask.Ws.Text("world")) + ws.send(cask.Ws.Text("")) Thread.sleep(100) out ==> List("haoyi world", "haoyi hello") var error: String = "" - val cli2 = client.prepareGet("ws://localhost:8080/connect/nobody") - .execute(new WebSocketUpgradeHandler.Builder().addWebSocketListener( - new WebSocketListener() { - - def onOpen(websocket: WebSocket) = () - - def onClose(websocket: WebSocket, code: Int, reason: String) = () - - def onError(t: Throwable) = { - error = t.toString - } - }).build() - ).get() + val ws2 = cask.WsClient.connect("ws://localhost:8080/connect/nobody") { + case cask.Ws.Text(s) => out = s :: out + case cask.Ws.Error(t) => error += t.toString + case cask.Ws.Close(code, reason) => error += reason + } assert(error.contains("403")) - - } finally{ - client.close() - } + }finally ws.close() } test("Websockets2000") - withServer(Websockets2){ host => |