aboutsummaryrefslogtreecommitdiff
path: root/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala
blob: 9d426d3bc4efc79b17ccff897b8015d8a277f6fb (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.github.jodersky.ace

import scala.concurrent.Future

/** Represents a layer in a reactive protocol.
 *
 *  Layers are stacked with [[connect]]: data travelling upwards is handed to the
 *  higher layer's `receive`, data travelling downwards is handed to the lower
 *  layer's `send`.
 *
 *  @tparam L data type this layer receives from or writes to a lower layer
 *  @tparam T data type this layer sends to a higher layer or receives from a higher layer */
trait ReactiveLayer[L, T] {
  // Neighbouring layers; both stay `None` until `connect` links them.
  private var lowerLayer: Option[ReactiveLayer[_, L]] = None
  private var higherLayer: Option[ReactiveLayer[T, _]] = None

  /** Notifies a higher layer that data is available.
   *
   *  The higher layer's `receive` is invoked synchronously on the calling thread.
   *
   *  @param data the data to hand to the higher layer
   *  @throws java.lang.UnsupportedOperationException if no higher layer is connected */
  protected def notifyHigher(data: T): Unit = higherLayer match {
    case Some(higher) => higher.receive(data)
    case None => throw new UnsupportedOperationException("Higher layer doesn't exist.")
  }

  /** Sends data to a lower layer.
   *
   *  @param l the data to pass to the lower layer's `send`
   *  @return the future returned by the lower layer's `send`, or a future failed with
   *          an `UnsupportedOperationException` if no lower layer is connected */
  protected def sendToLower(l: L): Future[L] = lowerLayer match {
    case Some(lower) => lower.send(l)
    case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist."))
  }

  /** Connects this layer with a higher layer, effectively linking calls of
   *  `notifyHigher` to `higher.receive` and of `higher.sendToLower` to `send`.
   *
   *  Any link previously stored in this layer's higher slot or in `higher`'s lower
   *  slot is overwritten; the layer that was linked before is not reset.
   *
   *  @tparam A data type the higher layer exchanges with the layer above it
   *  @param higher the layer to place directly above this one
   *  @return `higher`, so that calls can be chained: `a.connect(b).connect(c)` */
  def connect[A](higher: ReactiveLayer[T, A]): ReactiveLayer[T, A] = {
    this.higherLayer = Some(higher)
    higher.lowerLayer = Some(this)
    higher
  }

  /** Called from lower layer.
   *
   *  @param data the data the lower layer passed to its `notifyHigher` */
  protected def receive(data: L): Unit

  /** Send data to this layer.
   *
   *  @param data the data to send
   *  @return a future value containing the data sent, or an error */
  def send(data: T): Future[T]
}