aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-05-28 14:53:21 +0200
committerJakob Odersky <jodersky@gmail.com>2013-05-28 14:53:21 +0200
commit6b86c91b3901c5d8ad28f68b248152d443954034 (patch)
tree6623bd436c5eaba1ab84227d09f1b8797e42c24c
parent4be8ee0da20995e0e43f50cb4675b65609a613a6 (diff)
downloadakka-serial-6b86c91b3901c5d8ad28f68b248152d443954034.tar.gz
akka-serial-6b86c91b3901c5d8ad28f68b248152d443954034.tar.bz2
akka-serial-6b86c91b3901c5d8ad28f68b248152d443954034.zip
implement akka-io like api
-rw-r--r--example/src/main/scala/com/github/jodersky/flow/example/Main.scala49
-rw-r--r--example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala36
-rw-r--r--main/.worksheet/src/com.github.jodersky.flow.test.scala40
-rw-r--r--main/src/main/c/com_github_jodersky_flow_NativeSerial.h73
-rw-r--r--main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h73
-rw-r--r--main/src/main/c/flow.c12
-rw-r--r--main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java (renamed from main/src/main/java/com/github/jodersky/flow/NativeSerial.java)4
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/Framing.scala104
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/Serial.scala75
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/SerialManager.scala34
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala50
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/exceptions.scala3
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala (renamed from main/src/main/scala/com/github/jodersky/flow/NativeLoader.scala)2
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/low/Serial.scala71
-rw-r--r--main/src/main/scala/com/github/jodersky/flow/test.sc52
-rw-r--r--project/Build.scala6
16 files changed, 523 insertions, 161 deletions
diff --git a/example/src/main/scala/com/github/jodersky/flow/example/Main.scala b/example/src/main/scala/com/github/jodersky/flow/example/Main.scala
index 5032d1c..20fffe2 100644
--- a/example/src/main/scala/com/github/jodersky/flow/example/Main.scala
+++ b/example/src/main/scala/com/github/jodersky/flow/example/Main.scala
@@ -1,41 +1,44 @@
-package com.github.jodersky.flow.example
+package com.github.jodersky.flow
+package example
-import scala.concurrent._
import scala.concurrent.ExecutionContext.Implicits.global
-import com.github.jodersky.flow.Serial
-import scala.util.Try
import scala.util.Success
+import scala.util.Try
+
+import com.github.jodersky.flow.Serial
+import com.github.jodersky.flow.Serial._
+
+import akka.actor.ActorSystem
+import akka.actor.Props
+import akka.io.IO
+import akka.util.ByteString
object Main {
def main(args: Array[String]): Unit = {
- val isInt = Try(args(1).toInt) match {case Success(_) => true; case _ => false}
+ /*val isInt = Try(args(1).toInt) match { case Success(_) => true; case _ => false }
if (!(args.length == 2 && isInt)) {
println("invalid parameters")
println("parameters: port baud")
println("example: /dev/ttyACM0 115200")
return
- }
- val port = args(0)
- val baud = args(1).toInt
+ }*/
+ val port = "/dev/ttyACM0"
+ val baud = 115200
+
+ //low.Serial.debug(true)
- Serial.debug(true)
+ implicit val system = ActorSystem("flow")
+ val serial = system.actorOf(Props[SerialHandler], name = "serial-handler")
- println("opening " + port)
- val serial = Serial.open(port, baud) { data =>
- println("received: " + data.mkString("[", ",", "]"))
- }
+ IO(Serial) ! Serial.Open(serial, port, baud)
- println("press enter to write a looong array of data")
- Console.readLine()
-
- val data: Array[Byte] = Array.fill(100)(42)
- serial.write(data).map(d => println("wrote: " + d.mkString("[", ",", "]"))).recover { case t => println("write error") }
-
- println("press enter to exit")
- Console.readLine()
+ readLine()
+ serial ! Write(ByteString(42))
- println("exiting")
- Console.flush()
+ readLine()
+ //serial ! Close
+ system.shutdown()
+
}
} \ No newline at end of file
diff --git a/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala b/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala
new file mode 100644
index 0000000..d228cb9
--- /dev/null
+++ b/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala
@@ -0,0 +1,36 @@
+package com.github.jodersky.flow.example
+
+import com.github.jodersky.flow.Serial._
+import com.github.jodersky.flow.low.{Serial => LowSerial}
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.util.ByteString
+
+class SerialHandler extends Actor with ActorLogging {
+ var operator: Option[ActorRef] = None
+
+ def receive = {
+ case Opened(operator) => this.operator = Some(operator)
+
+ case CommandFailed(cmd, reason) =>
+ println(s"command ${cmd} failed, reason: ${reason}")
+
+ case Received(data) => println("received data: " + formatData(data))
+
+ case Close =>
+ operator.map(_ ! Close)
+
+ case Closed(_) => println("port closed")
+
+ case Write(data) => {
+ operator.map(_ ! Write(data))
+ }
+
+ case Wrote(data) => println("wrote data: " + formatData(data))
+
+ }
+
+ private def formatData(data: ByteString) = data.mkString("[",",","]")
+
+} \ No newline at end of file
diff --git a/main/.worksheet/src/com.github.jodersky.flow.test.scala b/main/.worksheet/src/com.github.jodersky.flow.test.scala
new file mode 100644
index 0000000..1510da1
--- /dev/null
+++ b/main/.worksheet/src/com.github.jodersky.flow.test.scala
@@ -0,0 +1,40 @@
+package com.github.jodersky.flow
+
+import akka.io.PipelineContext
+import akka.io.SymmetricPipePair
+import akka.io.SymmetricPipelineStage
+import akka.util.ByteString
+import java.nio.ByteOrder
+import scala.annotation.tailrec
+import java.nio.ByteBuffer
+import akka.io._
+import akka.actor.{IO=>_,_}
+import Serial._
+
+object test {;import org.scalaide.worksheet.runtime.library.WorksheetSupport._; def main(args: Array[String])=$execute{;$skip(351);
+
+ val StartByte = 0: Byte;System.out.println("""StartByte : Byte = """ + $show(StartByte ));$skip(25);
+ val StopByte = 1: Byte;System.out.println("""StopByte : Byte = """ + $show(StopByte ));$skip(27);
+ val EscapeByte = 2: Byte;System.out.println("""EscapeByte : Byte = """ + $show(EscapeByte ));$skip(36);
+
+ val ctx = new PipelineContext{};System.out.println("""ctx : akka.io.PipelineContext = """ + $show(ctx ));$skip(68);
+
+ val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte);System.out.println("""stages : com.github.jodersky.flow.DelimitedFrame = """ + $show(stages ));$skip(88);
+
+ val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages);System.out.println("""cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akka.util.ByteString]) = """ + $show(cmd ));System.out.println("""evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akka.util.ByteString]) = """ + $show(evt ));System.out.println("""mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akka.util.ByteString])] = """ + $show(mgmt ));$skip(243);
+
+ val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
+ t => println("sent command: " + t), // will receive messages of type Try[ByteString]
+ t => println("got event: " + t) // will receive messages of type Try[Message]
+ );System.out.println("""injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteString] = """ + $show(injector ));$skip(59);
+
+
+ val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1));System.out.println("""bs : akka.util.ByteString = """ + $show(bs ));$skip(51);
+
+ injector.injectCommand(bs);$skip(27);
+ injector.injectEvent(bs);$skip(47);
+
+ implicit val system = ActorSystem("flow");System.out.println("""system : akka.actor.ActorSystem = """ + $show(system ))}
+ //IO(Serial) ! Open("s", 9600)
+
+}
diff --git a/main/src/main/c/com_github_jodersky_flow_NativeSerial.h b/main/src/main/c/com_github_jodersky_flow_NativeSerial.h
deleted file mode 100644
index e89652c..0000000
--- a/main/src/main/c/com_github_jodersky_flow_NativeSerial.h
+++ /dev/null
@@ -1,73 +0,0 @@
-/* DO NOT EDIT THIS FILE - it is machine generated */
-#include <jni.h>
-/* Header for class com_github_jodersky_flow_NativeSerial */
-
-#ifndef _Included_com_github_jodersky_flow_NativeSerial
-#define _Included_com_github_jodersky_flow_NativeSerial
-#ifdef __cplusplus
-extern "C" {
-#endif
-#undef com_github_jodersky_flow_NativeSerial_E_PERMISSION
-#define com_github_jodersky_flow_NativeSerial_E_PERMISSION -1L
-#undef com_github_jodersky_flow_NativeSerial_E_OPEN
-#define com_github_jodersky_flow_NativeSerial_E_OPEN -2L
-#undef com_github_jodersky_flow_NativeSerial_E_BUSY
-#define com_github_jodersky_flow_NativeSerial_E_BUSY -3L
-#undef com_github_jodersky_flow_NativeSerial_E_BAUD
-#define com_github_jodersky_flow_NativeSerial_E_BAUD -4L
-#undef com_github_jodersky_flow_NativeSerial_E_PIPE
-#define com_github_jodersky_flow_NativeSerial_E_PIPE -5L
-#undef com_github_jodersky_flow_NativeSerial_E_MALLOC
-#define com_github_jodersky_flow_NativeSerial_E_MALLOC -6L
-#undef com_github_jodersky_flow_NativeSerial_E_POINTER
-#define com_github_jodersky_flow_NativeSerial_E_POINTER -7L
-#undef com_github_jodersky_flow_NativeSerial_E_POLL
-#define com_github_jodersky_flow_NativeSerial_E_POLL -8L
-#undef com_github_jodersky_flow_NativeSerial_E_IO
-#define com_github_jodersky_flow_NativeSerial_E_IO -9L
-#undef com_github_jodersky_flow_NativeSerial_E_CLOSE
-#define com_github_jodersky_flow_NativeSerial_E_CLOSE -10L
-/*
- * Class: com_github_jodersky_flow_NativeSerial
- * Method: open
- * Signature: (Ljava/lang/String;I[J)I
- */
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open
- (JNIEnv *, jclass, jstring, jint, jlongArray);
-
-/*
- * Class: com_github_jodersky_flow_NativeSerial
- * Method: read
- * Signature: (J[B)I
- */
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read
- (JNIEnv *, jclass, jlong, jbyteArray);
-
-/*
- * Class: com_github_jodersky_flow_NativeSerial
- * Method: write
- * Signature: (J[B)I
- */
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write
- (JNIEnv *, jclass, jlong, jbyteArray);
-
-/*
- * Class: com_github_jodersky_flow_NativeSerial
- * Method: close
- * Signature: (J)V
- */
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_close
- (JNIEnv *, jclass, jlong);
-
-/*
- * Class: com_github_jodersky_flow_NativeSerial
- * Method: debug
- * Signature: (Z)V
- */
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_debug
- (JNIEnv *, jclass, jboolean);
-
-#ifdef __cplusplus
-}
-#endif
-#endif
diff --git a/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h b/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h
new file mode 100644
index 0000000..54ea963
--- /dev/null
+++ b/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h
@@ -0,0 +1,73 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class com_github_jodersky_flow_low_NativeSerial */
+
+#ifndef _Included_com_github_jodersky_flow_low_NativeSerial
+#define _Included_com_github_jodersky_flow_low_NativeSerial
+#ifdef __cplusplus
+extern "C" {
+#endif
+#undef com_github_jodersky_flow_low_NativeSerial_E_PERMISSION
+#define com_github_jodersky_flow_low_NativeSerial_E_PERMISSION -1L
+#undef com_github_jodersky_flow_low_NativeSerial_E_OPEN
+#define com_github_jodersky_flow_low_NativeSerial_E_OPEN -2L
+#undef com_github_jodersky_flow_low_NativeSerial_E_BUSY
+#define com_github_jodersky_flow_low_NativeSerial_E_BUSY -3L
+#undef com_github_jodersky_flow_low_NativeSerial_E_BAUD
+#define com_github_jodersky_flow_low_NativeSerial_E_BAUD -4L
+#undef com_github_jodersky_flow_low_NativeSerial_E_PIPE
+#define com_github_jodersky_flow_low_NativeSerial_E_PIPE -5L
+#undef com_github_jodersky_flow_low_NativeSerial_E_MALLOC
+#define com_github_jodersky_flow_low_NativeSerial_E_MALLOC -6L
+#undef com_github_jodersky_flow_low_NativeSerial_E_POINTER
+#define com_github_jodersky_flow_low_NativeSerial_E_POINTER -7L
+#undef com_github_jodersky_flow_low_NativeSerial_E_POLL
+#define com_github_jodersky_flow_low_NativeSerial_E_POLL -8L
+#undef com_github_jodersky_flow_low_NativeSerial_E_IO
+#define com_github_jodersky_flow_low_NativeSerial_E_IO -9L
+#undef com_github_jodersky_flow_low_NativeSerial_E_CLOSE
+#define com_github_jodersky_flow_low_NativeSerial_E_CLOSE -10L
+/*
+ * Class: com_github_jodersky_flow_low_NativeSerial
+ * Method: open
+ * Signature: (Ljava/lang/String;I[J)I
+ */
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open
+ (JNIEnv *, jclass, jstring, jint, jlongArray);
+
+/*
+ * Class: com_github_jodersky_flow_low_NativeSerial
+ * Method: read
+ * Signature: (J[B)I
+ */
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read
+ (JNIEnv *, jclass, jlong, jbyteArray);
+
+/*
+ * Class: com_github_jodersky_flow_low_NativeSerial
+ * Method: write
+ * Signature: (J[B)I
+ */
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write
+ (JNIEnv *, jclass, jlong, jbyteArray);
+
+/*
+ * Class: com_github_jodersky_flow_low_NativeSerial
+ * Method: close
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close
+ (JNIEnv *, jclass, jlong);
+
+/*
+ * Class: com_github_jodersky_flow_low_NativeSerial
+ * Method: debug
+ * Signature: (Z)V
+ */
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug
+ (JNIEnv *, jclass, jboolean);
+
+#ifdef __cplusplus
+}
+#endif
+#endif
diff --git a/main/src/main/c/flow.c b/main/src/main/c/flow.c
index 3ed66b6..89813ce 100644
--- a/main/src/main/c/flow.c
+++ b/main/src/main/c/flow.c
@@ -37,7 +37,7 @@
#include <termios.h>
#include <fcntl.h>
#include <poll.h>
-#include "com_github_jodersky_flow_NativeSerial.h"
+#include "com_github_jodersky_flow_low_NativeSerial.h"
#define E_PERMISSION -1
#define E_OPEN -2
@@ -236,7 +236,7 @@ inline jlong s2j(struct serial_config* pointer) {
return (jlong) pointer;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open
(JNIEnv *env, jclass clazz, jstring device, jint baud, jlongArray jserialp)
{
const char *dev = (*env)->GetStringUTFChars(env, device, 0);
@@ -250,13 +250,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open
return r;
}
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_close
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close
(JNIEnv * env, jclass clazz, jlong serial)
{
serial_close(j2s(serial));
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read
(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
{
@@ -272,7 +272,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read
return n;
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write
(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
{
unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL);
@@ -284,7 +284,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write
return r;
}
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_debug
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug
(JNIEnv *env, jclass clazz, jboolean value)
{
debug = (bool) value;
diff --git a/main/src/main/java/com/github/jodersky/flow/NativeSerial.java b/main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java
index f5d708e..6bdcde5 100644
--- a/main/src/main/java/com/github/jodersky/flow/NativeSerial.java
+++ b/main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java
@@ -1,6 +1,6 @@
-package com.github.jodersky.flow;
+package com.github.jodersky.flow.low;
-public class NativeSerial {
+class NativeSerial {
static {
NativeLoader.load();
diff --git a/main/src/main/scala/com/github/jodersky/flow/Framing.scala b/main/src/main/scala/com/github/jodersky/flow/Framing.scala
new file mode 100644
index 0000000..f8173a7
--- /dev/null
+++ b/main/src/main/scala/com/github/jodersky/flow/Framing.scala
@@ -0,0 +1,104 @@
+package com.github.jodersky.flow
+
+import akka.io.PipelineContext
+import akka.io.SymmetricPipePair
+import akka.io.SymmetricPipelineStage
+import akka.util.ByteString
+import java.nio.ByteOrder
+import scala.annotation.tailrec
+import java.nio.ByteBuffer
+
+class DelimitedFrame(
+ StartByte: Byte,
+ StopByte: Byte,
+ EscapeByte: Byte)
+ //byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN)
+ extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] {
+
+ // range checks omitted ...
+
+ override def apply(ctx: PipelineContext) =
+ new SymmetricPipePair[ByteString, ByteString] {
+ var buffer = ByteString.empty
+ //implicit val byteOrder = DelimitedFrame.this.byteOrder
+
+ sealed trait State
+ case object Waiting extends State
+ case object Accepting extends State
+ case object Escaping extends State
+
+ def extractFrame(bs: ByteString, accepted: ByteString, state: State): (ByteString, Option[ByteString]) = { //(remaining, frame))
+ if (bs.isEmpty && state == Waiting) (ByteString.empty, None)
+ else if (bs.isEmpty) (accepted, None)
+ else {
+ val in = bs.head
+
+ state match {
+ case Waiting if (in == StartByte) => extractFrame(bs.tail, accepted, Accepting)
+ case Escaping => extractFrame(bs.tail, accepted ++ ByteString(in), Accepting)
+ case Accepting => in match {
+ case EscapeByte => extractFrame(bs.tail, accepted, Escaping)
+ case StartByte => extractFrame(bs.tail, ByteString.empty, Accepting)
+ case StopByte => (bs.tail, Some(accepted))
+ case other => extractFrame(bs.tail, accepted ++ ByteString(other), Accepting)
+ }
+ case _ => extractFrame(bs.tail, accepted, state)
+ }
+ }
+ }
+
+ def extractFrames(bs: ByteString, accepted: List[ByteString]): (ByteString, List[ByteString]) = {
+ val (remainder, frame) = extractFrame(bs, ByteString.empty, Waiting)
+
+ frame match {
+ case None => (remainder, accepted)
+ case Some(data) => extractFrames(remainder, data :: accepted)
+ }
+ }
+
+ /*
+ * This is how commands (writes) are transformed: calculate length
+ * including header, write that to a ByteStringBuilder and append the
+ * payload data. The result is a single command (i.e. `Right(...)`).
+ */
+ override def commandPipeline = { bs: ByteString =>
+ val bb = ByteString.newBuilder
+
+ def escape(b: Byte) = {
+ bb += EscapeByte
+ bb += b
+ }
+
+ bb += StartByte
+ for (b <- bs) {
+ b match {
+ case StartByte => escape(b)
+ case StopByte => escape(b)
+ case EscapeByte => escape(b)
+ case _ => bb += b
+ }
+ }
+ bb += StopByte
+
+ ctx.singleCommand(bb.result)
+ }
+
+ /*
+ * This is how events (reads) are transformed: append the received
+ * ByteString to the buffer (if any) and extract the frames from the
+ * result. In the end store the new buffer contents and return the
+ * list of events (i.e. `Left(...)`).
+ */
+ override def eventPipeline = { bs: ByteString =>
+ val data = buffer ++ bs
+ val (remainder, frames) = extractFrames(data, Nil)
+ buffer = remainder
+
+ frames match {
+ case Nil => Nil
+ case one :: Nil ⇒ ctx.singleEvent(one)
+ case many ⇒ many reverseMap (Left(_))
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/Serial.scala b/main/src/main/scala/com/github/jodersky/flow/Serial.scala
index 7565cbd..3f28ad8 100644
--- a/main/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/main/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -1,63 +1,32 @@
package com.github.jodersky.flow
-import scala.concurrent._
-import scala.concurrent.ExecutionContext.Implicits._
-import java.io.IOException
+import akka.io._
+import akka.actor.ExtensionKey
+import akka.actor.ExtendedActorSystem
+import akka.actor.Props
+import low.{ Serial => LowSerial }
+import akka.actor.ActorRef
+import akka.util.ByteString
-class Serial private (val port: String, private val pointer: Long, reader: Array[Byte] => Unit) {
- future {
- var n = 0
- val buffer = new Array[Byte](100)
- while (n >= 0) {
- n = NativeSerial.read(pointer, buffer)
- import NativeSerial._
- n match {
- case E_POINTER => throw new NullPointerException("pointer to native serial")
- case E_POLL => throw new IOException(port + ": polling")
- case E_IO => throw new IOException(port + ": reading")
- case E_CLOSE => println("close request")
- case x if x > 0 => reader(buffer.take(n))
- }
- }
- }
+object Serial extends ExtensionKey[SerialExt] {
- private def writeBlock(data: Array[Byte]) = synchronized {
- import NativeSerial._
- NativeSerial.write(pointer, data) match {
- case E_POINTER => throw new NullPointerException("pointer to native serial")
- case E_IO => throw new IOException(port + ": writing")
- case r => data.take(r)
- }
- }
-
- def write(data: Array[Byte]) = future { writeBlock(data) }
-
- def close() = synchronized {
- NativeSerial.close(pointer)
- }
-
-}
+ trait Command
+ trait Event
-object Serial {
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
- def open(port: String, baud: Int)(reader: Array[Byte] => Unit) = synchronized {
- val pointer = new Array[Long](1)
- val result = NativeSerial.open(port, baud, pointer)
+ case class Open(handler: ActorRef, port: String, baud: Int) extends Command
+ case class Opened(operator: ActorRef) extends Command
- import NativeSerial._
- result match {
- case E_PERMISSION => throw new AccessDeniedException(port)
- case E_OPEN => throw new NoSuchPortException(port)
- case E_BUSY => throw new PortInUseException(port)
- case E_BAUD => throw new IllegalArgumentException(
- s"invalid baudrate ${baud}, use standard unix values")
- case E_PIPE => throw new IOException("cannot create pipe")
- case E_MALLOC => throw new IOException("cannot allocate memory")
- case 0 => new Serial(port, pointer(0), reader)
- case _ => throw new Exception("cannot open port")
- }
- }
+ case object Close extends Command
+ case class Closed(reason: Throwable) extends Event
+
+ case class Write(data: ByteString) extends Command
+ case class Wrote(data: ByteString) extends Event
- def debug(value: Boolean) = NativeSerial.debug(value)
+ case class Received(data: ByteString) extends Event
+}
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ def manager = system.actorOf(Props[SerialManager], name = "IO-SERIAL")
} \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
new file mode 100644
index 0000000..b288a71
--- /dev/null
+++ b/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -0,0 +1,34 @@
+package com.github.jodersky.flow
+
+import akka.actor.Actor
+import Serial._
+import low.{ Serial => LowSerial }
+import scala.util.Success
+import scala.util.Failure
+import akka.actor.Props
+
+class SerialManager extends Actor {
+ import SerialManager._
+ import context._
+
+ def receive = {
+ case command @ Open(handler, port, baud) =>
+ LowSerial.open(port, baud).onComplete(_ match {
+ case Success(serial) => {
+ val operator = context.actorOf(Props(classOf[SerialOperator], serial, handler), name = escapePortString(port))
+ handler ! Opened(operator)
+ }
+ case Failure(t) => handler ! CommandFailed(command, t)
+ })
+ }
+
+}
+
+object SerialManager {
+
+ private def escapePortString(port: String) = port collect {
+ case '/' => '-'
+ case c => c
+ }
+
+} \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
new file mode 100644
index 0000000..dcfb22b
--- /dev/null
+++ b/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -0,0 +1,50 @@
+package com.github.jodersky.flow
+
+import scala.util.Failure
+import scala.util.Success
+
+import Serial._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.util.ByteString
+import low.{ Serial => LowSerial }
+
+class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor {
+ import context._
+
+ context.watch(handler)
+ startRead()
+
+ def receive = {
+ case c @ Write(data) => {
+ val writer = sender
+ serial.write(data.toArray).onComplete {
+ case Success(data) => writer ! Wrote(ByteString(data))
+ case Failure(t) => writer ! CommandFailed(c, t)
+ }
+ }
+ case Close => {
+ context.stop(self)
+ }
+ }
+
+ private def startRead(): Unit = {
+ val futureData = serial.read()
+ futureData.onComplete {
+ case Failure(t) => {
+ handler ! Closed(t)
+ context.stop(self)
+ }
+ case Success(data) => {
+ handler ! Received(ByteString(data))
+ startRead()
+ }
+ }
+ }
+
+ override def postStop = {
+ serial.close()
+ }
+
+} \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/main/src/main/scala/com/github/jodersky/flow/exceptions.scala
index a360079..c7b2fa1 100644
--- a/main/src/main/scala/com/github/jodersky/flow/exceptions.scala
+++ b/main/src/main/scala/com/github/jodersky/flow/exceptions.scala
@@ -4,4 +4,5 @@ import java.io.IOException
class NoSuchPortException(message: String) extends IOException(message)
class PortInUseException(message: String) extends IOException(message)
-class AccessDeniedException(message: String) extends IOException(message) \ No newline at end of file
+class AccessDeniedException(message: String) extends IOException(message)
+class PortClosingException(message: String) extends IOException(message) \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/NativeLoader.scala b/main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala
index 91b3fd8..fda82a6 100644
--- a/main/src/main/scala/com/github/jodersky/flow/NativeLoader.scala
+++ b/main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package com.github.jodersky.flow.low
import java.io.File
import java.io.FileOutputStream
diff --git a/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala
new file mode 100644
index 0000000..ac58187
--- /dev/null
+++ b/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala
@@ -0,0 +1,71 @@
+package com.github.jodersky.flow.low
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits._
+import java.io.IOException
+import com.github.jodersky.flow.AccessDeniedException
+import com.github.jodersky.flow.NoSuchPortException
+import com.github.jodersky.flow.PortInUseException
+import com.github.jodersky.flow.PortClosingException
+import scala.util.Try
+
+class Serial private (val port: String, private val pointer: Long) {
+ import NativeSerial._
+
+ private def doRead(): Array[Byte] = synchronized {
+ val buffer = new Array[Byte](100)
+ NativeSerial.read(pointer, buffer) match {
+ case E_POINTER => throw new NullPointerException("pointer to native serial")
+ case E_POLL => throw new IOException(port + ": polling")
+ case E_IO => throw new IOException(port + ": reading")
+ case E_CLOSE => throw new PortClosingException(port + " closing")
+ case bytes if bytes > 0 => buffer.take(bytes)
+ case error => throw new IOException(s"unknown read error ${error}")
+ }
+ }
+
+ private def doWrite(data: Array[Byte]): Array[Byte] = {
+ import NativeSerial._
+ NativeSerial.write(pointer, data) match {
+ case E_POINTER => throw new NullPointerException("pointer to native serial")
+ case E_IO => throw new IOException(port + ": writing")
+ case bytes if bytes > 0 => data.take(bytes)
+ case error => throw new IOException(s"unknown write error ${error}")
+ }
+ }
+
+ def read() = future { doRead() }
+
+ def write(data: Array[Byte]) = future { doWrite(data) }
+
+ def close() = {
+ NativeSerial.close(pointer)
+ }
+
+}
+
+object Serial {
+
+ def doOpen(port: String, baud: Int) = synchronized {
+ val pointer = new Array[Long](1)
+ val result = NativeSerial.open(port, baud, pointer)
+
+ import NativeSerial._
+
+ result match {
+ case E_PERMISSION => throw new AccessDeniedException(port)
+ case E_OPEN => throw new NoSuchPortException(port)
+ case E_BUSY => throw new PortInUseException(port)
+ case E_BAUD => throw new IllegalArgumentException(s"invalid baudrate ${baud}, use standard unix values")
+ case E_PIPE => throw new IOException("cannot create pipe")
+ case E_MALLOC => throw new IOException("cannot allocate memory for serial port")
+ case 0 => new Serial(port, pointer(0))
+ case error => throw new IOException(s"unknown error ${error}")
+ }
+ }
+
+ def open(port: String, baud: Int) = future { doOpen(port, baud) }
+
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+} \ No newline at end of file
diff --git a/main/src/main/scala/com/github/jodersky/flow/test.sc b/main/src/main/scala/com/github/jodersky/flow/test.sc
new file mode 100644
index 0000000..88a2193
--- /dev/null
+++ b/main/src/main/scala/com/github/jodersky/flow/test.sc
@@ -0,0 +1,52 @@
+package com.github.jodersky.flow
+
+import akka.io.PipelineContext
+import akka.io.SymmetricPipePair
+import akka.io.SymmetricPipelineStage
+import akka.util.ByteString
+import java.nio.ByteOrder
+import scala.annotation.tailrec
+import java.nio.ByteBuffer
+import akka.io._
+import akka.actor.{IO=>_,_}
+import Serial._
+
+object test {
+
+ val StartByte = 0: Byte //> StartByte : Byte = 0
+ val StopByte = 1: Byte //> StopByte : Byte = 1
+ val EscapeByte = 2: Byte //> EscapeByte : Byte = 2
+
+ val ctx = new PipelineContext{} //> ctx : akka.io.PipelineContext = com.github.jodersky.flow.test$$anonfun$main
+ //| $1$$anon$1@32bf7190
+
+ val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte)
+ //> stages : com.github.jodersky.flow.DelimitedFrame = com.github.jodersky.flow
+ //| .DelimitedFrame@36ff057f
+
+ val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages)
+ //> cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString]) = <function1>
+ //| evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString]) = <function1>
+ //| mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString])] = <function1>
+
+ val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
+ t => println("sent command: " + t), // will receive messages of type Try[ByteString]
+ t => println("got event: " + t) // will receive messages of type Try[Message]
+ ) //> injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteStri
+ //| ng] = akka.io.PipelineFactory$$anon$5@70cb6009
+
+
+ val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1))
+ //> bs : akka.util.ByteString = ByteString(0, 4, 2, 1, 1, 6, 1)
+
+ injector.injectCommand(bs) //> sent command: Success(ByteString(0, 2, 0, 4, 2, 2, 2, 1, 2, 1, 6, 2, 1, 1))
+ injector.injectEvent(bs) //> got event: Success(ByteString(4, 1))
+
+ implicit val system = ActorSystem("flow")
+ //> system : akka.actor.ActorSystem = akka://flow|
+ //IO(Serial) ! Open("s", 9600)
+
+} \ No newline at end of file
diff --git a/project/Build.scala b/project/Build.scala
index 04d82de..ccee261 100644
--- a/project/Build.scala
+++ b/project/Build.scala
@@ -28,7 +28,7 @@ object FlowBuild extends Build {
lazy val jniSettings = JNIBuild.defaults ++ Seq(
jdkHome := file(System.getProperty("java.home")) / "..",
- javaClass := "com.github.jodersky.flow.NativeSerial",
+ javaClass := "com.github.jodersky.flow.low.NativeSerial",
NativeBuild.compiler := "gcc",
options := Seq("-fPIC"),
NativeBuild.includeDirectories <<= jdkHome apply (jdk => Seq(jdk / "include", jdk / "include" / "linux")),
@@ -49,9 +49,11 @@ object FlowBuild extends Build {
}
object Dependencies {
- lazy val all = Seq()
lazy val io = "com.github.scala-incubator.io" %% "scala-io-core" % "0.4.2"
lazy val file = "com.github.scala-incubator.io" %% "scala-io-file" % "0.4.2"
+ lazy val akka = "com.typesafe.akka" %% "akka-actor" % "2.2-M3"
+
+ lazy val all = Seq(akka)
}