From 20a2350f9b33a797763413509781d0686fc38fe5 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Tue, 26 Feb 2013 18:22:53 +0100 Subject: simplify scala implementation --- scala/ace/lib/jssc.jar | Bin 120277 -> 0 bytes .../scala/com/github/jodersky/ace/Arduino.scala | 28 ------- .../main/scala/com/github/jodersky/ace/Arq.scala | 83 +++++++++++++++++++ .../scala/com/github/jodersky/ace/Framer.scala | 83 +++++++++++++++++++ .../main/scala/com/github/jodersky/ace/Main.scala | 18 ----- .../com/github/jodersky/ace/ReactiveLayer.scala | 38 +++++++++ .../scala/com/github/jodersky/ace/SafeSerial.scala | 35 -------- .../github/jodersky/ace/protocol/LinkLayer.scala | 72 ----------------- .../com/github/jodersky/ace/protocol/Message.scala | 3 - .../com/github/jodersky/ace/protocol/Packet.scala | 3 - .../jodersky/ace/protocol/PhysicalLayer.scala | 37 --------- .../jodersky/ace/protocol/ReactiveLayer.scala | 38 --------- .../jodersky/ace/protocol/SecureSerial.scala | 22 ------ .../jodersky/ace/protocol/TransportLayer.scala | 88 --------------------- 14 files changed, 204 insertions(+), 344 deletions(-) delete mode 100644 scala/ace/lib/jssc.jar delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala diff --git a/scala/ace/lib/jssc.jar b/scala/ace/lib/jssc.jar deleted file mode 100644 index b0d6a82..0000000 Binary files a/scala/ace/lib/jssc.jar and /dev/null differ diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala deleted file mode 100644 index 7077570..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala +++ /dev/null @@ -1,28 +0,0 @@ -package com.github.jodersky.ace - -import scala.util.Try -import jssc.SerialPort -import jssc.SerialPortEventListener -import jssc.SerialPortEvent -import scala.util.Success -import com.github.jodersky.ace.protocol.SecureSerial -import scala.concurrent.Await -import scala.concurrent.duration._ - -object Arduino { - - private def open(port: String, rate: Int) = { - val serialPort = new SerialPort(port); - serialPort.openPort(); - serialPort.setParams(rate, - SerialPort.DATABITS_8, - SerialPort.STOPBITS_1, - SerialPort.PARITY_NONE); //Set params. Also you can set params by this string: serialPort.setParams(9600, 8, 1, 0); - // serialPort.writeBytes("This is a test string".getBytes()); //Write data to port - //serialPort.closePort(); //Close serial port - serialPort - } - - def connect(port: String) = new SecureSerial(open(port, 115200)) - -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala new file mode 100644 index 0000000..3f8f84d --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala @@ -0,0 +1,83 @@ +package com.github.jodersky.ace + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import java.io.IOException +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import scala.util.Success + +class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] { + import Arq._ + + case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]]) + + // a map containing yet to be acknowledged messages + private val openMessages = HashMap[Int, OpenMessage]() + + // received message sequences + private val receivedSequences = Queue[Int]() + + def receive(frameData: Seq[Int]) = { + val sequence = frameData(SequenceOffset) + val command = frameData(CommandOffset) + val message = frameData.drop(MessageOffset) + + command match { + + case Ack => { + openMessages.get(sequence) map { + case OpenMessage(data, promise) => promise.complete(Success(data)) + } + } + + case Data => { + writeToLower(ack(sequence)) + + if (!(receivedSequences contains sequence)) { + if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue + receivedSequences enqueue sequence + notifyHigher(message) + } + } + + } + } + + def write(message: Seq[Int]) = { + val promise = Promise[Seq[Int]] + val sequence = nextSequence() + + val frameData = Seq(sequence, Data) ++ message + + def send(n: Int): Future[Seq[Int]] = + writeToLower(frameData) map { frameData => + Await.result(promise.future, timeout.milliseconds) + } recoverWith { + case t: TimeoutException if (n < maxResends) => send(n + 1) + } + + if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests")) + else { + openMessages += (sequence -> OpenMessage(message, promise)) + send(0) andThen { case successOrFailure => (openMessages -= sequence) } + } + } + +} + +object Arq { + final val MaxSequence = 255 + final val Data = 0x05 + final val Ack = 0x06 + + final val SequenceOffset = 0 + final val CommandOffset = 1 + final val MessageOffset = 2 + + private[this] var seq = 0 + private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq } + + def ack(seq: Int) = Seq(seq, Ack) +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala new file mode 100644 index 0000000..52496e4 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala @@ -0,0 +1,83 @@ +package com.github.jodersky.ace + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +/** A framer takes bytes (as unsigned integers) and creates frames. + * Note that the input type of this reactive layer is also a sequence of + * integers for performance reasons (i.e. a future will not be created for every byte sent). + */ +class Framer extends ReactiveLayer[Seq[Int], Seq[Int]] { + import Framer._ + + private var state: State = Waiting + private val buffer = new ArrayBuffer[Int] + + def receive(bytes: Seq[Int]) = bytes foreach receive + + def receive(byte: Int): Unit = { + + state match { + case Escaping => { + buffer += byte + state = Receiving + } + case Waiting => if (byte == Start) { + buffer.clear() + state = Receiving + } + case Receiving => byte match { + case Escape => state = Escaping + case Start => buffer.clear() + case Stop => { + state = Waiting + if (checksum(buffer.init) == buffer.last) + notifyHigher(buffer.init) + } + case datum => buffer += datum + } + } + } + + def write(data: Seq[Int]): Future[Seq[Int]] = { + val buffer = new ArrayBuffer[Int] + + buffer += Start + data foreach { byte => + byte match { + case Start | Stop | Escape => { + buffer += Escape + buffer += byte + } + case _ => buffer += byte + } + } + val c = checksum(data) + c match { + case Start | Stop | Escape => { + buffer += Escape + buffer += c + } + case _ => buffer += c + } + buffer += Stop + writeToLower(buffer) map (_ => data) + } +} + +object Framer { + sealed trait State + case object Waiting extends State + case object Receiving extends State + case object Escaping extends State + + final val Escape = 0x02 + final val Start = 0x03 + final val Stop = 0x10 + + def checksum(unsignedData: Seq[Int]) = { + unsignedData.fold(0)(_ ^ _) + } + +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala deleted file mode 100644 index 17351e2..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala +++ /dev/null @@ -1,18 +0,0 @@ -package com.github.jodersky.ace - -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global - -object Main { - - def main(args: Array[String]): Unit = { - val s = SafeSerial.open("/dev/ttyACM0", 115200).get - var cmd: String = "" - while (cmd != ":quit"){ - cmd = Console.readLine - s.send(cmd).onComplete(v => println("sent: " + v)) - } - s.close() - } - -} diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala new file mode 100644 index 0000000..663a4a7 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala @@ -0,0 +1,38 @@ +package com.github.jodersky.ace + +import scala.concurrent.Future + +/** Represents a layer in a reactive protocol. + * @tparam L data type this layer receives from or writes to a lower layer + * @tparam T data type this layer sends to a higher layer or receives from a higher */ +trait ReactiveLayer[L, T] { + private var lowerLayer: Option[ReactiveLayer[_, L]] = None + private var higherLayer: Option[ReactiveLayer[T, _]] = None + + /** Notifies a higher layer that data is available. */ + protected def notifyHigher(data: T): Unit = higherLayer match { + case Some(higher) => higher.receive(data) + case None => throw new UnsupportedOperationException("Higher layer doesn't exist.") + } + + /** Writes data to a lower layer. */ + protected def writeToLower(l: L): Future[L] = lowerLayer match { + case Some(lower) => lower.write(l) + case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist.")) + } + + /** Connects this layer with a higher layer, effectively linking calls + * `notifyHigher` to `higher.receive` and `higher.writeToLower` to `write`. */ + def connect[A](higher: ReactiveLayer[T, A]) = { + this.higherLayer = Some(higher) + higher.lowerLayer = Some(this) + higher + } + + /** Called from lower layer. */ + def receive(data: L): Unit + + /** Write data to this layer. + * @return a future value containing the data written, or an error */ + def write(data: T): Future[T] +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala deleted file mode 100644 index d75d6f5..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala +++ /dev/null @@ -1,35 +0,0 @@ -package com.github.jodersky.ace - -import com.github.jodersky.ace.protocol._ -import jssc.SerialPort -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.Try - -class SafeSerial(port: SerialPort) { self => - val physical = new PhysicalLayer(port) - val link = new LinkLayer - val transport = new TransportLayer - - val application = new ReactiveLayer[Message, String] { - def receive(message: Message) = Console.println(message.data.map(_.toChar).mkString("")) - def write(s: String) = writeToLower(Message(s.map(_.toChar.toInt))).map(x => s) - } - - def send(s: String) = application.write(s) - def close() = Try(port.closePort()) - - physical connect link connect transport connect application - physical.begin() -} - -object SafeSerial { - def open(port: String, rate: Int) = Try { - val serialPort = new SerialPort(port); - serialPort.openPort() - serialPort.setParams(rate, - SerialPort.DATABITS_8, - SerialPort.STOPBITS_1, - SerialPort.PARITY_NONE) - serialPort - } map (port => new SafeSerial(port)) -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala deleted file mode 100644 index 5f8d228..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala +++ /dev/null @@ -1,72 +0,0 @@ -package com.github.jodersky.ace.protocol - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Future -import scala.concurrent.ExecutionContext.Implicits.global - -class LinkLayer extends ReactiveLayer[Array[Byte], Packet] { - import LinkLayer._ - - private var state: State = Waiting - private val buffer = new ArrayBuffer[Int] - - def receive(data: Array[Byte]) = for (d <- data) receive(d) - - def receive(data: Byte): Unit = { - val unsigned = data & 0xff - - state match { - case Escaping => { - buffer += unsigned - state = Receiving - } - case Waiting => if (unsigned == Start) { - buffer.clear() - state = Receiving - } - case Receiving => unsigned match { - case Escape => state = Escaping - case Start => buffer.clear() - case End => { - state = Waiting - if (checksum(buffer.init.toArray) == buffer.last) - notifyHigher(Packet(buffer.init.toArray)) - } - case datum => buffer += datum - } - } - } - - def write(packet: Packet): Future[Packet] = { - val buffer = new ArrayBuffer[Int] - buffer += Start - packet.data foreach { unsigned => - unsigned match { - case Start | End | Escape => { - buffer += Escape - buffer += unsigned - } - case _ => buffer += unsigned - } - } - buffer += checksum(packet.data) - buffer += End - writeToLower(buffer.map(_.toByte).toArray).map(_ => packet) - } -} - -object LinkLayer { - sealed trait State - case object Waiting extends State - case object Receiving extends State - case object Escaping extends State - - final val Escape = 0x02 - final val Start = 0x03 - final val End = 0x10 - - def checksum(unsignedData: Seq[Int]) = { - unsignedData.fold(0)(_ ^ _) - } - -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala deleted file mode 100644 index 32edfea..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala +++ /dev/null @@ -1,3 +0,0 @@ -package com.github.jodersky.ace.protocol - -case class Message(data: Seq[Int]) \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala deleted file mode 100644 index 583022f..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala +++ /dev/null @@ -1,3 +0,0 @@ -package com.github.jodersky.ace.protocol - -case class Packet(data: Seq[Int]) \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala deleted file mode 100644 index 39c2d25..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala +++ /dev/null @@ -1,37 +0,0 @@ -package com.github.jodersky.ace.protocol - -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global -import jssc.SerialPort -import java.io.IOException -import jssc.SerialPortEvent -import jssc.SerialPortEventListener - -class PhysicalLayer(serial: SerialPort) extends ReactiveLayer[Nothing, Array[Byte]] { - - def receive(nothing: Nothing) = throw new UnsupportedOperationException("A receive function cannot be called on the lowest layer.") - - private val listener = new SerialPortEventListener { - override def serialEvent(event: SerialPortEvent) = { - if (event.isRXCHAR()) { - val bytes = serial.readBytes - if (bytes != null) notifyHigher(bytes) - } - } - } - - - def write(data: Array[Byte]) = future { - serial.writeBytes(data) - } map { success => - if (success) data - else throw new IOException("Could not write to serial port.") - } - - def begin() = { - val mask = SerialPort.MASK_RXCHAR + SerialPort.MASK_CTS + SerialPort.MASK_DSR - serial.setEventsMask(mask) - serial.addEventListener(listener) - } - -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala deleted file mode 100644 index 792947a..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala +++ /dev/null @@ -1,38 +0,0 @@ -package com.github.jodersky.ace.protocol - -import scala.concurrent.Future - -/** Represents a layer in a reactive protocol. - * @tparam L data type this layer receives from or writes to a lower layer - * @tparam T data type this layer sends to a higher layer or receives from a higher */ -trait ReactiveLayer[L, T] { - private var lowerLayer: Option[ReactiveLayer[_, L]] = None - private var higherLayer: Option[ReactiveLayer[T, _]] = None - - /** Notifies a higher layer that data is available. */ - protected def notifyHigher(data: T): Unit = higherLayer match { - case Some(higher) => higher.receive(data) - case None => throw new UnsupportedOperationException("Higher layer doesn't exist.") - } - - /** Writes data to a lower layer. */ - protected def writeToLower(l: L): Future[L] = lowerLayer match { - case Some(lower) => lower.write(l) - case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist.")) - } - - /** Connects this layer with a higher layer, effectively linking calls - * `notifyHigher` to `higher.receive` and `higher.writeToLower` to `write`. */ - def connect[A](higher: ReactiveLayer[T, A]) = { - this.higherLayer = Some(higher) - higher.lowerLayer = Some(this) - higher - } - - /** Called from lower layer. */ - def receive(data: L): Unit - - /** Write data to this layer. - * @return a future value containing the data written, or an error */ - def write(data: T): Future[T] -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala deleted file mode 100644 index e229b82..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala +++ /dev/null @@ -1,22 +0,0 @@ -package com.github.jodersky.ace.protocol - -import jssc.SerialPort -import scala.concurrent.ExecutionContext.Implicits.global -import scala.util.Try - -class SecureSerial(port: SerialPort) { self => - val physical = new PhysicalLayer(port) - val link = new LinkLayer - val transport = new TransportLayer - - val application = new ReactiveLayer[Message, String] { - def receive(message: Message) = Console.println(message.data.mkString("")) - def write(s: String) = writeToLower(Message(s.map(_.toInt))).map(x => s) - } - - def send(s: String) = application.write(s) - def close() = Try(port.closePort()) - - physical connect link connect transport connect application - physical.begin() -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala deleted file mode 100644 index 2055770..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala +++ /dev/null @@ -1,88 +0,0 @@ -package com.github.jodersky.ace.protocol - -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import java.io.IOException -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import com.github.jodersky.ace.protocol.{Packet => PPacket} -import scala.util.Success - -class TransportLayer extends ReactiveLayer[Packet, Message] { - - - private val openRequests = HashMap[Int, (Message, Promise[Message])]() - private val receivedSeqs = Queue[Int]() - - class Packet(val data: Seq[Int]) { - - def seq = data(0) - def cmd = data(1) - def message = Message(data.drop(2)) - - def underlying = PPacket(data) - } - - object Packet { - final val DATA = 0x05 - final val ACK = 0x06 - private var seq = 0; - private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq} - - def fromPacket(packet: PPacket) = new Packet(packet.data) - - def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data) - - def ack(seq: Int) = new Packet (Seq(seq, ACK)) - - } - - def receive(ppacket: PPacket) = { - val in = Packet.fromPacket(ppacket) - - in.cmd match { - case Packet.ACK => { - openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))} - } - - case Packet.DATA => { - writeToLower(Packet.ack(in.seq).underlying) - - if (!(receivedSeqs contains in.seq)) { - if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue - receivedSeqs enqueue in.seq - notifyHigher(in.message) - } - } - - } - } - - def write(message: Message) = { - val promise = Promise[Message] - val packet = Packet.fromMessage(message) - val seq = packet.seq - - def send(n: Int): Future[Message] = - writeToLower(packet.underlying) map { packet => - Await.result(promise.future, TransportLayer.Timeout milliseconds) - } recoverWith { - case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1) - } - - if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests")) - else { - openRequests += (seq -> (message, promise)) - send(0) andThen {case r => (openRequests -= seq)} - } - } - -} - -object TransportLayer { - final val Timeout = 100 - final val MaxResends = 5 - final val MaxSeq = 255 - final val MaxSeqBuffer = 10 -} \ No newline at end of file -- cgit v1.2.3