summaryrefslogtreecommitdiff
path: root/cask/util/src/cask/util/BatchActor.scala
diff options
context:
space:
mode:
authorLi Haoyi <haoyi.li@databricks.com>2019-09-16 22:31:03 +0800
committerLi Haoyi <haoyi.li@databricks.com>2019-09-16 22:31:03 +0800
commitbfe26d5a9705011359658c45b364e9b65ce697b5 (patch)
tree656565b3d147b1837d4cfc1d949e7af8c250f16c /cask/util/src/cask/util/BatchActor.scala
parent84ea971b1261919aca7b31635ddc7d0dca830fea (diff)
downloadcask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.gz
cask-bfe26d5a9705011359658c45b364e9b65ce697b5.tar.bz2
cask-bfe26d5a9705011359658c45b364e9b65ce697b5.zip
Provide a simple builtin websocket client in `cask.WsClient`
Harmonize the actor-based APIs of `cask.WsClient`/`cask.WsHandler`/`cask.WsActor`, letting them share the same set of `cask.Ws` events The default implementation of `cask.WsClient` on the JVM spawns one thread per connection, and doesn't really scale to large numbers of connections. For now we just continue using AsyncHttpClient in the load tests. Wrapping AsyncHttpClient in a nice API is TBD
Diffstat (limited to 'cask/util/src/cask/util/BatchActor.scala')
-rw-r--r--cask/util/src/cask/util/BatchActor.scala11
1 files changed, 4 insertions, 7 deletions
diff --git a/cask/util/src/cask/util/BatchActor.scala b/cask/util/src/cask/util/BatchActor.scala
index 137b852..26f1c14 100644
--- a/cask/util/src/cask/util/BatchActor.scala
+++ b/cask/util/src/cask/util/BatchActor.scala
@@ -1,12 +1,10 @@
package cask.util
-import cask.util.Logger
-
import scala.collection.mutable
import scala.concurrent.ExecutionContext
/**
- * A simple asynchrous actor, allowing safe concurrent asynchronous processing
+ * A simple asynchronous actor, allowing safe concurrent asynchronous processing
* of queued items. `run` handles items in batches, to allow for batch
* processing optimizations to be used where relevant.
*/
@@ -16,7 +14,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
private val queue = new mutable.Queue[T]()
private var scheduled = false
- def send(t: => T): Unit = synchronized{
+ def send(t: T): Unit = synchronized{
queue.enqueue(t)
if (!scheduled){
scheduled = true
@@ -24,7 +22,7 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
}
}
- def runWithItems(): Unit = {
+ private[this] def runWithItems(): Unit = {
val items = synchronized(queue.dequeueAll(_ => true))
try run(items)
catch{case e: Throwable => log.exception(e)}
@@ -35,6 +33,5 @@ abstract class BatchActor[T]()(implicit ec: ExecutionContext,
scheduled = false
}
}
-
}
-}
+} \ No newline at end of file