aboutsummaryrefslogtreecommitdiff
path: root/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala
blob: f7c4376cbf5cd1b27599791a2bfd809d98268d59 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package com.github.jodersky.ace

import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.io.IOException
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import scala.util.Success

class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] {
  import Arq._

  case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]])

  // a map containing yet to be acknowledged messages
  private val openMessages = HashMap[Int, OpenMessage]()

  // received message sequences
  private val receivedSequences = Queue[Int]()

  protected def receive(frameData: Seq[Int]) = {
    val sequence = frameData(SequenceOffset)
    val command = frameData(CommandOffset)
    val message = frameData.drop(MessageOffset)

    command match {
      
      case Ack => {
        openMessages.get(sequence) map {
          case OpenMessage(data, promise) => promise.complete(Success(data))
        }
      }

      case Data => {
        sendToLower(ack(sequence))

        if (!(receivedSequences contains sequence)) {
          if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue
          receivedSequences enqueue sequence
          notifyHigher(message)
        }
      }

    }
  }

  def send(message: Seq[Int]) = {
    val promise = Promise[Seq[Int]]
    val sequence = nextSequence()
    
    val frameData = Seq(sequence, Data) ++ message 

    def send(n: Int): Future[Seq[Int]] =
      sendToLower(frameData) map { frameData =>
        Await.result(promise.future, timeout.milliseconds)
      } recoverWith {
        case t: TimeoutException if (n < maxResends) => send(n + 1)
      }

    if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests"))
    else {
      openMessages += (sequence -> OpenMessage(message, promise))
      send(0) andThen { case successOrFailure => (openMessages -= sequence) }
    }
  }

}

object Arq {
  final val MaxSequence = 255
  final val Data = 0x05
  final val Ack = 0x06

  final val SequenceOffset = 0
  final val CommandOffset = 1
  final val MessageOffset = 2

  private[this] var seq = 0
  private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq }

  def ack(seq: Int) = Seq(seq, Ack)
}