blob: 97080231d34a04faee06de2ec75ef5802dc01963 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
package com.github.jodersky.ace
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.io.IOException
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import scala.util.Success
class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] {
require(maxMessageBuffer < 256, "max amount of messages must fit into a byte")
import Arq._
case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]])
// a map containing yet to be acknowledged messages
private val openMessages = HashMap[Int, OpenMessage]()
// received message sequences
private val receivedSequences = Queue[Int]()
protected def receive(frameData: Seq[Int]) = {
val sequence = frameData(SequenceOffset)
val command = frameData(CommandOffset)
val message = frameData.drop(MessageOffset)
command match {
case Ack => {
openMessages.get(sequence) map {
case OpenMessage(data, promise) => promise.complete(Success(data))
}
}
case Data => {
sendToLower(ack(sequence))
if (!(receivedSequences contains sequence)) {
if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue
receivedSequences enqueue sequence
notifyHigher(message)
}
}
}
}
def send(message: Seq[Int]) = {
val promise = Promise[Seq[Int]]
val sequence = nextSequence()
val frameData = Seq(sequence, Data) ++ message
def send(n: Int): Future[Seq[Int]] =
sendToLower(frameData) map { frameData =>
Await.result(promise.future, timeout.milliseconds)
} recoverWith {
case t: TimeoutException if (n < maxResends) => send(n + 1)
}
if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests"))
else {
openMessages += (sequence -> OpenMessage(message, promise))
send(0) andThen { case successOrFailure => (openMessages -= sequence) }
}
}
}
object Arq {
final val MaxSequence = 255
final val Data = 0x05
final val Ack = 0x06
final val SequenceOffset = 0
final val CommandOffset = 1
final val MessageOffset = 2
private[this] var seq = 0
private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq }
def ack(seq: Int) = Seq(seq, Ack)
}
|