aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-02-26 18:22:53 +0100
committerJakob Odersky <jodersky@gmail.com>2013-02-26 18:25:07 +0100
commit20a2350f9b33a797763413509781d0686fc38fe5 (patch)
tree0ee6f84433d8eaec6f2e3a89bae026c095502b25
parent7923f67f3090cd5a8c7e8281f62b0beca0846bec (diff)
downloadace-20a2350f9b33a797763413509781d0686fc38fe5.tar.gz
ace-20a2350f9b33a797763413509781d0686fc38fe5.tar.bz2
ace-20a2350f9b33a797763413509781d0686fc38fe5.zip
simplify scala implementation
-rw-r--r--scala/ace/lib/jssc.jarbin120277 -> 0 bytes
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala28
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala83
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala83
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala18
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala (renamed from scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala)2
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala35
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala72
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala3
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala3
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala37
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala22
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala88
13 files changed, 167 insertions, 307 deletions
diff --git a/scala/ace/lib/jssc.jar b/scala/ace/lib/jssc.jar
deleted file mode 100644
index b0d6a82..0000000
--- a/scala/ace/lib/jssc.jar
+++ /dev/null
Binary files differ
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala
deleted file mode 100644
index 7077570..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/Arduino.scala
+++ /dev/null
@@ -1,28 +0,0 @@
-package com.github.jodersky.ace
-
-import scala.util.Try
-import jssc.SerialPort
-import jssc.SerialPortEventListener
-import jssc.SerialPortEvent
-import scala.util.Success
-import com.github.jodersky.ace.protocol.SecureSerial
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-object Arduino {
-
- private def open(port: String, rate: Int) = {
- val serialPort = new SerialPort(port);
- serialPort.openPort();
- serialPort.setParams(rate,
- SerialPort.DATABITS_8,
- SerialPort.STOPBITS_1,
- SerialPort.PARITY_NONE); //Set params. Also you can set params by this string: serialPort.setParams(9600, 8, 1, 0);
- // serialPort.writeBytes("This is a test string".getBytes()); //Write data to port
- //serialPort.closePort(); //Close serial port
- serialPort
- }
-
- def connect(port: String) = new SecureSerial(open(port, 115200))
-
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala
new file mode 100644
index 0000000..3f8f84d
--- /dev/null
+++ b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala
@@ -0,0 +1,83 @@
+package com.github.jodersky.ace
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.duration._
+import java.io.IOException
+import scala.collection.mutable.HashMap
+import scala.collection.mutable.Queue
+import scala.util.Success
+
+class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] {
+ import Arq._
+
+ case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]])
+
+ // a map containing yet to be acknowledged messages
+ private val openMessages = HashMap[Int, OpenMessage]()
+
+ // received message sequences
+ private val receivedSequences = Queue[Int]()
+
+ def receive(frameData: Seq[Int]) = {
+ val sequence = frameData(SequenceOffset)
+ val command = frameData(CommandOffset)
+ val message = frameData.drop(MessageOffset)
+
+ command match {
+
+ case Ack => {
+ openMessages.get(sequence) map {
+ case OpenMessage(data, promise) => promise.complete(Success(data))
+ }
+ }
+
+ case Data => {
+ writeToLower(ack(sequence))
+
+ if (!(receivedSequences contains sequence)) {
+ if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue
+ receivedSequences enqueue sequence
+ notifyHigher(message)
+ }
+ }
+
+ }
+ }
+
+ def write(message: Seq[Int]) = {
+ val promise = Promise[Seq[Int]]
+ val sequence = nextSequence()
+
+ val frameData = Seq(sequence, Data) ++ message
+
+ def send(n: Int): Future[Seq[Int]] =
+ writeToLower(frameData) map { frameData =>
+ Await.result(promise.future, timeout.milliseconds)
+ } recoverWith {
+ case t: TimeoutException if (n < maxResends) => send(n + 1)
+ }
+
+ if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests"))
+ else {
+ openMessages += (sequence -> OpenMessage(message, promise))
+ send(0) andThen { case successOrFailure => (openMessages -= sequence) }
+ }
+ }
+
+}
+
+object Arq {
+ final val MaxSequence = 255
+ final val Data = 0x05
+ final val Ack = 0x06
+
+ final val SequenceOffset = 0
+ final val CommandOffset = 1
+ final val MessageOffset = 2
+
+ private[this] var seq = 0
+ private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq }
+
+ def ack(seq: Int) = Seq(seq, Ack)
+} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala
new file mode 100644
index 0000000..52496e4
--- /dev/null
+++ b/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala
@@ -0,0 +1,83 @@
+package com.github.jodersky.ace
+
+import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.Future
+import scala.concurrent.ExecutionContext.Implicits.global
+
+/** A framer takes bytes (as unsigned integers) and creates frames.
+ * Note that the input type of this reactive layer is also a sequence of
+ * integers for performance reasons (i.e. a future will not be created for every byte sent).
+ */
+class Framer extends ReactiveLayer[Seq[Int], Seq[Int]] {
+ import Framer._
+
+ private var state: State = Waiting
+ private val buffer = new ArrayBuffer[Int]
+
+ def receive(bytes: Seq[Int]) = bytes foreach receive
+
+ def receive(byte: Int): Unit = {
+
+ state match {
+ case Escaping => {
+ buffer += byte
+ state = Receiving
+ }
+ case Waiting => if (byte == Start) {
+ buffer.clear()
+ state = Receiving
+ }
+ case Receiving => byte match {
+ case Escape => state = Escaping
+ case Start => buffer.clear()
+ case Stop => {
+ state = Waiting
+ if (checksum(buffer.init) == buffer.last)
+ notifyHigher(buffer.init)
+ }
+ case datum => buffer += datum
+ }
+ }
+ }
+
+ def write(data: Seq[Int]): Future[Seq[Int]] = {
+ val buffer = new ArrayBuffer[Int]
+
+ buffer += Start
+ data foreach { byte =>
+ byte match {
+ case Start | Stop | Escape => {
+ buffer += Escape
+ buffer += byte
+ }
+ case _ => buffer += byte
+ }
+ }
+ val c = checksum(data)
+ c match {
+ case Start | Stop | Escape => {
+ buffer += Escape
+ buffer += c
+ }
+ case _ => buffer += c
+ }
+ buffer += Stop
+ writeToLower(buffer) map (_ => data)
+ }
+}
+
+object Framer {
+ sealed trait State
+ case object Waiting extends State
+ case object Receiving extends State
+ case object Escaping extends State
+
+ final val Escape = 0x02
+ final val Start = 0x03
+ final val Stop = 0x10
+
+ def checksum(unsignedData: Seq[Int]) = {
+ unsignedData.fold(0)(_ ^ _)
+ }
+
+} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala
deleted file mode 100644
index 17351e2..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/Main.scala
+++ /dev/null
@@ -1,18 +0,0 @@
-package com.github.jodersky.ace
-
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits.global
-
-object Main {
-
- def main(args: Array[String]): Unit = {
- val s = SafeSerial.open("/dev/ttyACM0", 115200).get
- var cmd: String = ""
- while (cmd != ":quit"){
- cmd = Console.readLine
- s.send(cmd).onComplete(v => println("sent: " + v))
- }
- s.close()
- }
-
-}
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala
index 792947a..663a4a7 100644
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala
+++ b/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.ace.protocol
+package com.github.jodersky.ace
import scala.concurrent.Future
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala
deleted file mode 100644
index d75d6f5..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/SafeSerial.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-package com.github.jodersky.ace
-
-import com.github.jodersky.ace.protocol._
-import jssc.SerialPort
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.Try
-
-class SafeSerial(port: SerialPort) { self =>
- val physical = new PhysicalLayer(port)
- val link = new LinkLayer
- val transport = new TransportLayer
-
- val application = new ReactiveLayer[Message, String] {
- def receive(message: Message) = Console.println(message.data.map(_.toChar).mkString(""))
- def write(s: String) = writeToLower(Message(s.map(_.toChar.toInt))).map(x => s)
- }
-
- def send(s: String) = application.write(s)
- def close() = Try(port.closePort())
-
- physical connect link connect transport connect application
- physical.begin()
-}
-
-object SafeSerial {
- def open(port: String, rate: Int) = Try {
- val serialPort = new SerialPort(port);
- serialPort.openPort()
- serialPort.setParams(rate,
- SerialPort.DATABITS_8,
- SerialPort.STOPBITS_1,
- SerialPort.PARITY_NONE)
- serialPort
- } map (port => new SafeSerial(port))
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala
deleted file mode 100644
index 5f8d228..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.ExecutionContext.Implicits.global
-
-class LinkLayer extends ReactiveLayer[Array[Byte], Packet] {
- import LinkLayer._
-
- private var state: State = Waiting
- private val buffer = new ArrayBuffer[Int]
-
- def receive(data: Array[Byte]) = for (d <- data) receive(d)
-
- def receive(data: Byte): Unit = {
- val unsigned = data & 0xff
-
- state match {
- case Escaping => {
- buffer += unsigned
- state = Receiving
- }
- case Waiting => if (unsigned == Start) {
- buffer.clear()
- state = Receiving
- }
- case Receiving => unsigned match {
- case Escape => state = Escaping
- case Start => buffer.clear()
- case End => {
- state = Waiting
- if (checksum(buffer.init.toArray) == buffer.last)
- notifyHigher(Packet(buffer.init.toArray))
- }
- case datum => buffer += datum
- }
- }
- }
-
- def write(packet: Packet): Future[Packet] = {
- val buffer = new ArrayBuffer[Int]
- buffer += Start
- packet.data foreach { unsigned =>
- unsigned match {
- case Start | End | Escape => {
- buffer += Escape
- buffer += unsigned
- }
- case _ => buffer += unsigned
- }
- }
- buffer += checksum(packet.data)
- buffer += End
- writeToLower(buffer.map(_.toByte).toArray).map(_ => packet)
- }
-}
-
-object LinkLayer {
- sealed trait State
- case object Waiting extends State
- case object Receiving extends State
- case object Escaping extends State
-
- final val Escape = 0x02
- final val Start = 0x03
- final val End = 0x10
-
- def checksum(unsignedData: Seq[Int]) = {
- unsignedData.fold(0)(_ ^ _)
- }
-
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala
deleted file mode 100644
index 32edfea..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-case class Message(data: Seq[Int]) \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala
deleted file mode 100644
index 583022f..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-case class Packet(data: Seq[Int]) \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala
deleted file mode 100644
index 39c2d25..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits.global
-import jssc.SerialPort
-import java.io.IOException
-import jssc.SerialPortEvent
-import jssc.SerialPortEventListener
-
-class PhysicalLayer(serial: SerialPort) extends ReactiveLayer[Nothing, Array[Byte]] {
-
- def receive(nothing: Nothing) = throw new UnsupportedOperationException("A receive function cannot be called on the lowest layer.")
-
- private val listener = new SerialPortEventListener {
- override def serialEvent(event: SerialPortEvent) = {
- if (event.isRXCHAR()) {
- val bytes = serial.readBytes
- if (bytes != null) notifyHigher(bytes)
- }
- }
- }
-
-
- def write(data: Array[Byte]) = future {
- serial.writeBytes(data)
- } map { success =>
- if (success) data
- else throw new IOException("Could not write to serial port.")
- }
-
- def begin() = {
- val mask = SerialPort.MASK_RXCHAR + SerialPort.MASK_CTS + SerialPort.MASK_DSR
- serial.setEventsMask(mask)
- serial.addEventListener(listener)
- }
-
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala
deleted file mode 100644
index e229b82..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import jssc.SerialPort
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.Try
-
-class SecureSerial(port: SerialPort) { self =>
- val physical = new PhysicalLayer(port)
- val link = new LinkLayer
- val transport = new TransportLayer
-
- val application = new ReactiveLayer[Message, String] {
- def receive(message: Message) = Console.println(message.data.mkString(""))
- def write(s: String) = writeToLower(Message(s.map(_.toInt))).map(x => s)
- }
-
- def send(s: String) = application.write(s)
- def close() = Try(port.closePort())
-
- physical connect link connect transport connect application
- physical.begin()
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala
deleted file mode 100644
index 2055770..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import java.io.IOException
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import com.github.jodersky.ace.protocol.{Packet => PPacket}
-import scala.util.Success
-
-class TransportLayer extends ReactiveLayer[Packet, Message] {
-
-
- private val openRequests = HashMap[Int, (Message, Promise[Message])]()
- private val receivedSeqs = Queue[Int]()
-
- class Packet(val data: Seq[Int]) {
-
- def seq = data(0)
- def cmd = data(1)
- def message = Message(data.drop(2))
-
- def underlying = PPacket(data)
- }
-
- object Packet {
- final val DATA = 0x05
- final val ACK = 0x06
- private var seq = 0;
- private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq}
-
- def fromPacket(packet: PPacket) = new Packet(packet.data)
-
- def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data)
-
- def ack(seq: Int) = new Packet (Seq(seq, ACK))
-
- }
-
- def receive(ppacket: PPacket) = {
- val in = Packet.fromPacket(ppacket)
-
- in.cmd match {
- case Packet.ACK => {
- openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))}
- }
-
- case Packet.DATA => {
- writeToLower(Packet.ack(in.seq).underlying)
-
- if (!(receivedSeqs contains in.seq)) {
- if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue
- receivedSeqs enqueue in.seq
- notifyHigher(in.message)
- }
- }
-
- }
- }
-
- def write(message: Message) = {
- val promise = Promise[Message]
- val packet = Packet.fromMessage(message)
- val seq = packet.seq
-
- def send(n: Int): Future[Message] =
- writeToLower(packet.underlying) map { packet =>
- Await.result(promise.future, TransportLayer.Timeout milliseconds)
- } recoverWith {
- case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1)
- }
-
- if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests"))
- else {
- openRequests += (seq -> (message, promise))
- send(0) andThen {case r => (openRequests -= seq)}
- }
- }
-
-}
-
-object TransportLayer {
- final val Timeout = 100
- final val MaxResends = 5
- final val MaxSeq = 255
- final val MaxSeqBuffer = 10
-} \ No newline at end of file