From 20a2350f9b33a797763413509781d0686fc38fe5 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 26 Feb 2013 18:22:53 +0100 Subject: simplify scala implementation --- .../jodersky/ace/protocol/TransportLayer.scala | 88 ---------------------- 1 file changed, 88 deletions(-) delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala (limited to 'scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala') diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala deleted file mode 100644 index 2055770..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala +++ /dev/null @@ -1,88 +0,0 @@ -package com.github.jodersky.ace.protocol - -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import java.io.IOException -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import com.github.jodersky.ace.protocol.{Packet => PPacket} -import scala.util.Success - -class TransportLayer extends ReactiveLayer[Packet, Message] { - - - private val openRequests = HashMap[Int, (Message, Promise[Message])]() - private val receivedSeqs = Queue[Int]() - - class Packet(val data: Seq[Int]) { - - def seq = data(0) - def cmd = data(1) - def message = Message(data.drop(2)) - - def underlying = PPacket(data) - } - - object Packet { - final val DATA = 0x05 - final val ACK = 0x06 - private var seq = 0; - private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq} - - def fromPacket(packet: PPacket) = new Packet(packet.data) - - def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data) - - def ack(seq: Int) = new Packet (Seq(seq, ACK)) - - } - - def receive(ppacket: PPacket) = { - val in = Packet.fromPacket(ppacket) - - in.cmd match { - case Packet.ACK => { - openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))} - } - - case Packet.DATA => { - writeToLower(Packet.ack(in.seq).underlying) - - if (!(receivedSeqs contains in.seq)) { - if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue - receivedSeqs enqueue in.seq - notifyHigher(in.message) - } - } - - } - } - - def write(message: Message) = { - val promise = Promise[Message] - val packet = Packet.fromMessage(message) - val seq = packet.seq - - def send(n: Int): Future[Message] = - writeToLower(packet.underlying) map { packet => - Await.result(promise.future, TransportLayer.Timeout milliseconds) - } recoverWith { - case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1) - } - - if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests")) - else { - openRequests += (seq -> (message, promise)) - send(0) andThen {case r => (openRequests -= seq)} - } - } - -} - -object TransportLayer { - final val Timeout = 100 - final val MaxResends = 5 - final val MaxSeq = 255 - final val MaxSeqBuffer = 10 -} \ No newline at end of file -- cgit v1.2.3