summaryrefslogtreecommitdiff
path: root/src/main/scala/escale/api.scala
blob: 57cb31db656af93fefd6214db69671dfec78ae9d (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
package escale

import scala.collection.mutable
import scala.concurrent.{Future, Promise}

class Channel[A](capacity: Int) {
  require(capacity >= 0, "capacity must be >= 0")
  import Channel._

  private val puts = mutable.Queue.empty[(Handler[Unit], A)]
  private val takes = mutable.Queue.empty[Handler[A]]

  private val buffer = mutable.Queue.empty[A]

  def put(value: A): Future[Unit] = synchronized {
    val handler = new Handler[Unit]

    if (takes.size > 0) {
      val th = takes.dequeue()
      th.promise.success(value)
      handler.promise.success(())
    } else if (buffer.size < capacity) {
      buffer.enqueue(value)
      handler.promise.success(())
    } else {
      if (puts.size >= MaxOps) {
        handler.promise.failure(
          new IllegalArgumentException("Too many pending put operations."))
      } else {
        puts.enqueue(handler -> value)
      }
    }
    handler.promise.future
  }
  def take(): Future[A] = synchronized {
    val handler = new Handler[A]

    if (puts.size > 0) {
      val (ph, pd) = puts.dequeue()
      val data = if (capacity == 0) {
        pd
      } else {
        val d = buffer.dequeue()
        buffer.enqueue(pd)
        d
      }
      ph.promise.success(())
      handler.promise.success(data)
    } else if (buffer.isEmpty) {
      if (takes.size >= MaxOps) {
        handler.promise.failure(
          new IllegalArgumentException("Too many pending take operations."))
      } else {
        takes.enqueue(handler)
      }
    } else {
      handler.promise.success(buffer.dequeue())
    }
    handler.promise.future
  }

}

object Channel {
  final val MaxOps = 2

  def apply[A](capacity: Int): Channel[A] = new Channel[A](capacity)

}

class Handler[A] {
  val promise = Promise[A]
}