diff options
author | Jakob Odersky <jodersky@gmail.com> | 2013-02-26 17:14:53 +0100 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2013-02-26 17:14:53 +0100 |
commit | 77db2136559ccef7d84cf6c0fd0166a970224680 (patch) | |
tree | 4addef09cfaa1567a952ed3b0522ecd26f96df75 /scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala | |
parent | 03edd62b745f225075fab0d96e0ec93f96c3466c (diff) | |
download | ace-77db2136559ccef7d84cf6c0fd0166a970224680.tar.gz ace-77db2136559ccef7d84cf6c0fd0166a970224680.tar.bz2 ace-77db2136559ccef7d84cf6c0fd0166a970224680.zip |
restructure scala directory in view of providing seperate projects for serial implementations
Diffstat (limited to 'scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala')
-rw-r--r-- | scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala | 88 |
1 files changed, 88 insertions, 0 deletions
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala new file mode 100644 index 0000000..2055770 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala @@ -0,0 +1,88 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import java.io.IOException +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import com.github.jodersky.ace.protocol.{Packet => PPacket} +import scala.util.Success + +class TransportLayer extends ReactiveLayer[Packet, Message] { + + + private val openRequests = HashMap[Int, (Message, Promise[Message])]() + private val receivedSeqs = Queue[Int]() + + class Packet(val data: Seq[Int]) { + + def seq = data(0) + def cmd = data(1) + def message = Message(data.drop(2)) + + def underlying = PPacket(data) + } + + object Packet { + final val DATA = 0x05 + final val ACK = 0x06 + private var seq = 0; + private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq} + + def fromPacket(packet: PPacket) = new Packet(packet.data) + + def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data) + + def ack(seq: Int) = new Packet (Seq(seq, ACK)) + + } + + def receive(ppacket: PPacket) = { + val in = Packet.fromPacket(ppacket) + + in.cmd match { + case Packet.ACK => { + openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))} + } + + case Packet.DATA => { + writeToLower(Packet.ack(in.seq).underlying) + + if (!(receivedSeqs contains in.seq)) { + if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue + receivedSeqs enqueue in.seq + notifyHigher(in.message) + } + } + + } + } + + def write(message: Message) = { + val promise = Promise[Message] + val packet = Packet.fromMessage(message) + val seq = packet.seq + + def send(n: Int): Future[Message] = + writeToLower(packet.underlying) map { packet => + Await.result(promise.future, TransportLayer.Timeout milliseconds) + } recoverWith { + case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1) + } + + if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests")) + else { + openRequests += (seq -> (message, promise)) + send(0) andThen {case r => (openRequests -= seq)} + } + } + +} + +object TransportLayer { + final val Timeout = 100 + final val MaxResends = 5 + final val MaxSeq = 255 + final val MaxSeqBuffer = 10 +}
\ No newline at end of file |