diff options
Diffstat (limited to 'shared/src/main/scala/escale/api.scala')
-rw-r--r-- | shared/src/main/scala/escale/api.scala | 141 |
1 files changed, 141 insertions, 0 deletions
diff --git a/shared/src/main/scala/escale/api.scala b/shared/src/main/scala/escale/api.scala new file mode 100644 index 0000000..ac475ae --- /dev/null +++ b/shared/src/main/scala/escale/api.scala @@ -0,0 +1,141 @@ +package escale + +import java.util.concurrent.atomic.AtomicBoolean +import scala.annotation.tailrec +import scala.collection.mutable +import scala.concurrent.{Future, Promise} + +class Channel[A](capacity: Int) { + require(capacity >= 0, "capacity must be >= 0") + import Channel._ + + private val puts = mutable.Queue.empty[(Handler[Unit], A)] + private val takes = mutable.Queue.empty[Handler[A]] + + private val buffer = mutable.Queue.empty[A] + + @tailrec final def put(handler: Handler[Unit], value: A): Unit = + synchronized { + if (takes.size > 0) { + val th = takes.dequeue() + val callback = th.commit() + if (th.active) { + + handler.commit()(()) + callback(value) + } else { + put(handler, value) + } + } else if (buffer.size < capacity) { + buffer.enqueue(value) + handler.commit()(()) + } else { + require(puts.size < MaxOps, "Too many pending put operations.") + puts.enqueue(handler -> value) + } + } + def put(value: A): Future[Unit] = { + val p = Promise[Unit] + put(new Handler[Unit](_ => p.success(())), value) + p.future + } + + def take(handler: Handler[A]): Unit = synchronized { + if (puts.size > 0) { + val callback = handler.commit() + if (handler.active) { + val (ph, pd) = puts.dequeue() + val data = if (capacity == 0) { + pd + } else { + val d = buffer.dequeue() + buffer.enqueue(pd) + d + } + ph.commit()(()) + callback(data) + } + } else if (buffer.isEmpty) { + require(takes.size < MaxOps, "Too many pending take operations") + takes.enqueue(handler) + } else { + val callback = handler.commit() + if (handler.active) { + callback(buffer.dequeue()) + } + } + } + def take(): Future[A] = { + val p = Promise[A] + take(new Handler[A](a => p.success(a))) + p.future + } + +} + +object Channel { + final val MaxOps = 1024 + + def apply[A](capacity: Int = 0): Channel[A] = new Channel[A](capacity) + + // TODO: this currently consumes a thread for every instance + def timeout(ms: Int): Channel[Unit] = { + val c = new Channel[Unit](0) + Future { + Thread.sleep(ms) + c.put(()) + }(scala.concurrent.ExecutionContext.global) + c + } + + //def select(ops: Op[_]*): Unit = ??? + + def select(channels: Channel[_]*): Future[(Channel[_], Any)] = { + val flag = new Flag + val result = Promise[(Channel[_], Any)] + for (ch <- channels) { + val handler = new SelectHandler[Any](flag, v => result.success((ch, v))) + ch.take(handler) + } + result.future + } + + type Op[A] = (Channel[A], A => Unit) + + def select2(reads: Op[_]*): Future[Unit] = { + val flag = new Flag + val done = Promise[Unit] + for ((ch, callback) <- reads) { + val c = callback.andThen { _ => + done.success(()) + () + } + val handler = new SelectHandler(flag, c) + ch.take(handler) + } + done.future + } + +} +class Handler[-A](callback: A => Unit) { + def active: Boolean = true + def commit(): A => Unit = callback +} + +class Flag { + val active = new AtomicBoolean(true) +} +class SelectHandler[A](flag: Flag, callback: A => Unit) + extends Handler[A](callback) { + var _active = true + override def active = _active + override def commit(): A => Unit = + if (flag.active.compareAndSet(true, false)) { + callback + } else { + _active = false + _ => + () + } + +} |