From 793983ca31eb05fce655ed5877b6f3faa4577acd Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Wed, 27 Feb 2013 15:53:33 +0100 Subject: restructure scala implementation --- .../main/scala/com/github/jodersky/ace/Arq.scala | 83 --------------------- .../scala/com/github/jodersky/ace/Framer.scala | 83 --------------------- .../com/github/jodersky/ace/ReactiveLayer.scala | 38 ---------- .../com/github/jodersky/ace/protocol/Arq.scala | 83 +++++++++++++++++++++ .../com/github/jodersky/ace/protocol/Framer.scala | 83 +++++++++++++++++++++ .../jodersky/ace/protocol/ReactiveLayer.scala | 38 ++++++++++ .../jodersky/ace/protocol/SimpleActionLayer.scala | 8 ++ .../com/github/jodersky/ace/serial/Serial.scala | 15 ++++ .../jodersky/ace/serial/SerialProvider.scala | 7 ++ scala/jssc/.gitignore | 18 +++++ scala/jssc/build.sbt | 11 +++ scala/jssc/lib/jssc.jar | Bin 0 -> 120277 bytes .../main/scala/com/github/jodersky/ace/Main.scala | 30 ++++++++ .../jodersky/ace/jssc/JSSCSerialProvider.scala | 48 ++++++++++++ .../com/github/jodersky/ace/jssc/package.scala | 7 ++ 15 files changed, 348 insertions(+), 204 deletions(-) delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala delete mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Arq.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Framer.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SimpleActionLayer.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/serial/Serial.scala create mode 100644 scala/ace/src/main/scala/com/github/jodersky/ace/serial/SerialProvider.scala create mode 100644 scala/jssc/.gitignore create mode 100644 scala/jssc/build.sbt create mode 100644 scala/jssc/lib/jssc.jar create mode 100644 scala/jssc/src/main/scala/com/github/jodersky/ace/Main.scala create mode 100644 scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/JSSCSerialProvider.scala create mode 100644 scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/package.scala diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala deleted file mode 100644 index 3f8f84d..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/Arq.scala +++ /dev/null @@ -1,83 +0,0 @@ -package com.github.jodersky.ace - -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits.global -import scala.concurrent.duration._ -import java.io.IOException -import scala.collection.mutable.HashMap -import scala.collection.mutable.Queue -import scala.util.Success - -class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] { - import Arq._ - - case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]]) - - // a map containing yet to be acknowledged messages - private val openMessages = HashMap[Int, OpenMessage]() - - // received message sequences - private val receivedSequences = Queue[Int]() - - def receive(frameData: Seq[Int]) = { - val sequence = frameData(SequenceOffset) - val command = frameData(CommandOffset) - val message = frameData.drop(MessageOffset) - - command match { - - case Ack => { - openMessages.get(sequence) map { - case OpenMessage(data, promise) => promise.complete(Success(data)) - } - } - - case Data => { - writeToLower(ack(sequence)) - - if (!(receivedSequences contains sequence)) { - if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue - receivedSequences enqueue sequence - notifyHigher(message) - } - } - - } - } - - def write(message: Seq[Int]) = { - val promise = Promise[Seq[Int]] - val sequence = nextSequence() - - val frameData = Seq(sequence, Data) ++ message - - def send(n: Int): Future[Seq[Int]] = - writeToLower(frameData) map { frameData => - Await.result(promise.future, timeout.milliseconds) - } recoverWith { - case t: TimeoutException if (n < maxResends) => send(n + 1) - } - - if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests")) - else { - openMessages += (sequence -> OpenMessage(message, promise)) - send(0) andThen { case successOrFailure => (openMessages -= sequence) } - } - } - -} - -object Arq { - final val MaxSequence = 255 - final val Data = 0x05 - final val Ack = 0x06 - - final val SequenceOffset = 0 - final val CommandOffset = 1 - final val MessageOffset = 2 - - private[this] var seq = 0 - private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq } - - def ack(seq: Int) = Seq(seq, Ack) -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala deleted file mode 100644 index 52496e4..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/Framer.scala +++ /dev/null @@ -1,83 +0,0 @@ -package com.github.jodersky.ace - -import scala.collection.mutable.ArrayBuffer -import scala.concurrent.Future -import scala.concurrent.ExecutionContext.Implicits.global - -/** A framer takes bytes (as unsigned integers) and creates frames. - * Note that the input type of this reactive layer is also a sequence of - * integers for performance reasons (i.e. a future will not be created for every byte sent). - */ -class Framer extends ReactiveLayer[Seq[Int], Seq[Int]] { - import Framer._ - - private var state: State = Waiting - private val buffer = new ArrayBuffer[Int] - - def receive(bytes: Seq[Int]) = bytes foreach receive - - def receive(byte: Int): Unit = { - - state match { - case Escaping => { - buffer += byte - state = Receiving - } - case Waiting => if (byte == Start) { - buffer.clear() - state = Receiving - } - case Receiving => byte match { - case Escape => state = Escaping - case Start => buffer.clear() - case Stop => { - state = Waiting - if (checksum(buffer.init) == buffer.last) - notifyHigher(buffer.init) - } - case datum => buffer += datum - } - } - } - - def write(data: Seq[Int]): Future[Seq[Int]] = { - val buffer = new ArrayBuffer[Int] - - buffer += Start - data foreach { byte => - byte match { - case Start | Stop | Escape => { - buffer += Escape - buffer += byte - } - case _ => buffer += byte - } - } - val c = checksum(data) - c match { - case Start | Stop | Escape => { - buffer += Escape - buffer += c - } - case _ => buffer += c - } - buffer += Stop - writeToLower(buffer) map (_ => data) - } -} - -object Framer { - sealed trait State - case object Waiting extends State - case object Receiving extends State - case object Escaping extends State - - final val Escape = 0x02 - final val Start = 0x03 - final val Stop = 0x10 - - def checksum(unsignedData: Seq[Int]) = { - unsignedData.fold(0)(_ ^ _) - } - -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala deleted file mode 100644 index 663a4a7..0000000 --- a/scala/ace/src/main/scala/com/github/jodersky/ace/ReactiveLayer.scala +++ /dev/null @@ -1,38 +0,0 @@ -package com.github.jodersky.ace - -import scala.concurrent.Future - -/** Represents a layer in a reactive protocol. - * @tparam L data type this layer receives from or writes to a lower layer - * @tparam T data type this layer sends to a higher layer or receives from a higher */ -trait ReactiveLayer[L, T] { - private var lowerLayer: Option[ReactiveLayer[_, L]] = None - private var higherLayer: Option[ReactiveLayer[T, _]] = None - - /** Notifies a higher layer that data is available. */ - protected def notifyHigher(data: T): Unit = higherLayer match { - case Some(higher) => higher.receive(data) - case None => throw new UnsupportedOperationException("Higher layer doesn't exist.") - } - - /** Writes data to a lower layer. */ - protected def writeToLower(l: L): Future[L] = lowerLayer match { - case Some(lower) => lower.write(l) - case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist.")) - } - - /** Connects this layer with a higher layer, effectively linking calls - * `notifyHigher` to `higher.receive` and `higher.writeToLower` to `write`. */ - def connect[A](higher: ReactiveLayer[T, A]) = { - this.higherLayer = Some(higher) - higher.lowerLayer = Some(this) - higher - } - - /** Called from lower layer. */ - def receive(data: L): Unit - - /** Write data to this layer. - * @return a future value containing the data written, or an error */ - def write(data: T): Future[T] -} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Arq.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Arq.scala new file mode 100644 index 0000000..4a4ed8d --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Arq.scala @@ -0,0 +1,83 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import scala.concurrent.duration._ +import java.io.IOException +import scala.collection.mutable.HashMap +import scala.collection.mutable.Queue +import scala.util.Success + +class Arq(timeout: Int, maxResends: Int = 5, maxMessageBuffer: Int = 10) extends ReactiveLayer[Seq[Int], Seq[Int]] { + import Arq._ + + case class OpenMessage(data: Seq[Int], promise: Promise[Seq[Int]]) + + // a map containing yet to be acknowledged messages + private val openMessages = HashMap[Int, OpenMessage]() + + // received message sequences + private val receivedSequences = Queue[Int]() + + protected def receive(frameData: Seq[Int]) = { + val sequence = frameData(SequenceOffset) + val command = frameData(CommandOffset) + val message = frameData.drop(MessageOffset) + + command match { + + case Ack => { + openMessages.get(sequence) map { + case OpenMessage(data, promise) => promise.complete(Success(data)) + } + } + + case Data => { + sendToLower(ack(sequence)) + + if (!(receivedSequences contains sequence)) { + if (receivedSequences.size > maxMessageBuffer) receivedSequences.dequeue + receivedSequences enqueue sequence + notifyHigher(message) + } + } + + } + } + + def send(message: Seq[Int]) = { + val promise = Promise[Seq[Int]] + val sequence = nextSequence() + + val frameData = Seq(sequence, Data) ++ message + + def send(n: Int): Future[Seq[Int]] = + sendToLower(frameData) map { frameData => + Await.result(promise.future, timeout.milliseconds) + } recoverWith { + case t: TimeoutException if (n < maxResends) => send(n + 1) + } + + if (openMessages.size >= maxMessageBuffer) Future.failed(new IOException("too many open requests")) + else { + openMessages += (sequence -> OpenMessage(message, promise)) + send(0) andThen { case successOrFailure => (openMessages -= sequence) } + } + } + +} + +object Arq { + final val MaxSequence = 255 + final val Data = 0x05 + final val Ack = 0x06 + + final val SequenceOffset = 0 + final val CommandOffset = 1 + final val MessageOffset = 2 + + private[this] var seq = 0 + private def nextSequence() = { seq += 1; if (seq > MaxSequence) seq = 0; seq } + + def ack(seq: Int) = Seq(seq, Ack) +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Framer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Framer.scala new file mode 100644 index 0000000..5558db3 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Framer.scala @@ -0,0 +1,83 @@ +package com.github.jodersky.ace.protocol + +import scala.collection.mutable.ArrayBuffer +import scala.concurrent.Future +import scala.concurrent.ExecutionContext.Implicits.global + +/** A framer takes bytes (as unsigned integers) and creates frames. + * Note that the input type of this reactive layer is also a sequence of + * integers for performance reasons (i.e. a future will not be created for every byte sent). + */ +class Framer extends ReactiveLayer[Seq[Int], Seq[Int]] { + import Framer._ + + private var state: State = Waiting + private val buffer = new ArrayBuffer[Int] + + protected def receive(bytes: Seq[Int]) = bytes foreach receive + + protected def receive(byte: Int): Unit = { + + state match { + case Escaping => { + buffer += byte + state = Receiving + } + case Waiting => if (byte == Start) { + buffer.clear() + state = Receiving + } + case Receiving => byte match { + case Escape => state = Escaping + case Start => buffer.clear() + case Stop => { + state = Waiting + if (checksum(buffer.init) == buffer.last) + notifyHigher(buffer.init) + } + case datum => buffer += datum + } + } + } + + def send(data: Seq[Int]): Future[Seq[Int]] = { + val buffer = new ArrayBuffer[Int] + + buffer += Start + data foreach { byte => + byte match { + case Start | Stop | Escape => { + buffer += Escape + buffer += byte + } + case _ => buffer += byte + } + } + val c = checksum(data) + c match { + case Start | Stop | Escape => { + buffer += Escape + buffer += c + } + case _ => buffer += c + } + buffer += Stop + sendToLower(buffer) map (_ => data) + } +} + +object Framer { + sealed trait State + case object Waiting extends State + case object Receiving extends State + case object Escaping extends State + + final val Escape = 0x10 + final val Start = 0x02 + final val Stop = 0x03 + + def checksum(unsignedData: Seq[Int]) = { + unsignedData.fold(0)(_ ^ _) + } + +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala new file mode 100644 index 0000000..c9531c0 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala @@ -0,0 +1,38 @@ +package com.github.jodersky.ace.protocol + +import scala.concurrent.Future + +/** Represents a layer in a reactive protocol. + * @tparam L data type this layer receives from or writes to a lower layer + * @tparam T data type this layer sends to a higher layer or receives from a higher */ +trait ReactiveLayer[L, T] { + private var lowerLayer: Option[ReactiveLayer[_, L]] = None + private var higherLayer: Option[ReactiveLayer[T, _]] = None + + /** Notifies a higher layer that data is available. */ + protected def notifyHigher(data: T): Unit = higherLayer match { + case Some(higher) => higher.receive(data) + case None => throw new UnsupportedOperationException("Higher layer doesn't exist.") + } + + /** Sends data to a lower layer. */ + protected def sendToLower(l: L): Future[L] = lowerLayer match { + case Some(lower) => lower.send(l) + case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist.")) + } + + /** Connects this layer with a higher layer, effectively linking calls + * `notifyHigher` to `higher.receive` and `higher.sendToLower` to `send`. */ + def connect[A](higher: ReactiveLayer[T, A]) = { + this.higherLayer = Some(higher) + higher.lowerLayer = Some(this) + higher + } + + /** Called from lower layer. */ + protected def receive(data: L): Unit + + /** Send data to this layer. + * @return a future value containing the data sent, or an error */ + def send(data: T): Future[T] +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SimpleActionLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SimpleActionLayer.scala new file mode 100644 index 0000000..81e39cb --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SimpleActionLayer.scala @@ -0,0 +1,8 @@ +package com.github.jodersky.ace.protocol + +class SimpleActionLayer[A](action: A => Unit) extends ReactiveLayer[A, A] { + protected def receive(data: A) = action(data) + + def send(data: A) = sendToLower(data) + +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/serial/Serial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/serial/Serial.scala new file mode 100644 index 0000000..b688ec8 --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/serial/Serial.scala @@ -0,0 +1,15 @@ +package com.github.jodersky.ace.serial + +import com.github.jodersky.ace.protocol.ReactiveLayer + +trait Serial extends ReactiveLayer[Nothing, Seq[Int]] { + protected def receive(nothing: Nothing) = throw new UnsupportedOperationException("A receive function cannot be called on the lowest layer.") + + def begin(): Unit + def close(): Unit + +} + +object Serial { + def open(port: String, baudRate: Int)(implicit provider: SerialProvider) = provider.open(port, baudRate) +} \ No newline at end of file diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/serial/SerialProvider.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/serial/SerialProvider.scala new file mode 100644 index 0000000..9c7b95c --- /dev/null +++ b/scala/ace/src/main/scala/com/github/jodersky/ace/serial/SerialProvider.scala @@ -0,0 +1,7 @@ +package com.github.jodersky.ace.serial + +trait SerialProvider { + + def open(port: String, baudRate: Int): Serial + +} \ No newline at end of file diff --git a/scala/jssc/.gitignore b/scala/jssc/.gitignore new file mode 100644 index 0000000..94730a8 --- /dev/null +++ b/scala/jssc/.gitignore @@ -0,0 +1,18 @@ +*.class +*.log + +# sbt specific +dist/* +target/ +.target/ +lib_managed/ +src_managed/ +project/boot/ +project/plugins/project/ + +# Scala-IDE specific +.scala_dependencies +.project +.classpath +.cache +.settings/ diff --git a/scala/jssc/build.sbt b/scala/jssc/build.sbt new file mode 100644 index 0000000..8757f1c --- /dev/null +++ b/scala/jssc/build.sbt @@ -0,0 +1,11 @@ +name := "ace-jssc" + +organization := "com.github.jodersky" + +version := "1.0-SNAPSHOT" + +scalaVersion := "2.10.0" + +scalacOptions ++= Seq("-deprecation","-feature") + +libraryDependencies += "com.github.jodersky" %% "ace" % "1.0-SNAPSHOT" diff --git a/scala/jssc/lib/jssc.jar b/scala/jssc/lib/jssc.jar new file mode 100644 index 0000000..b0d6a82 Binary files /dev/null and b/scala/jssc/lib/jssc.jar differ diff --git a/scala/jssc/src/main/scala/com/github/jodersky/ace/Main.scala b/scala/jssc/src/main/scala/com/github/jodersky/ace/Main.scala new file mode 100644 index 0000000..55c23b9 --- /dev/null +++ b/scala/jssc/src/main/scala/com/github/jodersky/ace/Main.scala @@ -0,0 +1,30 @@ +package com.github.jodersky.ace + +import com.github.jodersky.ace.protocol._ + +import scala.concurrent.ExecutionContext.Implicits.global + +object Main { + + def main(args: Array[String]): Unit = { + import com.github.jodersky.ace.jssc._ + + val s = serial.Serial.open("/dev/ttyACM0", 9600) + val framer = new Framer + val arq = new Arq(200) + val app = new SimpleActionLayer((s: Seq[Int]) => println(s)) + + s connect framer connect arq connect app + s.begin() + + while (true) { + app.send(Console.readLine.getBytes().map(_.toInt)).map(sent => Console.println("> " + sent)) + + } + + + () + } + + +} \ No newline at end of file diff --git a/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/JSSCSerialProvider.scala b/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/JSSCSerialProvider.scala new file mode 100644 index 0000000..3bfd650 --- /dev/null +++ b/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/JSSCSerialProvider.scala @@ -0,0 +1,48 @@ +package com.github.jodersky.ace.jssc + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits.global +import com.github.jodersky.ace.serial._ +import com.github.jodersky.ace.serial.Serial +import jssc.SerialPortEvent +import jssc.SerialPort +import jssc.SerialPortEventListener +import java.io.IOException + +object JSSCSerialProvider extends SerialProvider { + + def open(port: String, baudRate: Int) = new Serial { + val serialPort = new SerialPort(port); + serialPort.openPort() + serialPort.setParams( + baudRate, + SerialPort.DATABITS_8, + SerialPort.STOPBITS_1, + SerialPort.PARITY_NONE) + + val listener = new SerialPortEventListener { + override def serialEvent(event: SerialPortEvent) = { + if (event.isRXCHAR()) { + val bytes = serialPort.readBytes + if (bytes != null) notifyHigher(bytes.map(_ & 0xff)) + } + } + } + + def send(data: Seq[Int]) = future { + serialPort.writeBytes(data.toArray.map(_.toByte)) + } map { success => + if (success) data + else throw new IOException("Could not write to serial port.") + } + + def close() = serialPort.closePort() + + def begin() = { + val mask = SerialPort.MASK_RXCHAR + SerialPort.MASK_CTS + SerialPort.MASK_DSR + serialPort.setEventsMask(mask) + serialPort.addEventListener(listener) + } + } + +} \ No newline at end of file diff --git a/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/package.scala b/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/package.scala new file mode 100644 index 0000000..891e588 --- /dev/null +++ b/scala/jssc/src/main/scala/com/github/jodersky/ace/jssc/package.scala @@ -0,0 +1,7 @@ +package com.github.jodersky.ace + +package object jssc { + + implicit val provider = JSSCSerialProvider + +} \ No newline at end of file -- cgit v1.2.3