From c95bf293dbe23fa6c5fd9e23b35a4e4ce34da415 Mon Sep 17 00:00:00 2001 From: Li Haoyi Date: Sun, 3 Nov 2019 17:33:31 +0800 Subject: Flesh out `BatchActor.scala` into its own module, `cask.Actor`. Add the first unit test for an asynchronous logging actor --- build.sc | 50 +++++- cask/actor/src/Actors.scala | 78 +++++++++ cask/actor/src/Context.scala | 190 +++++++++++++++++++++ cask/actor/test/src-jvm/JvmActorsTest.scala | 124 ++++++++++++++ cask/actor/test/src/ActorsTest.scala | 17 ++ cask/src/cask/endpoints/WebSocketEndpoint.scala | 18 +- cask/src/cask/main/Routes.scala | 5 +- cask/src/cask/package.scala | 2 +- cask/util/src/cask/util/BatchActor.scala | 46 ----- cask/util/src/cask/util/WsClient.scala | 14 +- example/websockets/app/test/src/ExampleTests.scala | 5 +- .../websockets2/app/test/src/ExampleTests.scala | 4 +- .../websockets3/app/test/src/ExampleTests.scala | 6 +- .../websockets4/app/test/src/ExampleTests.scala | 5 +- 14 files changed, 482 insertions(+), 82 deletions(-) create mode 100644 cask/actor/src/Actors.scala create mode 100644 cask/actor/src/Context.scala create mode 100644 cask/actor/test/src-jvm/JvmActorsTest.scala create mode 100644 cask/actor/test/src/ActorsTest.scala delete mode 100644 cask/util/src/cask/util/BatchActor.scala diff --git a/build.sc b/build.sc index 47ac6bc..b28417c 100644 --- a/build.sc +++ b/build.sc @@ -67,19 +67,21 @@ object cask extends CaskModule { millSourcePath / s"src-$platformSegment" ) def ivyDeps = Agg( - ivy"com.lihaoyi::sourcecode:0.1.7", + ivy"com.lihaoyi::sourcecode:0.1.8", ivy"com.lihaoyi::pprint:0.5.5" ) } object js extends UtilModule with ScalaJSModule{ + def moduleDeps = Seq(actor.js) def platformSegment = "js" - def scalaJSVersion = "0.6.28" + def scalaJSVersion = "0.6.29" def ivyDeps = super.ivyDeps() ++ Agg( ivy"org.scala-js::scalajs-dom::0.9.7" ) } object jvm extends UtilModule{ + def moduleDeps = Seq(actor.jvm) def platformSegment = "jvm" def ivyDeps = super.ivyDeps() ++ Agg( ivy"org.java-websocket:Java-WebSocket:1.4.0" @@ -87,14 +89,52 @@ object cask extends CaskModule { } } + object actor extends Module { + trait ActorModule extends CaskModule { + def artifactName = "cask-actor" + def platformSegment: String + def millSourcePath = super.millSourcePath / os.up + + def sources = T.sources( + millSourcePath / "src", + millSourcePath / s"src-$platformSegment" + ) + + def ivyDeps = Agg(ivy"com.lihaoyi::sourcecode::0.1.8") + + trait ActorTestModule extends Tests { + def sources = T.sources( + millSourcePath / "src", + millSourcePath / s"src-$platformSegment" + ) + def testFrameworks = Seq("utest.runner.Framework") + def ivyDeps = Agg(ivy"com.lihaoyi::utest::0.7.1") + } + } + + object js extends ActorModule with ScalaJSModule{ + def platformSegment = "js" + def scalaJSVersion = "0.6.29" + + object test extends ActorTestModule with Tests + } + object jvm extends ActorModule{ + def platformSegment = "jvm" + + object test extends ActorTestModule with Tests{ + def ivyDeps = super.ivyDeps() ++ Agg( + ivy"com.lihaoyi::os-lib:0.4.2" + ) + } + } + } + object test extends Tests{ def testFrameworks = Seq("utest.runner.Framework") def ivyDeps = Agg( ivy"com.lihaoyi::utest::0.7.1", - ivy"com.lihaoyi::requests::0.2.0", -// ivy"org.xerial:sqlite-jdbc:3.18.0", -// ivy"io.getquill::quill-jdbc:2.6.0" + ivy"com.lihaoyi::requests::0.2.0" ) } } diff --git a/cask/actor/src/Actors.scala b/cask/actor/src/Actors.scala new file mode 100644 index 0000000..69a5289 --- /dev/null +++ b/cask/actor/src/Actors.scala @@ -0,0 +1,78 @@ +package cask.actor +import collection.mutable + +abstract class BaseActor[T]()(implicit ac: Context) extends Actor[T]{ + private val queue = new mutable.Queue[(T, Context.Token)]() + + private var scheduled = false + + def send(t: T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit = synchronized{ + val token = ac.reportSchedule(this, t, fileName, line) + queue.enqueue((t, token)) + if (!scheduled){ + scheduled = true + ac.execute(() => runWithItems()) + } + } + def sendAsync(f: scala.concurrent.Future[T]) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line) = { + f.onComplete{ + case scala.util.Success(v) => this.send(v) + case scala.util.Failure(e) => ac.reportFailure(e) + } + } + + def runBatch0(msgs: Seq[(T, Context.Token)]): Unit + private[this] def runWithItems(): Unit = { + val msgs = synchronized(queue.dequeueAll(_ => true)) + + runBatch0(msgs) + + synchronized{ + if (queue.nonEmpty) ac.execute(() => runWithItems()) + else{ + assert(scheduled) + scheduled = false + } + } + } +} + +abstract class BatchActor[T]()(implicit ac: Context) extends BaseActor[T]{ + def runBatch(msgs: Seq[T]): Unit + def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { + try { + msgs.foreach{case (m, token) => ac.reportRun(this, m, token)} + runBatch(msgs.map(_._1)) + } + catch{case e: Throwable => ac.reportFailure(e)} + finally msgs.foreach{case (m, token) => ac.reportComplete(token)} + + } +} + +abstract class SimpleActor[T]()(implicit ac: Context) extends BaseActor[T]{ + def run(msg: T): Unit + override def runBatch0(msgs: Seq[(T, Context.Token)]): Unit = { + for((msg, token) <- msgs) { + try { + ac.reportRun(this, msg, token) + run(msg) + } + catch{case e: Throwable => ac.reportFailure(e)} + finally ac.reportComplete(token) + } + } +} + +abstract class StateMachineActor[T]()(implicit ac: Context) extends SimpleActor[T]() { + class State(val run: T => State) + protected[this] def initialState: State + protected[this] var state: State = initialState + def run(msg: T): Unit = { + state = state.run(msg) + } +} \ No newline at end of file diff --git a/cask/actor/src/Context.scala b/cask/actor/src/Context.scala new file mode 100644 index 0000000..6b56f2e --- /dev/null +++ b/cask/actor/src/Context.scala @@ -0,0 +1,190 @@ +package cask.actor +import java.util.concurrent.{Executors, ThreadFactory, TimeUnit} + +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, CanAwait, ExecutionContext, Future, Promise} +import scala.util.Try + +/** + * An extended `scala.concurrent.ExecutionContext`; provides the ability to + * schedule messages to be sent later, and hooks to track the current number of + * outstanding tasks or log the actor message sends for debugging purporses + */ +trait Context extends ExecutionContext { + def reportSchedule(): Context.Token = new Context.Token.Simple() + + def reportSchedule(fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + new Context.Token.Future(fileName, line) + } + + def reportSchedule(a: Actor[_], + msg: Any, + fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + new Context.Token.Send(a, msg, fileName, line) + } + + def reportRun(a: Actor[_], + msg: Any, + token: Context.Token): Unit = () + + def reportComplete(token: Context.Token): Unit = () + + def scheduleMsg[T](a: Actor[T], msg: T, time: java.time.Duration) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit + + def future[T](t: => T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Future[T] + + def execute(runnable: Runnable): Unit +} + +object Context{ + trait Token + object Token{ + class Simple extends Token(){ + override def toString = "token@" + Integer.toHexString(hashCode()) + } + + class Future(val fileName: sourcecode.FileName, + val line: sourcecode.Line) extends Token(){ + override def toString = + "token@" + Integer.toHexString(hashCode()) + "@" + + fileName.value + ":" + line.value + } + + class Send(val a: Actor[_], + val msg: Any, + val fileName: sourcecode.FileName, + val line: sourcecode.Line) extends Token(){ + override def toString = + "token@" + Integer.toHexString(hashCode()) + "@" + + fileName.value + ":" + line.value + } + } + + class Simple(ec: ExecutionContext, logEx: Throwable => Unit) extends Context.Impl { + def executionContext = ec + def reportFailure(t: Throwable) = logEx(t) + } + + object Simple{ + implicit val global: Simple = new Simple(scala.concurrent.ExecutionContext.Implicits.global, _.printStackTrace()) + } + + class Test(ec: ExecutionContext = scala.concurrent.ExecutionContext.global, + logEx: Throwable => Unit = _.printStackTrace()) extends Context.Impl { + private[this] val active = collection.mutable.Set.empty[Context.Token] + private[this] var promise = concurrent.Promise.successful[Unit](()) + + def executionContext = ec + + def reportFailure(t: Throwable) = logEx(t) + + def handleReportSchedule(token: Context.Token) = synchronized{ + if (active.isEmpty) { + assert(promise.isCompleted) + promise = concurrent.Promise[Unit] + } + active.add(token) + token + } + override def reportSchedule() = { + handleReportSchedule(super.reportSchedule()) + } + override def reportSchedule(fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + handleReportSchedule(super.reportSchedule(fileName, line)) + } + + override def reportSchedule(a: Actor[_], + msg: Any, + fileName: sourcecode.FileName, + line: sourcecode.Line): Context.Token = { + handleReportSchedule(super.reportSchedule(a, msg, fileName, line)) + } + + override def reportComplete(token: Context.Token) = this.synchronized{ + assert(active.remove(token)) + + if (active.isEmpty) promise.success(()) + } + + def waitForInactivity(timeout: Option[java.time.Duration] = None) = { + Await.result( + this.synchronized(promise).future, + timeout match{ + case None => scala.concurrent.duration.Duration.Inf + case Some(t) => scala.concurrent.duration.Duration.fromNanos(t.toNanos) + } + ) + } + } + + trait Impl extends Context { + def executionContext: ExecutionContext + + def execute(runnable: Runnable): Unit = { + val token = reportSchedule() + executionContext.execute(new Runnable { + def run(): Unit = { + try runnable.run() + finally reportComplete(token) + } + }) + } + + def future[T](t: => T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Future[T] = { + val token = reportSchedule(fileName, line) + val p = Promise[T] + executionContext.execute(new Runnable { + def run(): Unit = { + p.complete(scala.util.Try(t)) + reportComplete(token) + } + }) + p.future + } + + lazy val scheduler = Executors.newSingleThreadScheduledExecutor( + new ThreadFactory { + def newThread(r: Runnable): Thread = { + val t = new Thread(r, "ActorContext-Scheduler-Thread") + t.setDaemon(true) + t + } + } + ) + + def scheduleMsg[T](a: Actor[T], + msg: T, delay: java.time.Duration) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line) = { + val token = reportSchedule(a, msg, fileName, line) + scheduler.schedule[Unit]( + () => { + a.send(msg)(fileName, line) + reportComplete(token) + }, + delay.toMillis, + TimeUnit.MILLISECONDS + ) + } + } + +} + +trait Actor[T]{ + def send(t: T) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit + + def sendAsync(f: scala.concurrent.Future[T]) + (implicit fileName: sourcecode.FileName, + line: sourcecode.Line): Unit +} diff --git a/cask/actor/test/src-jvm/JvmActorsTest.scala b/cask/actor/test/src-jvm/JvmActorsTest.scala new file mode 100644 index 0000000..9cfb0d5 --- /dev/null +++ b/cask/actor/test/src-jvm/JvmActorsTest.scala @@ -0,0 +1,124 @@ +package cask.actor + +import utest._ +object JvmActorsTest extends TestSuite{ + def tests = Tests{ + os.remove.all(os.pwd / "out" / "scratch") + test("lock"){ + val rotateSize = 50 + val logPath = os.pwd / "out" / "scratch" / "log.txt" + val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + + var logSize = 0 + + def logLine(s: String): Unit = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = 0 + os.move(logPath, oldPath, replaceExisting = true) + } + + os.write.append(logPath, s + "\n", createFolders = true) + } + + logLine("I am cow") + logLine("hear me moo") + logLine("I weight twice as much as you") + logLine("And I look good on the barbecue") + logLine("Yoghurt curds cream cheese and butter") + logLine("Comes from liquids from my udder") + logLine("I am cow, I am cow") + logLine("Hear me moo, moooo") + + os.read(oldPath).trim() ==> "Yoghurt curds cream cheese and butter\nComes from liquids from my udder" + os.read(logPath).trim() ==> "I am cow, I am cow\nHear me moo, moooo" + } + + test("actor"){ + class Logger(log: os.Path, old: os.Path, rotateSize: Int) + (implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = s.length + os.move(log, old, replaceExisting = true) + } + os.write.append(log, s + "\n", createFolders = true) + } + private var logSize = 0 + } + + implicit val ac = new Context.Test() + + val logPath = os.pwd / "out" / "scratch" / "log.txt" + val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + + val logger = new Logger(logPath, oldPath, rotateSize = 50) + + logger.send("I am cow") + logger.send("hear me moo") + logger.send("I weight twice as much as you") + logger.send("And I look good on the barbecue") + logger.send("Yoghurt curds cream cheese and butter") + logger.send("Comes from liquids from my udder") + logger.send("I am cow, I am cow") + logger.send("Hear me moo, moooo") + + ac.waitForInactivity() + + os.read.lines(oldPath) ==> Seq("Comes from liquids from my udder") + os.read.lines(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") + } + + test("pipeline"){ + class Writer(log: os.Path, old: os.Path, rotateSize: Int) + (implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = { + val newLogSize = logSize + s.length + 1 + if (newLogSize <= rotateSize) logSize = newLogSize + else { + logSize = s.length + os.move(log, old, replaceExisting = true) + } + os.write.append(log, s + "\n", createFolders = true) + } + private var logSize = 0 + } + + class Logger(dest: Actor[String])(implicit ac: Context) extends SimpleActor[String]{ + def run(s: String) = dest.send(java.util.Base64.getEncoder.encodeToString(s.getBytes)) + } + + implicit val ac = new Context.Test() + + val logPath = os.pwd / "out" / "scratch" / "log.txt" + val oldPath = os.pwd / "out" / "scratch" / "log-old.txt" + + val writer = new Writer(logPath, oldPath, rotateSize = 50) + val logger = new Logger(writer) + + logger.send("I am cow") + logger.send("hear me moo") + logger.send("I weight twice as much as you") + logger.send("And I look good on the barbecue") + logger.send("Yoghurt curds cream cheese and butter") + logger.send("Comes from liquids from my udder") + logger.send("I am cow, I am cow") + logger.send("Hear me moo, moooo") + + ac.waitForInactivity() + + os.read(oldPath) ==> "Q29tZXMgZnJvbSBsaXF1aWRzIGZyb20gbXkgdWRkZXI=\n" + os.read(logPath) ==> "SSBhbSBjb3csIEkgYW0gY293\nSGVhciBtZSBtb28sIG1vb29v\n" + + def decodeFile(p: os.Path) = { + os.read.lines(p).map(s => new String(java.util.Base64.getDecoder.decode(s))) + } + + decodeFile(oldPath) ==> Seq("Comes from liquids from my udder") + decodeFile(logPath) ==> Seq("I am cow, I am cow", "Hear me moo, moooo") + } + } +} \ No newline at end of file diff --git a/cask/actor/test/src/ActorsTest.scala b/cask/actor/test/src/ActorsTest.scala new file mode 100644 index 0000000..2c81e5b --- /dev/null +++ b/cask/actor/test/src/ActorsTest.scala @@ -0,0 +1,17 @@ +package cask.actor +import utest._ +object ActorsTest extends TestSuite{ + def tests = Tests{ + test("hello"){ + import Context.Simple.global + + sealed trait Msg + + object logger extends SimpleActor[Msg]{ + def run(msg: Msg) = { + + } + } + } + } +} \ No newline at end of file diff --git a/cask/src/cask/endpoints/WebSocketEndpoint.scala b/cask/src/cask/endpoints/WebSocketEndpoint.scala index 905c5f1..fcb40ec 100644 --- a/cask/src/cask/endpoints/WebSocketEndpoint.scala +++ b/cask/src/cask/endpoints/WebSocketEndpoint.scala @@ -32,8 +32,8 @@ class websocket(val path: String, override val subpath: Boolean = false) def wrapPathSegment(s: String): Seq[String] = Seq(s) } -case class WsHandler(f: WsChannelActor => cask.util.BatchActor[Ws.Event]) - (implicit ec: ExecutionContext, log: Logger) +case class WsHandler(f: WsChannelActor => cask.actor.Actor[Ws.Event]) + (implicit ac: cask.actor.Context, log: Logger) extends WebsocketResult with WebSocketConnectionCallback { def onConnect(exchange: WebSocketHttpExchange, channel: WebSocketChannel): Unit = { channel.suspendReceives() @@ -75,9 +75,9 @@ extends WebsocketResult with WebSocketConnectionCallback { } class WsChannelActor(channel: WebSocketChannel) - (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[Ws.Event]{ - def run(items: Seq[Ws.Event]): Unit = items.foreach{ + (implicit ac: cask.actor.Context, log: Logger) +extends cask.actor.SimpleActor[Ws.Event]{ + def run(item: Ws.Event): Unit = item match{ case Ws.Text(value) => WebSockets.sendTextBlocking(value, channel) case Ws.Binary(value) => WebSockets.sendBinaryBlocking(ByteBuffer.wrap(value), channel) case Ws.Ping(value) => WebSockets.sendPingBlocking(ByteBuffer.wrap(value), channel) @@ -87,10 +87,10 @@ extends cask.util.BatchActor[Ws.Event]{ } case class WsActor(handle: PartialFunction[Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger) -extends cask.util.BatchActor[Ws.Event]{ - def run(items: Seq[Ws.Event]): Unit = { - items.foreach(handle.applyOrElse(_, (x: Ws.Event) => ())) + (implicit ac: cask.actor.Context, log: Logger) +extends cask.actor.SimpleActor[Ws.Event]{ + def run(item: Ws.Event): Unit = { + handle.lift(item) } } diff --git a/cask/src/cask/main/Routes.scala b/cask/src/cask/main/Routes.scala index 1b83be3..68e3af4 100644 --- a/cask/src/cask/main/Routes.scala +++ b/cask/src/cask/main/Routes.scala @@ -7,7 +7,10 @@ import language.experimental.macros trait Routes{ def decorators = Seq.empty[cask.router.Decorator[_, _, _]] - implicit def executionContext = concurrent.ExecutionContext.Implicits.global + implicit val actorContext = new cask.actor.Context.Simple( + concurrent.ExecutionContext.Implicits.global, + log.exception + ) private[this] var metadata0: RoutesEndpointsMetadata[this.type] = null def caskMetadata = if (metadata0 != null) metadata0 diff --git a/cask/src/cask/package.scala b/cask/src/cask/package.scala index 7c1d61c..ebba984 100644 --- a/cask/src/cask/package.scala +++ b/cask/src/cask/package.scala @@ -55,6 +55,6 @@ package object cask { type Logger = util.Logger val Logger = util.Logger - type BatchActor[T] = util.BatchActor[T] + type BatchActor[T] = actor.BatchActor[T] } diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala deleted file mode 100644 index 4985fc3..0000000 --- a/cask/util/src/cask/util/BatchActor.scala +++ /dev/null @@ -1,46 +0,0 @@ -package cask.util - -import scala.collection.mutable -import scala.concurrent.ExecutionContext - -/** - * A simple asynchronous actor, allowing safe concurrent asynchronous processing - * of queued items. `run` handles items in batches, to allow for batch - * processing optimizations to be used where relevant. - */ -abstract class BatchActor[T]()(implicit ec: ExecutionContext, - log: Logger) { - def run(items: Seq[T]): Unit - - private val queue = new mutable.Queue[T]() - private var scheduled = false - def send(t: T): Unit = synchronized{ - queue.enqueue(t) - if (!scheduled){ - scheduled = true - ec.execute(() => runWithItems()) - } - } - - private[this] def runWithItems(): Unit = { - val items = synchronized(queue.dequeueAll(_ => true)) - try run(items) - catch{case e: Throwable => log.exception(e)} - synchronized{ - if (queue.nonEmpty) ec.execute(() => runWithItems()) - else{ - assert(scheduled) - scheduled = false - } - } - } -} - -abstract class StateMachineActor[T]() - (implicit ec: ExecutionContext, - log: Logger) extends BatchActor[T](){ - class State(val run: T => State) - protected[this] def initialState: State - protected[this] var state: State = initialState - def run(items: Seq[T]): Unit = items.foreach{i => state = state.run(i)} -} diff --git a/cask/util/src/cask/util/WsClient.scala b/cask/util/src/cask/util/WsClient.scala index fbde444..28277bb 100644 --- a/cask/util/src/cask/util/WsClient.scala +++ b/cask/util/src/cask/util/WsClient.scala @@ -4,10 +4,10 @@ import scala.concurrent.duration.Duration import scala.concurrent.{Await, ExecutionContext, Promise} class WsClient(impl: WebsocketBase) - (implicit ec: ExecutionContext, log: Logger) - extends cask.util.BatchActor[Ws.Event]{ + (implicit ac: cask.actor.Context, log: Logger) + extends cask.actor.SimpleActor[Ws.Event]{ - def run(items: Seq[Ws.Event]): Unit = items.foreach{ + def run(item: Ws.Event): Unit = item match{ case Ws.Text(s) => impl.send(s) case Ws.Binary(s) => impl.send(s) case Ws.Close(_, _) => impl.close() @@ -18,14 +18,14 @@ class WsClient(impl: WebsocketBase) object WsClient{ def connect(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): WsClient = { + (implicit ac: cask.actor.Context, log: Logger): WsClient = { Await.result(connectAsync(url)(f), Duration.Inf) } def connectAsync(url: String) (f: PartialFunction[cask.util.Ws.Event, Unit]) - (implicit ec: ExecutionContext, log: Logger): scala.concurrent.Future[WsClient] = { - object receiveActor extends cask.util.BatchActor[Ws.Event] { - def run(items: Seq[Ws.Event]) = items.foreach(x => f.applyOrElse(x, (_: Ws.Event) => ())) + (implicit ac: cask.actor.Context, log: Logger): scala.concurrent.Future[WsClient] = { + object receiveActor extends cask.actor.SimpleActor[Ws.Event] { + def run(item: Ws.Event) = f.lift(item) } val p = Promise[WsClient] val impl = new WebsocketClientImpl(url) { diff --git a/example/websockets/app/test/src/ExampleTests.scala b/example/websockets/app/test/src/ExampleTests.scala index 4ab6688..9671198 100644 --- a/example/websockets/app/test/src/ExampleTests.scala +++ b/example/websockets/app/test/src/ExampleTests.scala @@ -1,13 +1,12 @@ + package app import java.util.concurrent.atomic.AtomicInteger - +import cask.actor.Context.Simple.global import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ -import concurrent.ExecutionContext.Implicits.global import cask.Logger.Console.globalLogger - object ExampleTests extends TestSuite{ diff --git a/example/websockets2/app/test/src/ExampleTests.scala b/example/websockets2/app/test/src/ExampleTests.scala index 4d70f28..6112b10 100644 --- a/example/websockets2/app/test/src/ExampleTests.scala +++ b/example/websockets2/app/test/src/ExampleTests.scala @@ -1,10 +1,10 @@ package app import java.util.concurrent.atomic.AtomicInteger - +import cask.actor.Context.Simple.global import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ -import concurrent.ExecutionContext.Implicits.global + import cask.Logger.Console.globalLogger object ExampleTests extends TestSuite{ diff --git a/example/websockets3/app/test/src/ExampleTests.scala b/example/websockets3/app/test/src/ExampleTests.scala index 00665d5..0d42983 100644 --- a/example/websockets3/app/test/src/ExampleTests.scala +++ b/example/websockets3/app/test/src/ExampleTests.scala @@ -1,11 +1,9 @@ package app -import java.util.concurrent.atomic.AtomicInteger - -import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ -import concurrent.ExecutionContext.Implicits.global import cask.Logger.Console.globalLogger +import cask.actor.Context.Simple.global + object ExampleTests extends TestSuite{ diff --git a/example/websockets4/app/test/src/ExampleTests.scala b/example/websockets4/app/test/src/ExampleTests.scala index a2c793b..95f0fba 100644 --- a/example/websockets4/app/test/src/ExampleTests.scala +++ b/example/websockets4/app/test/src/ExampleTests.scala @@ -1,11 +1,8 @@ package app -import java.util.concurrent.atomic.AtomicInteger - -import org.asynchttpclient.ws.{WebSocket, WebSocketListener, WebSocketUpgradeHandler} import utest._ -import concurrent.ExecutionContext.Implicits.global import cask.Logger.Console.globalLogger +import cask.actor.Context.Simple.global object ExampleTests extends TestSuite{ -- cgit v1.2.3