blob: 205577085f79e58b8aa1ce4b2c95c7b9e3a0352e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
package com.github.jodersky.ace.protocol
import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.io.IOException
import scala.collection.mutable.HashMap
import scala.collection.mutable.Queue
import com.github.jodersky.ace.protocol.{Packet => PPacket}
import scala.util.Success
/** Transport layer adding sequence numbers, acknowledgements and resends on
  * top of a packet-oriented lower layer.
  *
  * Outgoing messages are wrapped in DATA packets and tracked in `openRequests`
  * until an ACK with the same sequence number arrives or the resend budget is
  * used up. Incoming DATA packets are always acknowledged, and are forwarded
  * to the higher layer only if their sequence number is not in the recent
  * history kept in `receivedSeqs`.
  *
  * NOTE(review): `openRequests`, `receivedSeqs` and the sequence counter are
  * plain mutable state without synchronization, while `write` removes entries
  * from `openRequests` inside a future callback on the global execution
  * context. Confirm the threading guarantees of the surrounding layers.
  */
class TransportLayer extends ReactiveLayer[Packet, Message] {

  /** Requests awaiting an ACK, keyed by sequence number. */
  private val openRequests = HashMap[Int, (Message, Promise[Message])]()

  /** Sequence numbers of recently received DATA packets, oldest first. */
  private val receivedSeqs = Queue[Int]()

  /** Transport-level view of raw packet data: element 0 is the sequence
    * number, element 1 the command, the remainder the message payload.
    */
  class Packet(val data: Seq[Int]) {
    def seq: Int = data(0)
    def cmd: Int = data(1)
    def message = Message(data.drop(2))
    def underlying = PPacket(data)
  }

  object Packet {
    final val DATA = 0x05
    final val ACK = 0x06

    private var seq = 0

    /** Advances the sequence counter, wrapping to 0 after `TransportLayer.MaxSeq`. */
    private def nextSeq(): Int = {
      seq = if (seq >= TransportLayer.MaxSeq) 0 else seq + 1
      seq
    }

    /** Reinterprets a lower-layer packet as a transport packet. */
    def fromPacket(packet: PPacket) = new Packet(packet.data)

    /** Wraps a message in a DATA packet carrying a fresh sequence number. */
    def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data)

    /** Builds the acknowledgement packet for the given sequence number. */
    def ack(seq: Int) = new Packet(Seq(seq, ACK))
  }

  /** Handles a packet coming from the lower layer.
    *
    * An ACK completes the promise of the matching open request, if any. A
    * DATA packet is acknowledged unconditionally and passed to the higher
    * layer unless its sequence number is still in the received history.
    * Commands other than ACK and DATA are not matched and raise a
    * `MatchError`.
    *
    * @param ppacket packet received from the lower layer
    */
  def receive(ppacket: PPacket): Unit = {
    val in = Packet.fromPacket(ppacket)
    in.cmd match {
      case Packet.ACK =>
        // trySuccess instead of complete: after a resend the peer may
        // acknowledge the same sequence number more than once, and the entry
        // is only removed asynchronously, so a second ACK must be a no-op
        // rather than an IllegalStateException.
        openRequests.get(in.seq).foreach { case (message, promise) =>
          promise.trySuccess(message)
        }

      case Packet.DATA =>
        // Always acknowledge, even duplicates: the sender may have missed
        // the previous ACK.
        writeToLower(Packet.ack(in.seq).underlying)
        if (!receivedSeqs.contains(in.seq)) {
          if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue()
          receivedSeqs.enqueue(in.seq)
          notifyHigher(in.message)
        }
    }
  }

  /** Sends a message and waits for its acknowledgement.
    *
    * The packet is written to the lower layer and resent up to
    * `TransportLayer.MaxResends` times whenever no ACK arrives within
    * `TransportLayer.Timeout` milliseconds.
    *
    * @param message message to transmit
    * @return a future completed with the message once it is acknowledged;
    *         failed with an `IOException` when `TransportLayer.MaxSeqBuffer`
    *         requests are already open, or with a `TimeoutException` when
    *         all resends time out
    */
  def write(message: Message): Future[Message] =
    // Check capacity before building the packet so that a rejected write
    // does not consume a sequence number.
    if (openRequests.size >= TransportLayer.MaxSeqBuffer)
      Future.failed(new IOException("too many open requests"))
    else {
      val promise = Promise[Message]()
      val packet = Packet.fromMessage(message)
      val seq = packet.seq

      def send(n: Int): Future[Message] =
        writeToLower(packet.underlying).map { _ =>
          Await.result(promise.future, TransportLayer.Timeout.milliseconds)
        }.recoverWith {
          case _: TimeoutException if n < TransportLayer.MaxResends => send(n + 1)
        }

      openRequests += (seq -> ((message, promise)))
      // Drop the bookkeeping entry whatever the outcome.
      send(0) andThen { case _ => openRequests -= seq }
    }
}
/** Tuning constants used by the `TransportLayer` class. */
object TransportLayer {

  /** Largest sequence number; the counter wraps to 0 once it would exceed this. */
  final val MaxSeq = 255

  /** Limit on simultaneously open requests; also the threshold at which the
    * history of received sequence numbers starts dropping its oldest entry.
    */
  final val MaxSeqBuffer = 10

  /** Milliseconds to wait for an acknowledgement before a send attempt times out. */
  final val Timeout = 100

  /** Number of times a packet is resent after a timeout before giving up. */
  final val MaxResends = 5
}
|