diff options
author | Jakob Odersky <jodersky@gmail.com> | 2013-03-02 12:06:03 +0100 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2013-03-02 12:06:03 +0100 |
commit | 494b80da610fe2b4cb1790ae237437a17ac6ef62 (patch) | |
tree | d36b5ed810851198a21de640128631c9846a4835 /scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala | |
parent | 5b00baec28e8e7976636df86f30bc3a6412f8203 (diff) | |
download | ace-494b80da610fe2b4cb1790ae237437a17ac6ef62.tar.gz ace-494b80da610fe2b4cb1790ae237437a17ac6ef62.tar.bz2 ace-494b80da610fe2b4cb1790ae237437a17ac6ef62.zip |
simplify scala implementation
Diffstat (limited to 'scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala')
-rw-r--r-- | scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala | 83 |
1 files changed, 83 insertions, 0 deletions
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala new file mode 100644 index 0000000..f7c4376 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala @@ -0,0 +1,83 @@ +package com.github.jodersky.ace + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import java.io.IOException +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import scala.util.Success + +class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] { + import Arq._ + + case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]]) + + // a map containing yet to be acknowledged messages + private val openMessages = HashMap[Int, OpenMessage]() + + // received message sequences + private val receivedSequences = Queue[Int]() + + protected def receive(frameData: Seq[Int]) = { + val sequence = frameData(SequenceOffset) + val command = frameData(CommandOffset) + val message = frameData.drop(MessageOffset) + + command match { + + case Ack => { + openMessages.get(sequence) map { + case OpenMessage(data, promise) => promise.complete(Success(data)) + } + } + + case Data => { + sendToLower(ack(sequence)) + + if (!(receivedSequences contains sequence)) { + if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue + receivedSequences enqueue sequence + notifyHigher(message) + } + } + + } + } + + def send(message: Seq[Int]) = { + val promise = Promise[Seq[Int]] + val sequence = nextSequence() + + val frameData = Seq(sequence, Data) ++ message + + def send(n: Int): Future[Seq[Int]] = + sendToLower(frameData) map { frameData => + Await.result(promise.future, timeout.milliseconds) + } recoverWith { + case t: TimeoutException if (n < maxResends) => send(n + 1) + } + + if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests")) + else { + openMessages += (sequence -> OpenMessage(message, promise)) + send(0) andThen { case successOrFailure => (openMessages -= sequence) } + } + } + +} + +object Arq { + final val MaxSequence = 255 + final val Data = 0x05 + final val Ack = 0x06 + + final val SequenceOffset = 0 + final val CommandOffset = 1 + final val MessageOffset = 2 + + private[this] var seq = 0 + private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq } + + def ack(seq: Int) = Seq(seq, Ack) +}
\ No newline at end of file |