From e15a7e1267a6f733d734c5d3b59f3acc28bb4b29 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 25 Feb 2013 19:39:53 +0100 Subject: initial commit --- scala/.gitignore | 18 +++++ scala/build.sbt | 9 +++ scala/lib/jssc.jar | Bin 0 -> 120277 bytes .../scala/com/github/jodersky/ace/Arduino.scala | 28 +++++++ .../main/scala/com/github/jodersky/ace/Main.scala | 18 +++++ .../scala/com/github/jodersky/ace/SafeSerial.scala | 35 ++++++++ .../github/jodersky/ace/protocol/LinkLayer.scala | 72 +++++++++++++++++ .../com/github/jodersky/ace/protocol/Message.scala | 3 + .../com/github/jodersky/ace/protocol/Packet.scala | 3 + .../jodersky/ace/protocol/PhysicalLayer.scala | 37 +++++++++ .../jodersky/ace/protocol/ReactiveLayer.scala | 39 +++++++++ .../jodersky/ace/protocol/SecureSerial.scala | 22 ++++++ .../jodersky/ace/protocol/TransportLayer.scala | 88 +++++++++++++++++++++ 13 files changed, 372 insertions(+) create mode 100644 scala/.gitignore create mode 100644 scala/build.sbt create mode 100644 scala/lib/jssc.jar create mode 100644 scala/src/main/scala/com/github/jodersky/ace/Arduino.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/Main.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/SafeSerial.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/Message.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala create mode 100644 scala/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala (limited to 'scala') diff --git a/scala/.gitignore b/scala/.gitignore new file mode 100644 index 0000000..94730a8 --- /dev/null +++ b/scala/.gitignore @@ -0,0 +1,18 @@ +*.class +*.log + +# sbt specific +dist/* +target/ +.target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.project +.classpath +.cache +.settings/ diff --git a/scala/build.sbt b/scala/build.sbt new file mode 100644 index 0000000..d61bc17 --- /dev/null +++ b/scala/build.sbt @@ -0,0 +1,9 @@ +name := "ace" + +organization := "com.github.jodersky" + +version := "1.0-SNAPSHOT" + +scalaVersion := "2.10.0" + +scalacOptions ++= Seq("-deprecation","-feature") diff --git a/scala/lib/jssc.jar b/scala/lib/jssc.jar new file mode 100644 index 0000000..b0d6a82 Binary files /dev/null and b/scala/lib/jssc.jar differ diff --git a/scala/src/main/scala/com/github/jodersky/ace/Arduino.scala b/scala/src/main/scala/com/github/jodersky/ace/Arduino.scala new file mode 100644 index 0000000..7077570 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/Arduino.scala @@ -0,0 +1,28 @@ +package com.github.jodersky.ace + +import scala.util.Try +import jssc.SerialPort +import jssc.SerialPortEventListener +import jssc.SerialPortEvent +import scala.util.Success +import com.github.jodersky.ace.protocol.SecureSerial +import scala.concurrent.Await +import scala.concurrent.duration._ + +object Arduino { + + private def open(port: String, rate: Int) = { + val serialPort = new SerialPort(port); + serialPort.openPort(); + serialPort.setParams(rate, + SerialPort.DATABITS_8, + SerialPort.STOPBITS_1, + SerialPort.PARITY_NONE); //Set params. Also you can set params by this string: serialPort.setParams(9600, 8, 1, 0); + // serialPort.writeBytes("This is a test string".getBytes()); //Write data to port + //serialPort.closePort(); //Close serial port + serialPort + } + + def connect(port: String) = new SecureSerial(open(port, 115200)) + +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/Main.scala b/scala/src/main/scala/com/github/jodersky/ace/Main.scala new file mode 100644 index 0000000..17351e2 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/Main.scala @@ -0,0 +1,18 @@ +package com.github.jodersky.ace + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global + +object Main { + + def main(args: Array[String]): Unit = { + val s = SafeSerial.open("/dev/ttyACM0", 115200).get + var cmd: String = "" + while (cmd != ":quit"){ + cmd = Console.readLine + s.send(cmd).onComplete(v => println("sent: " + v)) + } + s.close() + } + +} diff --git a/scala/src/main/scala/com/github/jodersky/ace/SafeSerial.scala b/scala/src/main/scala/com/github/jodersky/ace/SafeSerial.scala new file mode 100644 index 0000000..d75d6f5 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/SafeSerial.scala @@ -0,0 +1,35 @@ +package com.github.jodersky.ace + +import com.github.jodersky.ace.protocol._ +import jssc.SerialPort +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.Try + +class SafeSerial(port: SerialPort) { self => + val physical = new PhysicalLayer(port) + val link = new LinkLayer + val transport = new TransportLayer + + val application = new ReactiveLayer[Message, String] { + def receive(message: Message) = Console.println(message.data.map(_.toChar).mkString("")) + def write(s: String) = writeToLower(Message(s.map(_.toChar.toInt))).map(x => s) + } + + def send(s: String) = application.write(s) + def close() = Try(port.closePort()) + + physical connect link connect transport connect application + physical.begin() +} + +object SafeSerial { + def open(port: String, rate: Int) = Try { + val serialPort = new SerialPort(port); + serialPort.openPort() + serialPort.setParams(rate, + SerialPort.DATABITS_8, + SerialPort.STOPBITS_1, + SerialPort.PARITY_NONE) + serialPort + } map (port => new SafeSerial(port)) +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala new file mode 100644 index 0000000..5f8d228 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala @@ -0,0 +1,72 @@ +package com.github.jodersky.ace.protocol + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +class LinkLayer extends ReactiveLayer[Array[Byte], Packet] { + import LinkLayer._ + + private var state: State = Waiting + private val buffer = new ArrayBuffer[Int] + + def receive(data: Array[Byte]) = for (d <- data) receive(d) + + def receive(data: Byte): Unit = { + val unsigned = data & 0xff + + state match { + case Escaping => { + buffer += unsigned + state = Receiving + } + case Waiting => if (unsigned == Start) { + buffer.clear() + state = Receiving + } + case Receiving => unsigned match { + case Escape => state = Escaping + case Start => buffer.clear() + case End => { + state = Waiting + if (checksum(buffer.init.toArray) == buffer.last) + notifyHigher(Packet(buffer.init.toArray)) + } + case datum => buffer += datum + } + } + } + + def write(packet: Packet): Future[Packet] = { + val buffer = new ArrayBuffer[Int] + buffer += Start + packet.data foreach { unsigned => + unsigned match { + case Start | End | Escape => { + buffer += Escape + buffer += unsigned + } + case _ => buffer += unsigned + } + } + buffer += checksum(packet.data) + buffer += End + writeToLower(buffer.map(_.toByte).toArray).map(_ => packet) + } +} + +object LinkLayer { + sealed trait State + case object Waiting extends State + case object Receiving extends State + case object Escaping extends State + + final val Escape = 0x02 + final val Start = 0x03 + final val End = 0x10 + + def checksum(unsignedData: Seq[Int]) = { + unsignedData.fold(0)(_ ^ _) + } + +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/Message.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/Message.scala new file mode 100644 index 0000000..32edfea --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/Message.scala @@ -0,0 +1,3 @@ +package com.github.jodersky.ace.protocol + +case class Message(data: Seq[Int]) \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala new file mode 100644 index 0000000..583022f --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala @@ -0,0 +1,3 @@ +package com.github.jodersky.ace.protocol + +case class Packet(data: Seq[Int]) \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala new file mode 100644 index 0000000..39c2d25 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala @@ -0,0 +1,37 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import jssc.SerialPort +import java.io.IOException +import jssc.SerialPortEvent +import jssc.SerialPortEventListener + +class PhysicalLayer(serial: SerialPort) extends ReactiveLayer[Nothing, Array[Byte]] { + + def receive(nothing: Nothing) = throw new UnsupportedOperationException("A receive function cannot be called on the lowest layer.") + + private val listener = new SerialPortEventListener { + override def serialEvent(event: SerialPortEvent) = { + if (event.isRXCHAR()) { + val bytes = serial.readBytes + if (bytes != null) notifyHigher(bytes) + } + } + } + + + def write(data: Array[Byte]) = future { + serial.writeBytes(data) + } map { success => + if (success) data + else throw new IOException("Could not write to serial port.") + } + + def begin() = { + val mask = SerialPort.MASK_RXCHAR + SerialPort.MASK_CTS + SerialPort.MASK_DSR + serial.setEventsMask(mask) + serial.addEventListener(listener) + } + +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala new file mode 100644 index 0000000..422ced3 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala @@ -0,0 +1,39 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent.Future + +/** Represents a layer in a reactive protocol. + * + * @tparam L data type this layer receives from or writes to a lower layer + * @tparam T data type this layer sends to a higher layer or receives from a higher */ +trait ReactiveLayer[L, T] { + private var lowerLayer: Option[ReactiveLayer[_, L]] = None + private var higherLayer: Option[ReactiveLayer[T, _]] = None + + /** Notifies a higher layer that data is available. */ + protected def notifyHigher(data: T): Unit = higherLayer match { + case Some(higher) => higher.receive(data) + case None => throw new UnsupportedOperationException("Higher layer doesn't exist.") + } + + /** Writes data to a lower layer. */ + protected def writeToLower(l: L): Future[L] = lowerLayer match { + case Some(lower) => lower.write(l) + case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist.")) + } + + /** Connects this layer with a higher layer, effectively linking calls + * `notifyHigher` to `higher.receive` and `higher.writeToLower` to `write`. */ + def connect[A](higher: ReactiveLayer[T, A]) = { + this.higherLayer = Some(higher) + higher.lowerLayer = Some(this) + higher + } + + /** Called from lower layer. */ + def receive(data: L): Unit + + /** Write data to this layer. + * @return a future value containing the data written, or an error */ + def write(data: T): Future[T] +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala new file mode 100644 index 0000000..e229b82 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala @@ -0,0 +1,22 @@ +package com.github.jodersky.ace.protocol + +import jssc.SerialPort +import scala.concurrent.ExecutionContext.Implicits.global +import scala.util.Try + +class SecureSerial(port: SerialPort) { self => + val physical = new PhysicalLayer(port) + val link = new LinkLayer + val transport = new TransportLayer + + val application = new ReactiveLayer[Message, String] { + def receive(message: Message) = Console.println(message.data.mkString("")) + def write(s: String) = writeToLower(Message(s.map(_.toInt))).map(x => s) + } + + def send(s: String) = application.write(s) + def close() = Try(port.closePort()) + + physical connect link connect transport connect application + physical.begin() +} \ No newline at end of file diff --git a/scala/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala new file mode 100644 index 0000000..2055770 --- /dev/null +++ b/scala/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala @@ -0,0 +1,88 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import java.io.IOException +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import com.github.jodersky.ace.protocol.{Packet => PPacket} +import scala.util.Success + +class TransportLayer extends ReactiveLayer[Packet, Message] { + + + private val openRequests = HashMap[Int, (Message, Promise[Message])]() + private val receivedSeqs = Queue[Int]() + + class Packet(val data: Seq[Int]) { + + def seq = data(0) + def cmd = data(1) + def message = Message(data.drop(2)) + + def underlying = PPacket(data) + } + + object Packet { + final val DATA = 0x05 + final val ACK = 0x06 + private var seq = 0; + private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq} + + def fromPacket(packet: PPacket) = new Packet(packet.data) + + def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data) + + def ack(seq: Int) = new Packet (Seq(seq, ACK)) + + } + + def receive(ppacket: PPacket) = { + val in = Packet.fromPacket(ppacket) + + in.cmd match { + case Packet.ACK => { + openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))} + } + + case Packet.DATA => { + writeToLower(Packet.ack(in.seq).underlying) + + if (!(receivedSeqs contains in.seq)) { + if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue + receivedSeqs enqueue in.seq + notifyHigher(in.message) + } + } + + } + } + + def write(message: Message) = { + val promise = Promise[Message] + val packet = Packet.fromMessage(message) + val seq = packet.seq + + def send(n: Int): Future[Message] = + writeToLower(packet.underlying) map { packet => + Await.result(promise.future, TransportLayer.Timeout milliseconds) + } recoverWith { + case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1) + } + + if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests")) + else { + openRequests += (seq -> (message, promise)) + send(0) andThen {case r => (openRequests -= seq)} + } + } + +} + +object TransportLayer { + final val Timeout = 100 + final val MaxResends = 5 + final val MaxSeq = 255 + final val MaxSeqBuffer = 10 +} \ No newline at end of file -- cgit v1.2.3