diff options
Diffstat (limited to 'src/main/scala/escale/api.scala')
-rw-r--r-- | src/main/scala/escale/api.scala | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/main/scala/escale/api.scala b/src/main/scala/escale/api.scala new file mode 100644 index 0000000..57cb31d --- /dev/null +++ b/src/main/scala/escale/api.scala @@ -0,0 +1,73 @@ +package escale + +import scala.collection.mutable +import scala.concurrent.{Future, Promise} + +class Channel[A](capacity: Int) { + require(capacity >= 0, "capacity must be >= 0") + import Channel._ + + private val puts = mutable.Queue.empty[(Handler[Unit], A)] + private val takes = mutable.Queue.empty[Handler[A]] + + private val buffer = mutable.Queue.empty[A] + + def put(value: A): Future[Unit] = synchronized { + val handler = new Handler[Unit] + + if (takes.size > 0) { + val th = takes.dequeue() + th.promise.success(value) + handler.promise.success(()) + } else if (buffer.size < capacity) { + buffer.enqueue(value) + handler.promise.success(()) + } else { + if (puts.size >= MaxOps) { + handler.promise.failure( + new IllegalArgumentException("Too many pending put operations.")) + } else { + puts.enqueue(handler -> value) + } + } + handler.promise.future + } + def take(): Future[A] = synchronized { + val handler = new Handler[A] + + if (puts.size > 0) { + val (ph, pd) = puts.dequeue() + val data = if (capacity == 0) { + pd + } else { + val d = buffer.dequeue() + buffer.enqueue(pd) + d + } + ph.promise.success(()) + handler.promise.success(data) + } else if (buffer.isEmpty) { + if (takes.size >= MaxOps) { + handler.promise.failure( + new IllegalArgumentException("Too many pending take operations.")) + } else { + takes.enqueue(handler) + } + } else { + handler.promise.success(buffer.dequeue()) + } + handler.promise.future + } + +} + +object Channel { + final val MaxOps = 2 + + def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity) + +} + +class Handler[A] { + val promise = Promise[A] +} |