aboutsummaryrefslogtreecommitdiff
path: root/scala/ace/src/main/scala/com/github/jodersky/ace/protocol
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-02-26 18:22:53 +0100
committerJakob Odersky <jodersky@gmail.com>2013-02-26 18:25:07 +0100
commit20a2350f9b33a797763413509781d0686fc38fe5 (patch)
tree0ee6f84433d8eaec6f2e3a89bae026c095502b25 /scala/ace/src/main/scala/com/github/jodersky/ace/protocol
parent7923f67f3090cd5a8c7e8281f62b0beca0846bec (diff)
downloadace-20a2350f9b33a797763413509781d0686fc38fe5.tar.gz
ace-20a2350f9b33a797763413509781d0686fc38fe5.tar.bz2
ace-20a2350f9b33a797763413509781d0686fc38fe5.zip
simplify scala implementation
Diffstat (limited to 'scala/ace/src/main/scala/com/github/jodersky/ace/protocol')
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala72
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala3
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala3
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala37
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala38
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala22
-rw-r--r--scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala88
7 files changed, 0 insertions, 263 deletions
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala
deleted file mode 100644
index 5f8d228..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/LinkLayer.scala
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.Future
-import scala.concurrent.ExecutionContext.Implicits.global
-
-class LinkLayer extends ReactiveLayer[Array[Byte], Packet] {
- import LinkLayer._
-
- private var state: State = Waiting
- private val buffer = new ArrayBuffer[Int]
-
- def receive(data: Array[Byte]) = for (d <- data) receive(d)
-
- def receive(data: Byte): Unit = {
- val unsigned = data & 0xff
-
- state match {
- case Escaping => {
- buffer += unsigned
- state = Receiving
- }
- case Waiting => if (unsigned == Start) {
- buffer.clear()
- state = Receiving
- }
- case Receiving => unsigned match {
- case Escape => state = Escaping
- case Start => buffer.clear()
- case End => {
- state = Waiting
- if (checksum(buffer.init.toArray) == buffer.last)
- notifyHigher(Packet(buffer.init.toArray))
- }
- case datum => buffer += datum
- }
- }
- }
-
- def write(packet: Packet): Future[Packet] = {
- val buffer = new ArrayBuffer[Int]
- buffer += Start
- packet.data foreach { unsigned =>
- unsigned match {
- case Start | End | Escape => {
- buffer += Escape
- buffer += unsigned
- }
- case _ => buffer += unsigned
- }
- }
- buffer += checksum(packet.data)
- buffer += End
- writeToLower(buffer.map(_.toByte).toArray).map(_ => packet)
- }
-}
-
-object LinkLayer {
- sealed trait State
- case object Waiting extends State
- case object Receiving extends State
- case object Escaping extends State
-
- final val Escape = 0x02
- final val Start = 0x03
- final val End = 0x10
-
- def checksum(unsignedData: Seq[Int]) = {
- unsignedData.fold(0)(_ ^ _)
- }
-
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala
deleted file mode 100644
index 32edfea..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Message.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-case class Message(data: Seq[Int]) \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala
deleted file mode 100644
index 583022f..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/Packet.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-case class Packet(data: Seq[Int]) \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala
deleted file mode 100644
index 39c2d25..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/PhysicalLayer.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits.global
-import jssc.SerialPort
-import java.io.IOException
-import jssc.SerialPortEvent
-import jssc.SerialPortEventListener
-
-class PhysicalLayer(serial: SerialPort) extends ReactiveLayer[Nothing, Array[Byte]] {
-
- def receive(nothing: Nothing) = throw new UnsupportedOperationException("A receive function cannot be called on the lowest layer.")
-
- private val listener = new SerialPortEventListener {
- override def serialEvent(event: SerialPortEvent) = {
- if (event.isRXCHAR()) {
- val bytes = serial.readBytes
- if (bytes != null) notifyHigher(bytes)
- }
- }
- }
-
-
- def write(data: Array[Byte]) = future {
- serial.writeBytes(data)
- } map { success =>
- if (success) data
- else throw new IOException("Could not write to serial port.")
- }
-
- def begin() = {
- val mask = SerialPort.MASK_RXCHAR + SerialPort.MASK_CTS + SerialPort.MASK_DSR
- serial.setEventsMask(mask)
- serial.addEventListener(listener)
- }
-
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala
deleted file mode 100644
index 792947a..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/ReactiveLayer.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.concurrent.Future
-
-/** Represents a layer in a reactive protocol.
- * @tparam L data type this layer receives from or writes to a lower layer
- * @tparam T data type this layer sends to a higher layer or receives from a higher */
-trait ReactiveLayer[L, T] {
- private var lowerLayer: Option[ReactiveLayer[_, L]] = None
- private var higherLayer: Option[ReactiveLayer[T, _]] = None
-
- /** Notifies a higher layer that data is available. */
- protected def notifyHigher(data: T): Unit = higherLayer match {
- case Some(higher) => higher.receive(data)
- case None => throw new UnsupportedOperationException("Higher layer doesn't exist.")
- }
-
- /** Writes data to a lower layer. */
- protected def writeToLower(l: L): Future[L] = lowerLayer match {
- case Some(lower) => lower.write(l)
- case None => Future.failed(new UnsupportedOperationException("Lower layer doesn't exist."))
- }
-
- /** Connects this layer with a higher layer, effectively linking calls
- * `notifyHigher` to `higher.receive` and `higher.writeToLower` to `write`. */
- def connect[A](higher: ReactiveLayer[T, A]) = {
- this.higherLayer = Some(higher)
- higher.lowerLayer = Some(this)
- higher
- }
-
- /** Called from lower layer. */
- def receive(data: L): Unit
-
- /** Write data to this layer.
- * @return a future value containing the data written, or an error */
- def write(data: T): Future[T]
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala
deleted file mode 100644
index e229b82..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/SecureSerial.scala
+++ /dev/null
@@ -1,22 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import jssc.SerialPort
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.util.Try
-
-class SecureSerial(port: SerialPort) { self =>
- val physical = new PhysicalLayer(port)
- val link = new LinkLayer
- val transport = new TransportLayer
-
- val application = new ReactiveLayer[Message, String] {
- def receive(message: Message) = Console.println(message.data.mkString(""))
- def write(s: String) = writeToLower(Message(s.map(_.toInt))).map(x => s)
- }
-
- def send(s: String) = application.write(s)
- def close() = Try(port.closePort())
-
- physical connect link connect transport connect application
- physical.begin()
-} \ No newline at end of file
diff --git a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala b/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala
deleted file mode 100644
index 2055770..0000000
--- a/scala/ace/src/main/scala/com/github/jodersky/ace/protocol/TransportLayer.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-package com.github.jodersky.ace.protocol
-
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration._
-import java.io.IOException
-import scala.collection.mutable.HashMap
-import scala.collection.mutable.Queue
-import com.github.jodersky.ace.protocol.{Packet => PPacket}
-import scala.util.Success
-
-class TransportLayer extends ReactiveLayer[Packet, Message] {
-
-
- private val openRequests = HashMap[Int, (Message, Promise[Message])]()
- private val receivedSeqs = Queue[Int]()
-
- class Packet(val data: Seq[Int]) {
-
- def seq = data(0)
- def cmd = data(1)
- def message = Message(data.drop(2))
-
- def underlying = PPacket(data)
- }
-
- object Packet {
- final val DATA = 0x05
- final val ACK = 0x06
- private var seq = 0;
- private def nextSeq() = {seq += 1; if (seq > TransportLayer.MaxSeq) seq = 0; seq}
-
- def fromPacket(packet: PPacket) = new Packet(packet.data)
-
- def fromMessage(message: Message) = new Packet(Seq(nextSeq(), DATA) ++ message.data)
-
- def ack(seq: Int) = new Packet (Seq(seq, ACK))
-
- }
-
- def receive(ppacket: PPacket) = {
- val in = Packet.fromPacket(ppacket)
-
- in.cmd match {
- case Packet.ACK => {
- openRequests.get(in.seq).map{case (message, promise) => promise.complete(Success(message))}
- }
-
- case Packet.DATA => {
- writeToLower(Packet.ack(in.seq).underlying)
-
- if (!(receivedSeqs contains in.seq)) {
- if (receivedSeqs.size > TransportLayer.MaxSeqBuffer) receivedSeqs.dequeue
- receivedSeqs enqueue in.seq
- notifyHigher(in.message)
- }
- }
-
- }
- }
-
- def write(message: Message) = {
- val promise = Promise[Message]
- val packet = Packet.fromMessage(message)
- val seq = packet.seq
-
- def send(n: Int): Future[Message] =
- writeToLower(packet.underlying) map { packet =>
- Await.result(promise.future, TransportLayer.Timeout milliseconds)
- } recoverWith {
- case t: TimeoutException if (n < TransportLayer.MaxResends) => send(n + 1)
- }
-
- if (openRequests.size >= TransportLayer.MaxSeqBuffer) Future.failed(new IOException("too many open requests"))
- else {
- openRequests += (seq -> (message, promise))
- send(0) andThen {case r => (openRequests -= seq)}
- }
- }
-
-}
-
-object TransportLayer {
- final val Timeout = 100
- final val MaxResends = 5
- final val MaxSeq = 255
- final val MaxSeqBuffer = 10
-} \ No newline at end of file