summaryrefslogblamecommitdiff
path: root/shared/src/main/scala/escale/api.scala
blob: ac475ae5bccafe1df1179e25569d70d20162ff33 (plain) (tree)












































































































































                                                                              
package escale

import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

class Channel[A](capacity: Int) {
  require(capacity >= 0, "capacity must be >= 0")
  import Channel._

  private val puts = mutable.Queue.empty[(Handler[Unit], A)]
  private val takes = mutable.Queue.empty[Handler[A]]

  private val buffer = mutable.Queue.empty[A]

  @tailrec final def put(handler: Handler[Unit], value: A): Unit =
    synchronized {
      if (takes.size > 0) {
        val th = takes.dequeue()
        val callback = th.commit()
        if (th.active) {

          handler.commit()(())
          callback(value)
        } else {
          put(handler, value)
        }
      } else if (buffer.size < capacity) {
        buffer.enqueue(value)
        handler.commit()(())
      } else {
        require(puts.size < MaxOps, "Too many pending put operations.")
        puts.enqueue(handler -> value)
      }
    }
  def put(value: A): Future[Unit] = {
    val p = Promise[Unit]
    put(new Handler[Unit](_ => p.success(())), value)
    p.future
  }

  def take(handler: Handler[A]): Unit = synchronized {
    if (puts.size > 0) {
      val callback = handler.commit()
      if (handler.active) {
        val (ph, pd) = puts.dequeue()
        val data = if (capacity == 0) {
          pd
        } else {
          val d = buffer.dequeue()
          buffer.enqueue(pd)
          d
        }
        ph.commit()(())
        callback(data)
      }
    } else if (buffer.isEmpty) {
      require(takes.size < MaxOps, "Too many pending take operations")
      takes.enqueue(handler)
    } else {
      val callback = handler.commit()
      if (handler.active) {
        callback(buffer.dequeue())
      }
    }
  }
  def take(): Future[A] = {
    val p = Promise[A]
    take(new Handler[A](a => p.success(a)))
    p.future
  }

}

object Channel {
  final val MaxOps = 1024

  def apply[A](capacity: Int = 0): Channel[A] = new Channel[A](capacity)

  // TODO: this currently consumes a thread for every instance
  def timeout(ms: Int): Channel[Unit] = {
    val c = new Channel[Unit](0)
    Future {
      Thread.sleep(ms)
      c.put(())
    }(scala.concurrent.ExecutionContext.global)
    c
  }

  //def select(ops: Op[_]*): Unit = ???

  def select(channels: Channel[_]*): Future[(Channel[_], Any)] = {
    val flag = new Flag
    val result = Promise[(Channel[_], Any)]
    for (ch <- channels) {
      val handler = new SelectHandler[Any](flag, v => result.success((ch, v)))
      ch.take(handler)
    }
    result.future
  }

  type Op[A] = (Channel[A], A => Unit)

  def select2(reads: Op[_]*): Future[Unit] = {
    val flag = new Flag
    val done = Promise[Unit]
    for ((ch, callback) <- reads) {
      val c = callback.andThen { _ =>
        done.success(())
        ()
      }
      val handler = new SelectHandler(flag, c)
      ch.take(handler)
    }
    done.future
  }

}
class Handler[-A](callback: A => Unit) {
  def active: Boolean = true
  def commit(): A => Unit = callback
}

class Flag {
  val active = new AtomicBoolean(true)
}
class SelectHandler[A](flag: Flag, callback: A => Unit)
    extends Handler[A](callback) {
  var _active = true
  override def active = _active
  override def commit(): A => Unit =
    if (flag.active.compareAndSet(true, false)) {
      callback
    } else {
      _active = false
      _ =>
        ()
    }

}