summaryrefslogtreecommitdiff
path: root/src/main/scala/escale/api.scala
diff options
context:
space:
mode:
Diffstat (limited to 'src/main/scala/escale/api.scala')
-rw-r--r--src/main/scala/escale/api.scala73
1 files changed, 0 insertions, 73 deletions
diff --git a/src/main/scala/escale/api.scala b/src/main/scala/escale/api.scala
deleted file mode 100644
index 57cb31d..0000000
--- a/src/main/scala/escale/api.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-package escale
-
-import scala.collection.mutable
-import scala.concurrent.{Future, Promise}
-
-class Channel[A](capacity: Int) {
- require(capacity >= 0, "capacity must be >= 0")
- import Channel._
-
- private val puts = mutable.Queue.empty[(Handler[Unit], A)]
- private val takes = mutable.Queue.empty[Handler[A]]
-
- private val buffer = mutable.Queue.empty[A]
-
- def put(value: A): Future[Unit] = synchronized {
- val handler = new Handler[Unit]
-
- if (takes.size > 0) {
- val th = takes.dequeue()
- th.promise.success(value)
- handler.promise.success(())
- } else if (buffer.size < capacity) {
- buffer.enqueue(value)
- handler.promise.success(())
- } else {
- if (puts.size >= MaxOps) {
- handler.promise.failure(
- new IllegalArgumentException("Too many pending put operations."))
- } else {
- puts.enqueue(handler -> value)
- }
- }
- handler.promise.future
- }
- def take(): Future[A] = synchronized {
- val handler = new Handler[A]
-
- if (puts.size > 0) {
- val (ph, pd) = puts.dequeue()
- val data = if (capacity == 0) {
- pd
- } else {
- val d = buffer.dequeue()
- buffer.enqueue(pd)
- d
- }
- ph.promise.success(())
- handler.promise.success(data)
- } else if (buffer.isEmpty) {
- if (takes.size >= MaxOps) {
- handler.promise.failure(
- new IllegalArgumentException("Too many pending take operations."))
- } else {
- takes.enqueue(handler)
- }
- } else {
- handler.promise.success(buffer.dequeue())
- }
- handler.promise.future
- }
-
-}
-
-object Channel {
- final val MaxOps = 2
-
- def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity)
-
-}
-
-class Handler[A] {
- val promise = Promise[A]
-}