diff options
author | Li Haoyi <haoyi.li@databricks.com> | 2019-09-16 22:31:03 +0800 |
---|---|---|
committer | Li Haoyi <haoyi.li@databricks.com> | 2019-09-16 22:31:03 +0800 |
commit | bfe26d5a9705011359658c45b364e9b65ce697b5 (patch) | |
tree | 656565b3d147b1837d4cfc1d949e7af8c250f16c /cask/util/src/cask/util/BatchActor.scala | |
parent | 84ea971b1261919aca7b31635ddc7d0dca830fea (diff) | |
download | cask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.gz cask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.bz2 cask-bfe26d5a9705011359658c45b364e9b65ce697b5.zip |
Provide a simple builtin websocket client in `cask.WsClient`
Harmonize the actor-based APIs of `cask.WsClient`/`cask.WsHandler`/`cask.WsActor`, letting them share the same set of `cask.Ws` events
The default implementation of `cask.WsClient` on the JVM spawns one thread per connection, and doesn't really scale to large numbers of connections. For now we just continue using AsyncHttpClient in the load tests. Wrapping AsyncHttpClient in a nice API is TBD
Diffstat (limited to 'cask/util/src/cask/util/BatchActor.scala')
-rw-r--r-- | cask/util/src/cask/util/BatchActor.scala | 11 |
1 files changed, 4 insertions, 7 deletions
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala index 137b852..26f1c14 100644 --- a/cask/util/src/cask/util/BatchActor.scala +++ b/cask/util/src/cask/util/BatchActor.scala @@ -1,12 +1,10 @@ package cask.util -import cask.util.Logger - import scala.collection.mutable import scala.concurrent.ExecutionContext /** - * A simple asynchrous actor, allowing safe concurrent asynchronous processing + * A simple asynchronous actor, allowing safe concurrent asynchronous processing * of queued items. `run` handles items in batches, to allow for batch * processing optimizations to be used where relevant. */ @@ -16,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, private val queue = new mutable.Queue[T]() private var scheduled = false - def send(t: => T): Unit = synchronized{ + def send(t: T): Unit = synchronized{ queue.enqueue(t) if (!scheduled){ scheduled = true @@ -24,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, } } - def runWithItems(): Unit = { + private[this] def runWithItems(): Unit = { val items = synchronized(queue.dequeueAll(_ => true)) try run(items) catch{case e: Throwable => log.exception(e)} @@ -35,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext, scheduled = false } } - } -} +}
\ No newline at end of file |