aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-06-26 22:32:48 +0200
committerJakob Odersky <jodersky@gmail.com>2013-06-26 22:32:48 +0200
commitf007a17cca8213193f5a6b51e3bcb61653230ff4 (patch)
tree33de7aa024f46752cea9ef31d758ea19dac2bd5d
parent3c31b1e34380218e064fbc5b6a591a8151fa3dc1 (diff)
downloadakka-serial-f007a17cca8213193f5a6b51e3bcb61653230ff4.tar.gz
akka-serial-f007a17cca8213193f5a6b51e3bcb61653230ff4.tar.bz2
akka-serial-f007a17cca8213193f5a6b51e3bcb61653230ff4.zip
move low -> internal and add documentation
-rw-r--r--src/main/java/com/github/jodersky/flow/internal/NativeSerial.java (renamed from src/main/java/com/github/jodersky/flow/low/NativeSerial.java)4
-rw-r--r--src/main/scala/com/github/jodersky/flow/Framing.scala104
-rw-r--r--src/main/scala/com/github/jodersky/flow/SerialOperator.scala9
-rw-r--r--src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala (renamed from src/main/scala/com/github/jodersky/flow/low/Serial.scala)32
-rw-r--r--src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala (renamed from src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala)3
-rw-r--r--src/main/scala/com/github/jodersky/flow/test.sc52
6 files changed, 29 insertions, 175 deletions
diff --git a/src/main/java/com/github/jodersky/flow/low/NativeSerial.java b/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
index dcbd2f0..dba3f44 100644
--- a/src/main/java/com/github/jodersky/flow/low/NativeSerial.java
+++ b/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
@@ -1,4 +1,6 @@
-package com.github.jodersky.flow.low;
+package com.github.jodersky.flow.internal;
+
+import com.github.jodersky.flow.internal.NativeLoader;
/** Thin layer on top of native code. */
class NativeSerial {
diff --git a/src/main/scala/com/github/jodersky/flow/Framing.scala b/src/main/scala/com/github/jodersky/flow/Framing.scala
deleted file mode 100644
index f8173a7..0000000
--- a/src/main/scala/com/github/jodersky/flow/Framing.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.io.PipelineContext
-import akka.io.SymmetricPipePair
-import akka.io.SymmetricPipelineStage
-import akka.util.ByteString
-import java.nio.ByteOrder
-import scala.annotation.tailrec
-import java.nio.ByteBuffer
-
-class DelimitedFrame(
- StartByte: Byte,
- StopByte: Byte,
- EscapeByte: Byte)
- //byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN)
- extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] {
-
- // range checks omitted ...
-
- override def apply(ctx: PipelineContext) =
- new SymmetricPipePair[ByteString, ByteString] {
- var buffer = ByteString.empty
- //implicit val byteOrder = DelimitedFrame.this.byteOrder
-
- sealed trait State
- case object Waiting extends State
- case object Accepting extends State
- case object Escaping extends State
-
- def extractFrame(bs: ByteString, accepted: ByteString, state: State): (ByteString, Option[ByteString]) = { //(remaining, frame))
- if (bs.isEmpty && state == Waiting) (ByteString.empty, None)
- else if (bs.isEmpty) (accepted, None)
- else {
- val in = bs.head
-
- state match {
- case Waiting if (in == StartByte) => extractFrame(bs.tail, accepted, Accepting)
- case Escaping => extractFrame(bs.tail, accepted ++ ByteString(in), Accepting)
- case Accepting => in match {
- case EscapeByte => extractFrame(bs.tail, accepted, Escaping)
- case StartByte => extractFrame(bs.tail, ByteString.empty, Accepting)
- case StopByte => (bs.tail, Some(accepted))
- case other => extractFrame(bs.tail, accepted ++ ByteString(other), Accepting)
- }
- case _ => extractFrame(bs.tail, accepted, state)
- }
- }
- }
-
- def extractFrames(bs: ByteString, accepted: List[ByteString]): (ByteString, List[ByteString]) = {
- val (remainder, frame) = extractFrame(bs, ByteString.empty, Waiting)
-
- frame match {
- case None => (remainder, accepted)
- case Some(data) => extractFrames(remainder, data :: accepted)
- }
- }
-
- /*
- * This is how commands (writes) are transformed: calculate length
- * including header, write that to a ByteStringBuilder and append the
- * payload data. The result is a single command (i.e. `Right(...)`).
- */
- override def commandPipeline = { bs: ByteString =>
- val bb = ByteString.newBuilder
-
- def escape(b: Byte) = {
- bb += EscapeByte
- bb += b
- }
-
- bb += StartByte
- for (b <- bs) {
- b match {
- case StartByte => escape(b)
- case StopByte => escape(b)
- case EscapeByte => escape(b)
- case _ => bb += b
- }
- }
- bb += StopByte
-
- ctx.singleCommand(bb.result)
- }
-
- /*
- * This is how events (reads) are transformed: append the received
- * ByteString to the buffer (if any) and extract the frames from the
- * result. In the end store the new buffer contents and return the
- * list of events (i.e. `Left(...)`).
- */
- override def eventPipeline = { bs: ByteString =>
- val data = buffer ++ bs
- val (remainder, frames) = extractFrames(data, Nil)
- buffer = remainder
-
- frames match {
- case Nil => Nil
- case one :: Nil ⇒ ctx.singleEvent(one)
- case many ⇒ many reverseMap (Left(_))
- }
- }
- }
-} \ No newline at end of file
diff --git a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index b1e8a0d..8f958b2 100644
--- a/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -1,5 +1,12 @@
package com.github.jodersky.flow
+import com.github.jodersky.flow.internalial.Close;
+import com.github.jodersky.flow.internalial.Closed;
+import com.github.jodersky.flow.internalial.CommandFailed;
+import com.github.jodersky.flow.internalial.Received;
+import com.github.jodersky.flow.internalial.Write;
+import com.github.jodersky.flow.internalial.Wrote;
+
import scala.concurrent.future
import scala.util.Failure
import scala.util.Success
@@ -18,7 +25,7 @@ import akka.util.ByteString
import low.{Serial => LowSerial}
class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor with ActorLogging {
- import context._
+import context._
object Reader extends Thread {
private var continueReading = true
diff --git a/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
index 6e67fcf..f1268c3 100644
--- a/src/main/scala/com/github/jodersky/flow/low/Serial.scala
+++ b/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
@@ -1,24 +1,17 @@
-package com.github.jodersky.flow.low
+package com.github.jodersky.flow.internal
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits._
import java.io.IOException
-import com.github.jodersky.flow.AccessDeniedException
-import com.github.jodersky.flow.PortInUseException
-import com.github.jodersky.flow.PortClosedException
-import com.github.jodersky.flow.IllegalBaudRateException
-import scala.util.Try
+import com.github.jodersky.flow._
import java.util.concurrent.atomic.AtomicBoolean
-import com.github.jodersky.flow.PortInterruptedException
-import com.github.jodersky.flow.NoSuchPortException
-class Serial private (val port: String, private val pointer: Long) {
- import Serial._
+class InternalSerial private (val port: String, private val pointer: Long) {
+ import InternalSerial._
private val reading = new AtomicBoolean(false)
private val writing = new AtomicBoolean(false)
private val closed = new AtomicBoolean(false)
-
+
+ /** Closes the underlying serial connection. Any threads blocking on read or write will return. */
def close(): Unit = synchronized {
if (!closed.get()) {
closed.set(true)
@@ -29,6 +22,8 @@ class Serial private (val port: String, private val pointer: Long) {
}
}
+ /** Read data from underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread */
def read(): Array[Byte] = if (!closed.get) {
reading.set(true)
try {
@@ -45,6 +40,8 @@ class Serial private (val port: String, private val pointer: Long) {
throw new PortClosedException(s"port ${port} is already closed")
}
+ /** Write data to underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread */
def write(data: Array[Byte]): Array[Byte] = if (!closed.get) {
writing.set(true)
try {
@@ -62,9 +59,10 @@ class Serial private (val port: String, private val pointer: Long) {
}
-object Serial {
+object InternalSerial {
import NativeSerial._
+ /** Transform error code to exception if necessary. */
private def except(result: Int, port: String): Int = result match {
case E_IO => throw new IOException(port)
case E_ACCESS_DENIED => throw new AccessDeniedException(port)
@@ -76,12 +74,14 @@ object Serial {
case success => success
}
- def open(port: String, baud: Int): Serial = synchronized {
+ /** Open a new connection to a serial port. */
+ def open(port: String, baud: Int): InternalSerial = synchronized {
val pointer = new Array[Long](1)
except(NativeSerial.open(port, baud, pointer), port)
- new Serial(port, pointer(0))
+ new InternalSerial(port, pointer(0))
}
+ /** Set debugging for all serial connections. Debugging results in printing extra messages (from the native library) in case of errors. */
def debug(value: Boolean) = NativeSerial.debug(value)
} \ No newline at end of file
diff --git a/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala b/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
index fda82a6..aebbe3f 100644
--- a/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala
+++ b/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
@@ -1,8 +1,9 @@
-package com.github.jodersky.flow.low
+package com.github.jodersky.flow.internal
import java.io.File
import java.io.FileOutputStream
+/**Loads the current system's native library for flow. */
object NativeLoader {
def load = {
diff --git a/src/main/scala/com/github/jodersky/flow/test.sc b/src/main/scala/com/github/jodersky/flow/test.sc
deleted file mode 100644
index 88a2193..0000000
--- a/src/main/scala/com/github/jodersky/flow/test.sc
+++ /dev/null
@@ -1,52 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.io.PipelineContext
-import akka.io.SymmetricPipePair
-import akka.io.SymmetricPipelineStage
-import akka.util.ByteString
-import java.nio.ByteOrder
-import scala.annotation.tailrec
-import java.nio.ByteBuffer
-import akka.io._
-import akka.actor.{IO=>_,_}
-import Serial._
-
-object test {
-
- val StartByte = 0: Byte //> StartByte : Byte = 0
- val StopByte = 1: Byte //> StopByte : Byte = 1
- val EscapeByte = 2: Byte //> EscapeByte : Byte = 2
-
- val ctx = new PipelineContext{} //> ctx : akka.io.PipelineContext = com.github.jodersky.flow.test$$anonfun$main
- //| $1$$anon$1@32bf7190
-
- val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte)
- //> stages : com.github.jodersky.flow.DelimitedFrame = com.github.jodersky.flow
- //| .DelimitedFrame@36ff057f
-
- val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages)
- //> cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
- //| a.util.ByteString]) = <function1>
- //| evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
- //| a.util.ByteString]) = <function1>
- //| mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akk
- //| a.util.ByteString])] = <function1>
-
- val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
- t => println("sent command: " + t), // will receive messages of type Try[ByteString]
- t => println("got event: " + t) // will receive messages of type Try[Message]
- ) //> injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteStri
- //| ng] = akka.io.PipelineFactory$$anon$5@70cb6009
-
-
- val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1))
- //> bs : akka.util.ByteString = ByteString(0, 4, 2, 1, 1, 6, 1)
-
- injector.injectCommand(bs) //> sent command: Success(ByteString(0, 2, 0, 4, 2, 2, 2, 1, 2, 1, 6, 2, 1, 1))
- injector.injectEvent(bs) //> got event: Success(ByteString(4, 1))
-
- implicit val system = ActorSystem("flow")
- //> system : akka.actor.ActorSystem = akka://flow|
- //IO(Serial) ! Open("s", 9600)
-
-} \ No newline at end of file