diff options
Diffstat (limited to 'src/main/scala/escale/api.scala')
-rw-r--r-- | src/main/scala/escale/api.scala | 73 |
1 files changed, 0 insertions, 73 deletions
diff --git a/src/main/scala/escale/api.scala b/src/main/scala/escale/api.scala deleted file mode 100644 index 57cb31d..0000000 --- a/src/main/scala/escale/api.scala +++ /dev/null @@ -1,73 +0,0 @@ -package escale - -import scala.collection.mutable -import scala.concurrent.{Future, Promise} - -class Channel[A](capacity: Int) { - require(capacity >= 0, "capacity must be >= 0") - import Channel._ - - private val puts = mutable.Queue.empty[(Handler[Unit], A)] - private val takes = mutable.Queue.empty[Handler[A]] - - private val buffer = mutable.Queue.empty[A] - - def put(value: A): Future[Unit] = synchronized { - val handler = new Handler[Unit] - - if (takes.size > 0) { - val th = takes.dequeue() - th.promise.success(value) - handler.promise.success(()) - } else if (buffer.size < capacity) { - buffer.enqueue(value) - handler.promise.success(()) - } else { - if (puts.size >= MaxOps) { - handler.promise.failure( - new IllegalArgumentException("Too many pending put operations.")) - } else { - puts.enqueue(handler -> value) - } - } - handler.promise.future - } - def take(): Future[A] = synchronized { - val handler = new Handler[A] - - if (puts.size > 0) { - val (ph, pd) = puts.dequeue() - val data = if (capacity == 0) { - pd - } else { - val d = buffer.dequeue() - buffer.enqueue(pd) - d - } - ph.promise.success(()) - handler.promise.success(data) - } else if (buffer.isEmpty) { - if (takes.size >= MaxOps) { - handler.promise.failure( - new IllegalArgumentException("Too many pending take operations.")) - } else { - takes.enqueue(handler) - } - } else { - handler.promise.success(buffer.dequeue()) - } - handler.promise.future - } - -} - -object Channel { - final val MaxOps = 2 - - def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity) - -} - -class Handler[A] { - val promise = Promise[A] -} |