diff options
16 files changed, 523 insertions, 161 deletions
diff --git a/example/src/main/scala/com/github/jodersky/flow/example/Main.scala b/example/src/main/scala/com/github/jodersky/flow/example/Main.scala index 5032d1c..20fffe2 100644 --- a/example/src/main/scala/com/github/jodersky/flow/example/Main.scala +++ b/example/src/main/scala/com/github/jodersky/flow/example/Main.scala @@ -1,41 +1,44 @@ -package com.github.jodersky.flow.example +package com.github.jodersky.flow +package example -import scala.concurrent._ import scala.concurrent.ExecutionContext.Implicits.global -import com.github.jodersky.flow.Serial -import scala.util.Try import scala.util.Success +import scala.util.Try + +import com.github.jodersky.flow.Serial +import com.github.jodersky.flow.Serial._ + +import akka.actor.ActorSystem +import akka.actor.Props +import akka.io.IO +import akka.util.ByteString object Main { def main(args: Array[String]): Unit = { - val isInt = Try(args(1).toInt) match {case Success(_) => true; case _ => false} + /*val isInt = Try(args(1).toInt) match { case Success(_) => true; case _ => false } if (!(args.length == 2 && isInt)) { println("invalid parameters") println("parameters: port baud") println("example: /dev/ttyACM0 115200") return - } - val port = args(0) - val baud = args(1).toInt + }*/ + val port = "/dev/ttyACM0" + val baud = 115200 + + //low.Serial.debug(true) - Serial.debug(true) + implicit val system = ActorSystem("flow") + val serial = system.actorOf(Props[SerialHandler], name = "serial-handler") - println("opening " + port) - val serial = Serial.open(port, baud) { data => - println("received: " + data.mkString("[", ",", "]")) - } + IO(Serial) ! Serial.Open(serial, port, baud) - println("press enter to write a looong array of data") - Console.readLine() - - val data: Array[Byte] = Array.fill(100)(42) - serial.write(data).map(d => println("wrote: " + d.mkString("[", ",", "]"))).recover { case t => println("write error") } - - println("press enter to exit") - Console.readLine() + readLine() + serial ! Write(ByteString(42)) - println("exiting") - Console.flush() + readLine() + //serial ! Close + system.shutdown() + } }
\ No newline at end of file diff --git a/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala b/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala new file mode 100644 index 0000000..d228cb9 --- /dev/null +++ b/example/src/main/scala/com/github/jodersky/flow/example/SerialHandler.scala @@ -0,0 +1,36 @@ +package com.github.jodersky.flow.example + +import com.github.jodersky.flow.Serial._ +import com.github.jodersky.flow.low.{Serial => LowSerial} +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.util.ByteString + +class SerialHandler extends Actor with ActorLogging { + var operator: Option[ActorRef] = None + + def receive = { + case Opened(operator) => this.operator = Some(operator) + + case CommandFailed(cmd, reason) => + println(s"command ${cmd} failed, reason: ${reason}") + + case Received(data) => println("received data: " + formatData(data)) + + case Close => + operator.map(_ ! Close) + + case Closed(_) => println("port closed") + + case Write(data) => { + operator.map(_ ! Write(data)) + } + + case Wrote(data) => println("wrote data: " + formatData(data)) + + } + + private def formatData(data: ByteString) = data.mkString("[",",","]") + +}
\ No newline at end of file diff --git a/main/.worksheet/src/com.github.jodersky.flow.test.scala b/main/.worksheet/src/com.github.jodersky.flow.test.scala new file mode 100644 index 0000000..1510da1 --- /dev/null +++ b/main/.worksheet/src/com.github.jodersky.flow.test.scala @@ -0,0 +1,40 @@ +package com.github.jodersky.flow + +import akka.io.PipelineContext +import akka.io.SymmetricPipePair +import akka.io.SymmetricPipelineStage +import akka.util.ByteString +import java.nio.ByteOrder +import scala.annotation.tailrec +import java.nio.ByteBuffer +import akka.io._ +import akka.actor.{IO=>_,_} +import Serial._ + +object test {;import org.scalaide.worksheet.runtime.library.WorksheetSupport._; def main(args: Array[String])=$execute{;$skip(351); + + val StartByte = 0: Byte;System.out.println("""StartByte : Byte = """ + $show(StartByte ));$skip(25); + val StopByte = 1: Byte;System.out.println("""StopByte : Byte = """ + $show(StopByte ));$skip(27); + val EscapeByte = 2: Byte;System.out.println("""EscapeByte : Byte = """ + $show(EscapeByte ));$skip(36); + + val ctx = new PipelineContext{};System.out.println("""ctx : akka.io.PipelineContext = """ + $show(ctx ));$skip(68); + + val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte);System.out.println("""stages : com.github.jodersky.flow.DelimitedFrame = """ + $show(stages ));$skip(88); + + val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages);System.out.println("""cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akka.util.ByteString]) = """ + $show(cmd ));System.out.println("""evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akka.util.ByteString]) = """ + $show(evt ));System.out.println("""mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akka.util.ByteString])] = """ + $show(mgmt ));$skip(243); + + val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)( + t => println("sent command: " + t), // will receive messages of type Try[ByteString] + t => println("got event: " + t) // will receive messages of type Try[Message] + );System.out.println("""injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteString] = """ + $show(injector ));$skip(59); + + + val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1));System.out.println("""bs : akka.util.ByteString = """ + $show(bs ));$skip(51); + + injector.injectCommand(bs);$skip(27); + injector.injectEvent(bs);$skip(47); + + implicit val system = ActorSystem("flow");System.out.println("""system : akka.actor.ActorSystem = """ + $show(system ))} + //IO(Serial) ! Open("s", 9600) + +} diff --git a/main/src/main/c/com_github_jodersky_flow_NativeSerial.h b/main/src/main/c/com_github_jodersky_flow_NativeSerial.h deleted file mode 100644 index e89652c..0000000 --- a/main/src/main/c/com_github_jodersky_flow_NativeSerial.h +++ /dev/null @@ -1,73 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include <jni.h> -/* Header for class com_github_jodersky_flow_NativeSerial */ - -#ifndef _Included_com_github_jodersky_flow_NativeSerial -#define _Included_com_github_jodersky_flow_NativeSerial -#ifdef __cplusplus -extern "C" { -#endif -#undef com_github_jodersky_flow_NativeSerial_E_PERMISSION -#define com_github_jodersky_flow_NativeSerial_E_PERMISSION -1L -#undef com_github_jodersky_flow_NativeSerial_E_OPEN -#define com_github_jodersky_flow_NativeSerial_E_OPEN -2L -#undef com_github_jodersky_flow_NativeSerial_E_BUSY -#define com_github_jodersky_flow_NativeSerial_E_BUSY -3L -#undef com_github_jodersky_flow_NativeSerial_E_BAUD -#define com_github_jodersky_flow_NativeSerial_E_BAUD -4L -#undef com_github_jodersky_flow_NativeSerial_E_PIPE -#define com_github_jodersky_flow_NativeSerial_E_PIPE -5L -#undef com_github_jodersky_flow_NativeSerial_E_MALLOC -#define com_github_jodersky_flow_NativeSerial_E_MALLOC -6L -#undef com_github_jodersky_flow_NativeSerial_E_POINTER -#define com_github_jodersky_flow_NativeSerial_E_POINTER -7L -#undef com_github_jodersky_flow_NativeSerial_E_POLL -#define com_github_jodersky_flow_NativeSerial_E_POLL -8L -#undef com_github_jodersky_flow_NativeSerial_E_IO -#define com_github_jodersky_flow_NativeSerial_E_IO -9L -#undef com_github_jodersky_flow_NativeSerial_E_CLOSE -#define com_github_jodersky_flow_NativeSerial_E_CLOSE -10L -/* - * Class: com_github_jodersky_flow_NativeSerial - * Method: open - * Signature: (Ljava/lang/String;I[J)I - */ -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open - (JNIEnv *, jclass, jstring, jint, jlongArray); - -/* - * Class: com_github_jodersky_flow_NativeSerial - * Method: read - * Signature: (J[B)I - */ -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read - (JNIEnv *, jclass, jlong, jbyteArray); - -/* - * Class: com_github_jodersky_flow_NativeSerial - * Method: write - * Signature: (J[B)I - */ -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write - (JNIEnv *, jclass, jlong, jbyteArray); - -/* - * Class: com_github_jodersky_flow_NativeSerial - * Method: close - * Signature: (J)V - */ -JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_close - (JNIEnv *, jclass, jlong); - -/* - * Class: com_github_jodersky_flow_NativeSerial - * Method: debug - * Signature: (Z)V - */ -JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_debug - (JNIEnv *, jclass, jboolean); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h b/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h new file mode 100644 index 0000000..54ea963 --- /dev/null +++ b/main/src/main/c/com_github_jodersky_flow_low_NativeSerial.h @@ -0,0 +1,73 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include <jni.h> +/* Header for class com_github_jodersky_flow_low_NativeSerial */ + +#ifndef _Included_com_github_jodersky_flow_low_NativeSerial +#define _Included_com_github_jodersky_flow_low_NativeSerial +#ifdef __cplusplus +extern "C" { +#endif +#undef com_github_jodersky_flow_low_NativeSerial_E_PERMISSION +#define com_github_jodersky_flow_low_NativeSerial_E_PERMISSION -1L +#undef com_github_jodersky_flow_low_NativeSerial_E_OPEN +#define com_github_jodersky_flow_low_NativeSerial_E_OPEN -2L +#undef com_github_jodersky_flow_low_NativeSerial_E_BUSY +#define com_github_jodersky_flow_low_NativeSerial_E_BUSY -3L +#undef com_github_jodersky_flow_low_NativeSerial_E_BAUD +#define com_github_jodersky_flow_low_NativeSerial_E_BAUD -4L +#undef com_github_jodersky_flow_low_NativeSerial_E_PIPE +#define com_github_jodersky_flow_low_NativeSerial_E_PIPE -5L +#undef com_github_jodersky_flow_low_NativeSerial_E_MALLOC +#define com_github_jodersky_flow_low_NativeSerial_E_MALLOC -6L +#undef com_github_jodersky_flow_low_NativeSerial_E_POINTER +#define com_github_jodersky_flow_low_NativeSerial_E_POINTER -7L +#undef com_github_jodersky_flow_low_NativeSerial_E_POLL +#define com_github_jodersky_flow_low_NativeSerial_E_POLL -8L +#undef com_github_jodersky_flow_low_NativeSerial_E_IO +#define com_github_jodersky_flow_low_NativeSerial_E_IO -9L +#undef com_github_jodersky_flow_low_NativeSerial_E_CLOSE +#define com_github_jodersky_flow_low_NativeSerial_E_CLOSE -10L +/* + * Class: com_github_jodersky_flow_low_NativeSerial + * Method: open + * Signature: (Ljava/lang/String;I[J)I + */ +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open + (JNIEnv *, jclass, jstring, jint, jlongArray); + +/* + * Class: com_github_jodersky_flow_low_NativeSerial + * Method: read + * Signature: (J[B)I + */ +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read + (JNIEnv *, jclass, jlong, jbyteArray); + +/* + * Class: com_github_jodersky_flow_low_NativeSerial + * Method: write + * Signature: (J[B)I + */ +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write + (JNIEnv *, jclass, jlong, jbyteArray); + +/* + * Class: com_github_jodersky_flow_low_NativeSerial + * Method: close + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close + (JNIEnv *, jclass, jlong); + +/* + * Class: com_github_jodersky_flow_low_NativeSerial + * Method: debug + * Signature: (Z)V + */ +JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug + (JNIEnv *, jclass, jboolean); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/main/src/main/c/flow.c b/main/src/main/c/flow.c index 3ed66b6..89813ce 100644 --- a/main/src/main/c/flow.c +++ b/main/src/main/c/flow.c @@ -37,7 +37,7 @@ #include <termios.h> #include <fcntl.h> #include <poll.h> -#include "com_github_jodersky_flow_NativeSerial.h" +#include "com_github_jodersky_flow_low_NativeSerial.h" #define E_PERMISSION -1 #define E_OPEN -2 @@ -236,7 +236,7 @@ inline jlong s2j(struct serial_config* pointer) { return (jlong) pointer; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_open (JNIEnv *env, jclass clazz, jstring device, jint baud, jlongArray jserialp) { const char *dev = (*env)->GetStringUTFChars(env, device, 0); @@ -250,13 +250,13 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_open return r; } -JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_close +JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_close (JNIEnv * env, jclass clazz, jlong serial) { serial_close(j2s(serial)); } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_read (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer) { @@ -272,7 +272,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_read return n; } -JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write +JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_low_NativeSerial_write (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer) { unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL); @@ -284,7 +284,7 @@ JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_NativeSerial_write return r; } -JNIEXPORT void JNICALL Java_com_github_jodersky_flow_NativeSerial_debug +JNIEXPORT void JNICALL Java_com_github_jodersky_flow_low_NativeSerial_debug (JNIEnv *env, jclass clazz, jboolean value) { debug = (bool) value; diff --git a/main/src/main/java/com/github/jodersky/flow/NativeSerial.java b/main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java index f5d708e..6bdcde5 100644 --- a/main/src/main/java/com/github/jodersky/flow/NativeSerial.java +++ b/main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java @@ -1,6 +1,6 @@ -package com.github.jodersky.flow; +package com.github.jodersky.flow.low; -public class NativeSerial { +class NativeSerial { static { NativeLoader.load(); diff --git a/main/src/main/scala/com/github/jodersky/flow/Framing.scala b/main/src/main/scala/com/github/jodersky/flow/Framing.scala new file mode 100644 index 0000000..f8173a7 --- /dev/null +++ b/main/src/main/scala/com/github/jodersky/flow/Framing.scala @@ -0,0 +1,104 @@ +package com.github.jodersky.flow + +import akka.io.PipelineContext +import akka.io.SymmetricPipePair +import akka.io.SymmetricPipelineStage +import akka.util.ByteString +import java.nio.ByteOrder +import scala.annotation.tailrec +import java.nio.ByteBuffer + +class DelimitedFrame( + StartByte: Byte, + StopByte: Byte, + EscapeByte: Byte) + //byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN) + extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] { + + // range checks omitted ... + + override def apply(ctx: PipelineContext) = + new SymmetricPipePair[ByteString, ByteString] { + var buffer = ByteString.empty + //implicit val byteOrder = DelimitedFrame.this.byteOrder + + sealed trait State + case object Waiting extends State + case object Accepting extends State + case object Escaping extends State + + def extractFrame(bs: ByteString, accepted: ByteString, state: State): (ByteString, Option[ByteString]) = { //(remaining, frame)) + if (bs.isEmpty && state == Waiting) (ByteString.empty, None) + else if (bs.isEmpty) (accepted, None) + else { + val in = bs.head + + state match { + case Waiting if (in == StartByte) => extractFrame(bs.tail, accepted, Accepting) + case Escaping => extractFrame(bs.tail, accepted ++ ByteString(in), Accepting) + case Accepting => in match { + case EscapeByte => extractFrame(bs.tail, accepted, Escaping) + case StartByte => extractFrame(bs.tail, ByteString.empty, Accepting) + case StopByte => (bs.tail, Some(accepted)) + case other => extractFrame(bs.tail, accepted ++ ByteString(other), Accepting) + } + case _ => extractFrame(bs.tail, accepted, state) + } + } + } + + def extractFrames(bs: ByteString, accepted: List[ByteString]): (ByteString, List[ByteString]) = { + val (remainder, frame) = extractFrame(bs, ByteString.empty, Waiting) + + frame match { + case None => (remainder, accepted) + case Some(data) => extractFrames(remainder, data :: accepted) + } + } + + /* + * This is how commands (writes) are transformed: calculate length + * including header, write that to a ByteStringBuilder and append the + * payload data. The result is a single command (i.e. `Right(...)`). + */ + override def commandPipeline = { bs: ByteString => + val bb = ByteString.newBuilder + + def escape(b: Byte) = { + bb += EscapeByte + bb += b + } + + bb += StartByte + for (b <- bs) { + b match { + case StartByte => escape(b) + case StopByte => escape(b) + case EscapeByte => escape(b) + case _ => bb += b + } + } + bb += StopByte + + ctx.singleCommand(bb.result) + } + + /* + * This is how events (reads) are transformed: append the received + * ByteString to the buffer (if any) and extract the frames from the + * result. In the end store the new buffer contents and return the + * list of events (i.e. `Left(...)`). + */ + override def eventPipeline = { bs: ByteString => + val data = buffer ++ bs + val (remainder, frames) = extractFrames(data, Nil) + buffer = remainder + + frames match { + case Nil => Nil + case one :: Nil ⇒ ctx.singleEvent(one) + case many ⇒ many reverseMap (Left(_)) + } + } + } +}
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/Serial.scala b/main/src/main/scala/com/github/jodersky/flow/Serial.scala index 7565cbd..3f28ad8 100644 --- a/main/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/main/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -1,63 +1,32 @@ package com.github.jodersky.flow -import scala.concurrent._ -import scala.concurrent.ExecutionContext.Implicits._ -import java.io.IOException +import akka.io._ +import akka.actor.ExtensionKey +import akka.actor.ExtendedActorSystem +import akka.actor.Props +import low.{ Serial => LowSerial } +import akka.actor.ActorRef +import akka.util.ByteString -class Serial private (val port: String, private val pointer: Long, reader: Array[Byte] => Unit) { - future { - var n = 0 - val buffer = new Array[Byte](100) - while (n >= 0) { - n = NativeSerial.read(pointer, buffer) - import NativeSerial._ - n match { - case E_POINTER => throw new NullPointerException("pointer to native serial") - case E_POLL => throw new IOException(port + ": polling") - case E_IO => throw new IOException(port + ": reading") - case E_CLOSE => println("close request") - case x if x > 0 => reader(buffer.take(n)) - } - } - } +object Serial extends ExtensionKey[SerialExt] { - private def writeBlock(data: Array[Byte]) = synchronized { - import NativeSerial._ - NativeSerial.write(pointer, data) match { - case E_POINTER => throw new NullPointerException("pointer to native serial") - case E_IO => throw new IOException(port + ": writing") - case r => data.take(r) - } - } - - def write(data: Array[Byte]) = future { writeBlock(data) } - - def close() = synchronized { - NativeSerial.close(pointer) - } - -} + trait Command + trait Event -object Serial { + case class CommandFailed(command: Command, reason: Throwable) extends Event - def open(port: String, baud: Int)(reader: Array[Byte] => Unit) = synchronized { - val pointer = new Array[Long](1) - val result = NativeSerial.open(port, baud, pointer) + case class Open(handler: ActorRef, port: String, baud: Int) extends Command + case class Opened(operator: ActorRef) extends Command - import NativeSerial._ - result match { - case E_PERMISSION => throw new AccessDeniedException(port) - case E_OPEN => throw new NoSuchPortException(port) - case E_BUSY => throw new PortInUseException(port) - case E_BAUD => throw new IllegalArgumentException( - s"invalid baudrate ${baud}, use standard unix values") - case E_PIPE => throw new IOException("cannot create pipe") - case E_MALLOC => throw new IOException("cannot allocate memory") - case 0 => new Serial(port, pointer(0), reader) - case _ => throw new Exception("cannot open port") - } - } + case object Close extends Command + case class Closed(reason: Throwable) extends Event + + case class Write(data: ByteString) extends Command + case class Wrote(data: ByteString) extends Event - def debug(value: Boolean) = NativeSerial.debug(value) + case class Received(data: ByteString) extends Event +} +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + def manager = system.actorOf(Props[SerialManager], name = "IO-SERIAL") }
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala new file mode 100644 index 0000000..b288a71 --- /dev/null +++ b/main/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -0,0 +1,34 @@ +package com.github.jodersky.flow + +import akka.actor.Actor +import Serial._ +import low.{ Serial => LowSerial } +import scala.util.Success +import scala.util.Failure +import akka.actor.Props + +class SerialManager extends Actor { + import SerialManager._ + import context._ + + def receive = { + case command @ Open(handler, port, baud) => + LowSerial.open(port, baud).onComplete(_ match { + case Success(serial) => { + val operator = context.actorOf(Props(classOf[SerialOperator], serial, handler), name = escapePortString(port)) + handler ! Opened(operator) + } + case Failure(t) => handler ! CommandFailed(command, t) + }) + } + +} + +object SerialManager { + + private def escapePortString(port: String) = port collect { + case '/' => '-' + case c => c + } + +}
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala new file mode 100644 index 0000000..dcfb22b --- /dev/null +++ b/main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -0,0 +1,50 @@ +package com.github.jodersky.flow + +import scala.util.Failure +import scala.util.Success + +import Serial._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.util.ByteString +import low.{ Serial => LowSerial } + +class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor { + import context._ + + context.watch(handler) + startRead() + + def receive = { + case c @ Write(data) => { + val writer = sender + serial.write(data.toArray).onComplete { + case Success(data) => writer ! Wrote(ByteString(data)) + case Failure(t) => writer ! CommandFailed(c, t) + } + } + case Close => { + context.stop(self) + } + } + + private def startRead(): Unit = { + val futureData = serial.read() + futureData.onComplete { + case Failure(t) => { + handler ! Closed(t) + context.stop(self) + } + case Success(data) => { + handler ! Received(ByteString(data)) + startRead() + } + } + } + + override def postStop = { + serial.close() + } + +}
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/main/src/main/scala/com/github/jodersky/flow/exceptions.scala index a360079..c7b2fa1 100644 --- a/main/src/main/scala/com/github/jodersky/flow/exceptions.scala +++ b/main/src/main/scala/com/github/jodersky/flow/exceptions.scala @@ -4,4 +4,5 @@ import java.io.IOException class NoSuchPortException(message: String) extends IOException(message) class PortInUseException(message: String) extends IOException(message) -class AccessDeniedException(message: String) extends IOException(message)
\ No newline at end of file +class AccessDeniedException(message: String) extends IOException(message) +class PortClosingException(message: String) extends IOException(message)
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/NativeLoader.scala b/main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala index 91b3fd8..fda82a6 100644 --- a/main/src/main/scala/com/github/jodersky/flow/NativeLoader.scala +++ b/main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package com.github.jodersky.flow.low import java.io.File import java.io.FileOutputStream diff --git a/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala new file mode 100644 index 0000000..ac58187 --- /dev/null +++ b/main/src/main/scala/com/github/jodersky/flow/low/Serial.scala @@ -0,0 +1,71 @@ +package com.github.jodersky.flow.low + +import scala.concurrent._ +import scala.concurrent.ExecutionContext.Implicits._ +import java.io.IOException +import com.github.jodersky.flow.AccessDeniedException +import com.github.jodersky.flow.NoSuchPortException +import com.github.jodersky.flow.PortInUseException +import com.github.jodersky.flow.PortClosingException +import scala.util.Try + +class Serial private (val port: String, private val pointer: Long) { + import NativeSerial._ + + private def doRead(): Array[Byte] = synchronized { + val buffer = new Array[Byte](100) + NativeSerial.read(pointer, buffer) match { + case E_POINTER => throw new NullPointerException("pointer to native serial") + case E_POLL => throw new IOException(port + ": polling") + case E_IO => throw new IOException(port + ": reading") + case E_CLOSE => throw new PortClosingException(port + " closing") + case bytes if bytes > 0 => buffer.take(bytes) + case error => throw new IOException(s"unknown read error ${error}") + } + } + + private def doWrite(data: Array[Byte]): Array[Byte] = { + import NativeSerial._ + NativeSerial.write(pointer, data) match { + case E_POINTER => throw new NullPointerException("pointer to native serial") + case E_IO => throw new IOException(port + ": writing") + case bytes if bytes > 0 => data.take(bytes) + case error => throw new IOException(s"unknown write error ${error}") + } + } + + def read() = future { doRead() } + + def write(data: Array[Byte]) = future { doWrite(data) } + + def close() = { + NativeSerial.close(pointer) + } + +} + +object Serial { + + def doOpen(port: String, baud: Int) = synchronized { + val pointer = new Array[Long](1) + val result = NativeSerial.open(port, baud, pointer) + + import NativeSerial._ + + result match { + case E_PERMISSION => throw new AccessDeniedException(port) + case E_OPEN => throw new NoSuchPortException(port) + case E_BUSY => throw new PortInUseException(port) + case E_BAUD => throw new IllegalArgumentException(s"invalid baudrate ${baud}, use standard unix values") + case E_PIPE => throw new IOException("cannot create pipe") + case E_MALLOC => throw new IOException("cannot allocate memory for serial port") + case 0 => new Serial(port, pointer(0)) + case error => throw new IOException(s"unknown error ${error}") + } + } + + def open(port: String, baud: Int) = future { doOpen(port, baud) } + + def debug(value: Boolean) = NativeSerial.debug(value) + +}
\ No newline at end of file diff --git a/main/src/main/scala/com/github/jodersky/flow/test.sc b/main/src/main/scala/com/github/jodersky/flow/test.sc new file mode 100644 index 0000000..88a2193 --- /dev/null +++ b/main/src/main/scala/com/github/jodersky/flow/test.sc @@ -0,0 +1,52 @@ +package com.github.jodersky.flow + +import akka.io.PipelineContext +import akka.io.SymmetricPipePair +import akka.io.SymmetricPipelineStage +import akka.util.ByteString +import java.nio.ByteOrder +import scala.annotation.tailrec +import java.nio.ByteBuffer +import akka.io._ +import akka.actor.{IO=>_,_} +import Serial._ + +object test { + + val StartByte = 0: Byte //> StartByte : Byte = 0 + val StopByte = 1: Byte //> StopByte : Byte = 1 + val EscapeByte = 2: Byte //> EscapeByte : Byte = 2 + + val ctx = new PipelineContext{} //> ctx : akka.io.PipelineContext = com.github.jodersky.flow.test$$anonfun$main + //| $1$$anon$1@32bf7190 + + val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte) + //> stages : com.github.jodersky.flow.DelimitedFrame = com.github.jodersky.flow + //| .DelimitedFrame@36ff057f + + val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages) + //> cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk + //| a.util.ByteString]) = <function1> + //| evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk + //| a.util.ByteString]) = <function1> + //| mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akk + //| a.util.ByteString])] = <function1> + + val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)( + t => println("sent command: " + t), // will receive messages of type Try[ByteString] + t => println("got event: " + t) // will receive messages of type Try[Message] + ) //> injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteStri + //| ng] = akka.io.PipelineFactory$$anon$5@70cb6009 + + + val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1)) + //> bs : akka.util.ByteString = ByteString(0, 4, 2, 1, 1, 6, 1) + + injector.injectCommand(bs) //> sent command: Success(ByteString(0, 2, 0, 4, 2, 2, 2, 1, 2, 1, 6, 2, 1, 1)) + injector.injectEvent(bs) //> got event: Success(ByteString(4, 1)) + + implicit val system = ActorSystem("flow") + //> system : akka.actor.ActorSystem = akka://flow| + //IO(Serial) ! Open("s", 9600) + +}
\ No newline at end of file diff --git a/project/Build.scala b/project/Build.scala index 04d82de..ccee261 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -28,7 +28,7 @@ object FlowBuild extends Build { lazy val jniSettings = JNIBuild.defaults ++ Seq( jdkHome := file(System.getProperty("java.home")) / "..", - javaClass := "com.github.jodersky.flow.NativeSerial", + javaClass := "com.github.jodersky.flow.low.NativeSerial", NativeBuild.compiler := "gcc", options := Seq("-fPIC"), NativeBuild.includeDirectories <<= jdkHome apply (jdk => Seq(jdk / "include", jdk / "include" / "linux")), @@ -49,9 +49,11 @@ object FlowBuild extends Build { } object Dependencies { - lazy val all = Seq() lazy val io = "com.github.scala-incubator.io" %% "scala-io-core" % "0.4.2" lazy val file = "com.github.scala-incubator.io" %% "scala-io-file" % "0.4.2" + lazy val akka = "com.typesafe.akka" %% "akka-actor" % "2.2-M3" + + lazy val all = Seq(akka) } |