// Source path: root/shared/src/main/scala/escale/api.scala
// (blob ac475ae5bccafe1df1179e25569d70d20162ff33)
package escale

import java.util.concurrent.atomic.AtomicBoolean
import scala.annotation.tailrec
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

/** A channel that passes values of type `A` from `put` operations to `take`
  * operations.
  *
  * Up to `capacity` values are held in an internal buffer. Once the buffer is
  * full (always the case when `capacity` is 0), further puts are queued until
  * a take makes room for them. Takes that find nothing to receive are queued
  * until a put arrives. Every operation synchronizes on the channel instance.
  *
  * @param capacity number of values that may be buffered; must be >= 0
  */
class Channel[A](capacity: Int) {
  require(capacity >= 0, "capacity must be >= 0")
  import Channel._

  // Puts waiting for room, each paired with the value it wants to deliver.
  private val puts = mutable.Queue.empty[(Handler[Unit], A)]
  // Takes waiting for a value.
  private val takes = mutable.Queue.empty[Handler[A]]

  // Values accepted from puts but not yet handed to a take.
  private val buffer = mutable.Queue.empty[A]

  /** Offers `value` to the channel on behalf of `handler`.
    *
    * A pending take, if there is one, receives the value directly. Pending
    * takes that turn out to be inactive are dropped and the next one is tried.
    * With no pending take the value is buffered when there is room, otherwise
    * the put is queued.
    *
    * @param handler committed (and its callback invoked with `()`) once the
    *                value has been handed over or buffered
    * @param value   the value to deliver
    * @throws IllegalArgumentException if `MaxOps` puts are already queued
    */
  @tailrec final def put(handler: Handler[Unit], value: A): Unit =
    synchronized {
      if (takes.nonEmpty) {
        val taker = takes.dequeue()
        // Commit first: `active` is only consulted after the commit attempt.
        val deliver = taker.commit()
        if (!taker.active) {
          // This taker is no longer interested; retry with the next one.
          put(handler, value)
        } else {
          handler.commit()(())
          deliver(value)
        }
      } else if (buffer.size >= capacity) {
        require(puts.size < MaxOps, "Too many pending put operations.")
        puts.enqueue((handler, value))
      } else {
        buffer.enqueue(value)
        handler.commit()(())
      }
    }

  /** Offers `value` and returns a future that completes once the value has
    * been handed to a take or accepted into the buffer.
    */
  def put(value: A): Future[Unit] = {
    val accepted = Promise[Unit]()
    put(new Handler[Unit](_ => accepted.success(())), value)
    accepted.future
  }

  /** Requests a value from the channel on behalf of `handler`.
    *
    * If puts are queued, the oldest one is released: an unbuffered channel
    * hands over its value directly, a buffered channel hands over the oldest
    * buffered value and moves the released put's value into the buffer. With
    * no queued puts, a buffered value is delivered if there is one, otherwise
    * the take is queued. Nothing is consumed when `handler` reports itself
    * inactive after its commit attempt.
    *
    * @param handler committed (and its callback invoked with the value) when a
    *                value is available
    * @throws IllegalArgumentException if `MaxOps` takes are already queued
    */
  def take(handler: Handler[A]): Unit = synchronized {
    if (puts.nonEmpty) {
      val deliver = handler.commit()
      if (handler.active) {
        val (putHandler, pending) = puts.dequeue()
        val data =
          if (capacity == 0) pending
          else {
            // Keep FIFO order: hand out the oldest buffered value and let the
            // released put's value take the freed slot.
            val oldest = buffer.dequeue()
            buffer.enqueue(pending)
            oldest
          }
        putHandler.commit()(())
        deliver(data)
      }
    } else if (buffer.nonEmpty) {
      val deliver = handler.commit()
      if (handler.active) {
        deliver(buffer.dequeue())
      }
    } else {
      require(takes.size < MaxOps, "Too many pending take operations")
      takes.enqueue(handler)
    }
  }

  /** Requests a value and returns a future that completes with it. */
  def take(): Future[A] = {
    val received = Promise[A]()
    take(new Handler[A](a => received.success(a)))
    received.future
  }

}

/** Factory methods and multi-channel helpers for [[Channel]]. */
object Channel {

  /** Maximum number of puts (and, separately, takes) a channel will queue. */
  final val MaxOps = 1024

  /** Creates a channel buffering up to `capacity` values (none by default). */
  def apply[A](capacity: Int = 0): Channel[A] = new Channel[A](capacity)

  // TODO: this currently consumes a thread for every instance
  /** Returns an unbuffered channel onto which a single `()` is put after
    * sleeping for `ms` milliseconds on the global execution context.
    *
    * @param ms delay in milliseconds before the value is put
    */
  def timeout(ms: Int): Channel[Unit] = {
    val c = new Channel[Unit](0)
    Future {
      // Declare the sleep as blocking so the global pool can compensate with
      // extra threads; otherwise many concurrent timeouts occupy every worker
      // and delay each other (and unrelated futures).
      scala.concurrent.blocking {
        Thread.sleep(ms)
      }
      c.put(())
    }(scala.concurrent.ExecutionContext.global)
    c
  }

  //def select(ops: Op[_]*): Unit = ???

  /** Takes from whichever of `channels` delivers a value first.
    *
    * A take is registered on each channel in order; all registrations share
    * one [[Flag]], so at most one of them receives a value. The returned
    * future completes with the delivering channel and the value. It never
    * completes if `channels` is empty.
    */
  def select(channels: Channel[_]*): Future[(Channel[_], Any)] = {
    val flag = new Flag
    val result = Promise[(Channel[_], Any)]()
    for (ch <- channels) {
      // Once a channel has delivered, further registrations can never fire;
      // skipping them avoids leaving dead handlers in idle channels' queues.
      if (flag.active.get) {
        val handler = new SelectHandler[Any](flag, v => result.success((ch, v)))
        ch.take(handler)
      }
    }
    result.future
  }

  /** A channel paired with the callback to run on a value taken from it. */
  type Op[A] = (Channel[A], A => Unit)

  /** Like `select`, but runs the callback paired with the delivering channel.
    *
    * The returned future completes after that callback has returned.
    */
  def select2(reads: Op[_]*): Future[Unit] = {
    val flag = new Flag
    val done = Promise[Unit]()
    for ((ch, callback) <- reads) {
      // Skip registrations that could never fire (see `select`).
      if (flag.active.get) {
        val c = callback.andThen { _ =>
          done.success(())
          ()
        }
        val handler = new SelectHandler(flag, c)
        ch.take(handler)
      }
    }
    done.future
  }

}
/** Wraps the callback of a pending channel operation.
  *
  * This base implementation is always active and always hands out its
  * callback; subclasses may override both members.
  *
  * @param callback function to run with the operation's result
  */
class Handler[-A](callback: A => Unit) {

  /** Commits the operation and returns the function to invoke with its result. */
  def commit(): A => Unit = callback

  /** Whether the operation is still of interest; always `true` here. */
  def active: Boolean = true
}

/** A shared atomic boolean, initially `true`. */
class Flag {

  /** Starts out `true`; callers flip it atomically via the `AtomicBoolean` API. */
  val active: AtomicBoolean = new AtomicBoolean(true)
}
/** A handler that competes with others sharing the same `flag`: only the
  * first `commit()` across all of them obtains the real callback.
  *
  * @param flag     shared flag that is atomically switched from `true` to
  *                 `false` by the winning commit
  * @param callback function handed out to the winning commit
  */
class SelectHandler[A](flag: Flag, callback: A => Unit)
    extends Handler[A](callback) {

  // Becomes false once a commit attempt on this handler has lost.
  var _active: Boolean = true

  /** `true` until a `commit()` on this handler has failed to claim the flag. */
  override def active: Boolean = _active

  /** Tries to claim the shared flag.
    *
    * @return `callback` if this call switched the flag; otherwise a function
    *         that ignores its argument, and the handler is marked inactive
    */
  override def commit(): A => Unit = {
    val won = flag.active.compareAndSet(true, false)
    if (won) callback
    else {
      _active = false
      val discard: A => Unit = _ => ()
      discard
    }
  }

}