aboutsummaryrefslogtreecommitdiff
path: root/flow-main
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-06-18 18:05:20 +0200
committerJakob Odersky <jodersky@gmail.com>2013-06-18 18:05:20 +0200
commitb0c32f5325702dd7f7ef3d5ccc0eb9a2b972cf7a (patch)
tree7030bd58c071fecb0ca737c08c295849d1460e45 /flow-main
parentdf6a79639dfb5241c57d9af4c6e2274de68166e0 (diff)
downloadakka-serial-b0c32f5325702dd7f7ef3d5ccc0eb9a2b972cf7a.tar.gz
akka-serial-b0c32f5325702dd7f7ef3d5ccc0eb9a2b972cf7a.tar.bz2
akka-serial-b0c32f5325702dd7f7ef3d5ccc0eb9a2b972cf7a.zip
use sbt-native and jni plugin
Diffstat (limited to 'flow-main')
-rw-r--r--flow-main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java49
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/Framing.scalac104
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/Serial.scalac34
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scalac35
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scalac44
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala8
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala35
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/low/Serial.scala65
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/test.sc52
9 files changed, 426 insertions, 0 deletions
diff --git a/flow-main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java b/flow-main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java
new file mode 100644
index 0000000..6bdcde5
--- /dev/null
+++ b/flow-main/src/main/java/com/github/jodersky/flow/low/NativeSerial.java
@@ -0,0 +1,49 @@
+package com.github.jodersky.flow.low;
+
+class NativeSerial {
+
+ static {
+ NativeLoader.load();
+ }
+
+ final static int E_PERMISSION = -1;
+ final static int E_OPEN = -2;
+ final static int E_BUSY = -3;
+ final static int E_BAUD = -4;
+ final static int E_PIPE = -5;
+ final static int E_MALLOC = -6;
+ final static int E_POINTER = -7;
+ final static int E_POLL = -8;
+ final static int E_IO = -9;
+ final static int E_CLOSE = -10;
+
+
+ /* return values:
+ * 0 ok
+ * E_PERMISSION don't have permission to open
+ * E_OPEN can't get file descriptor
+ * E_BUSY device busy
+ * E_BAUD invalid baudrate
+ * E_PIPE can't open pipe for graceful closing
+ * E_MALLOC malloc error */
+ native static int open(String device, int baud, long[] serial);
+
+ /* return
+ * >0 number of bytes read
+ * E_POINTER invalid serial pointer
+ * E_POLL poll error
+ * E_IO read error
+ * E_CLOSE close request */
+ native static int read(long serial, byte[] buffer);
+
+ /*return
+ * >0 number of bytes written
+ * E_POINTER invalid serial config (null pointer)
+ * E_IO write error */
+ native static int write(long serial, byte[] buffer);
+
+ native static void close(long serial);
+
+ native static void debug(boolean value);
+
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Framing.scalac b/flow-main/src/main/scala/com/github/jodersky/flow/Framing.scalac
new file mode 100644
index 0000000..f8173a7
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Framing.scalac
@@ -0,0 +1,104 @@
+package com.github.jodersky.flow
+
+import akka.io.PipelineContext
+import akka.io.SymmetricPipePair
+import akka.io.SymmetricPipelineStage
+import akka.util.ByteString
+import java.nio.ByteOrder
+import scala.annotation.tailrec
+import java.nio.ByteBuffer
+
+class DelimitedFrame(
+ StartByte: Byte,
+ StopByte: Byte,
+ EscapeByte: Byte)
+ //byteOrder: ByteOrder = ByteOrder.BIG_ENDIAN)
+ extends SymmetricPipelineStage[PipelineContext, ByteString, ByteString] {
+
+ // range checks omitted ...
+
+ override def apply(ctx: PipelineContext) =
+ new SymmetricPipePair[ByteString, ByteString] {
+ var buffer = ByteString.empty
+ //implicit val byteOrder = DelimitedFrame.this.byteOrder
+
+ sealed trait State
+ case object Waiting extends State
+ case object Accepting extends State
+ case object Escaping extends State
+
+ def extractFrame(bs: ByteString, accepted: ByteString, state: State): (ByteString, Option[ByteString]) = { //(remaining, frame))
+ if (bs.isEmpty && state == Waiting) (ByteString.empty, None)
+ else if (bs.isEmpty) (accepted, None)
+ else {
+ val in = bs.head
+
+ state match {
+ case Waiting if (in == StartByte) => extractFrame(bs.tail, accepted, Accepting)
+ case Escaping => extractFrame(bs.tail, accepted ++ ByteString(in), Accepting)
+ case Accepting => in match {
+ case EscapeByte => extractFrame(bs.tail, accepted, Escaping)
+ case StartByte => extractFrame(bs.tail, ByteString.empty, Accepting)
+ case StopByte => (bs.tail, Some(accepted))
+ case other => extractFrame(bs.tail, accepted ++ ByteString(other), Accepting)
+ }
+ case _ => extractFrame(bs.tail, accepted, state)
+ }
+ }
+ }
+
+ def extractFrames(bs: ByteString, accepted: List[ByteString]): (ByteString, List[ByteString]) = {
+ val (remainder, frame) = extractFrame(bs, ByteString.empty, Waiting)
+
+ frame match {
+ case None => (remainder, accepted)
+ case Some(data) => extractFrames(remainder, data :: accepted)
+ }
+ }
+
+ /*
+ * This is how commands (writes) are transformed: calculate length
+ * including header, write that to a ByteStringBuilder and append the
+ * payload data. The result is a single command (i.e. `Right(...)`).
+ */
+ override def commandPipeline = { bs: ByteString =>
+ val bb = ByteString.newBuilder
+
+ def escape(b: Byte) = {
+ bb += EscapeByte
+ bb += b
+ }
+
+ bb += StartByte
+ for (b <- bs) {
+ b match {
+ case StartByte => escape(b)
+ case StopByte => escape(b)
+ case EscapeByte => escape(b)
+ case _ => bb += b
+ }
+ }
+ bb += StopByte
+
+ ctx.singleCommand(bb.result)
+ }
+
+ /*
+ * This is how events (reads) are transformed: append the received
+ * ByteString to the buffer (if any) and extract the frames from the
+ * result. In the end store the new buffer contents and return the
+ * list of events (i.e. `Left(...)`).
+ */
+ override def eventPipeline = { bs: ByteString =>
+ val data = buffer ++ bs
+ val (remainder, frames) = extractFrames(data, Nil)
+ buffer = remainder
+
+ frames match {
+ case Nil => Nil
+ case one :: Nil ⇒ ctx.singleEvent(one)
+ case many ⇒ many reverseMap (Left(_))
+ }
+ }
+ }
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scalac b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scalac
new file mode 100644
index 0000000..7182425
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scalac
@@ -0,0 +1,34 @@
+package com.github.jodersky.flow
+
+import akka.io._
+import akka.actor.ExtensionKey
+import akka.actor.ExtendedActorSystem
+import akka.actor.Props
+import low.{ Serial => LowSerial }
+import akka.actor.ActorRef
+import akka.util.ByteString
+
+object Serial extends ExtensionKey[SerialExt] {
+
+ trait Command
+ trait Event
+
+ case class Open(handler: ActorRef, port: String, baud: Int) extends Command
+ case class Opened(operator: ActorRef) extends Event
+
+ case class Received(data: ByteString) extends Event
+
+ case class Write(data: ByteString) extends Command
+ case class Wrote(data: ByteString) extends Event
+
+ case object Close extends Command
+
+
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
+
+
+}
+
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ def manager = system.actorOf(Props[SerialManager], name = "IO-SERIAL")
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scalac b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scalac
new file mode 100644
index 0000000..ca3fc6b
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scalac
@@ -0,0 +1,35 @@
+package com.github.jodersky.flow
+
+import akka.actor.Actor
+import Serial._
+import low.{ Serial => LowSerial }
+import scala.util.Success
+import scala.util.Failure
+import akka.actor.Props
+import scala.concurrent._
+
+class SerialManager extends Actor {
+ import SerialManager._
+ import context._
+
+ def receive = {
+ case command @ Open(handler, port, baud) =>
+ future{LowSerial.open(port, baud)}.onComplete(_ match {
+ case Success(serial) => {
+ val operator = context.actorOf(Props(classOf[SerialOperator], serial, handler), name = escapePortString(port))
+ handler ! Opened(operator)
+ }
+ case Failure(t) => sender ! CommandFailed(command, t)
+ })
+ }
+
+}
+
+object SerialManager {
+
+ private def escapePortString(port: String) = port collect {
+ case '/' => '-'
+ case c => c
+ }
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scalac b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scalac
new file mode 100644
index 0000000..21d2067
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scalac
@@ -0,0 +1,44 @@
+package com.github.jodersky.flow
+
+import scala.util.Failure
+import scala.util.Success
+import Serial._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.util.ByteString
+import low.{ Serial => LowSerial }
+import scala.util.Try
+import scala.concurrent._
+
+class SerialOperator(serial: LowSerial, handler: ActorRef) extends Actor {
+ import context._
+
+ context.watch(handler)
+
+ class Reader extends Actor {
+ while (true) {
+ val data = ByteString(serial.read())
+ handler ! Received(data)
+ }
+ }
+
+ def receive = {
+ case Write(data) => {
+ val writer = sender
+ future{serial.write(data.toArray)}.onComplete {
+ case Success(data) => writer ! Wrote(ByteString(data))
+ case Failure(t) => writer ! CommandFailed(c, t)
+ }
+ }
+
+ case Close => {
+ context.stop(self)
+ }
+ }
+
+ override def postStop = {
+ serial.close()
+ }
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala
new file mode 100644
index 0000000..c7b2fa1
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala
@@ -0,0 +1,8 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+class NoSuchPortException(message: String) extends IOException(message)
+class PortInUseException(message: String) extends IOException(message)
+class AccessDeniedException(message: String) extends IOException(message)
+class PortClosingException(message: String) extends IOException(message) \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala
new file mode 100644
index 0000000..fda82a6
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/low/NativeLoader.scala
@@ -0,0 +1,35 @@
+package com.github.jodersky.flow.low
+
+import java.io.File
+import java.io.FileOutputStream
+
+object NativeLoader {
+
+ def load = {
+ val os = System.getProperty("os.name").toLowerCase
+ val arch = System.getProperty("os.arch").toLowerCase
+
+ val in = NativeLoader.getClass().getResourceAsStream("/native/" + os + "/" + arch + "/" + "libflow.so")
+ val temp = File.createTempFile("flow" + os + arch, ".so");
+ temp.deleteOnExit()
+ val out = new FileOutputStream(temp);
+
+ try {
+ var read: Int = 0; ;
+ val buffer = new Array[Byte](4096);
+ do {
+ read = in.read(buffer)
+ if (read != -1) {
+ out.write(buffer, 0, read);
+ }
+ } while (read != -1)
+ } finally {
+ in.close()
+ out.close
+ }
+
+ System.load(temp.getAbsolutePath())
+
+ }
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/low/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/low/Serial.scala
new file mode 100644
index 0000000..e482bf8
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/low/Serial.scala
@@ -0,0 +1,65 @@
+package com.github.jodersky.flow.low
+
+import scala.concurrent._
+import scala.concurrent.ExecutionContext.Implicits._
+import java.io.IOException
+import com.github.jodersky.flow.AccessDeniedException
+import com.github.jodersky.flow.NoSuchPortException
+import com.github.jodersky.flow.PortInUseException
+import com.github.jodersky.flow.PortClosingException
+import scala.util.Try
+
+class Serial private (val port: String, private val pointer: Long) {
+ import NativeSerial._
+
+ def read(): Array[Byte] = synchronized {
+ val buffer = new Array[Byte](100)
+ NativeSerial.read(pointer, buffer) match {
+ case E_POINTER => throw new NullPointerException("pointer to native serial")
+ case E_POLL => throw new IOException(port + ": polling")
+ case E_IO => throw new IOException(port + ": reading")
+ case E_CLOSE => throw new PortClosingException(port + " closing")
+ case bytes if bytes > 0 => buffer.take(bytes)
+ case error => throw new IOException(s"unknown read error ${error}")
+ }
+ }
+
+ def write(data: Array[Byte]): Array[Byte] = {
+ import NativeSerial._
+ NativeSerial.write(pointer, data) match {
+ case E_POINTER => throw new NullPointerException("pointer to native serial")
+ case E_IO => throw new IOException(port + ": writing")
+ case bytes if bytes > 0 => data.take(bytes)
+ case error => throw new IOException(s"unknown write error ${error}")
+ }
+ }
+
+ def close() = {
+ NativeSerial.close(pointer)
+ }
+
+}
+
+object Serial {
+
+ def open(port: String, baud: Int) = synchronized {
+ val pointer = new Array[Long](1)
+ val result = NativeSerial.open(port, baud, pointer)
+
+ import NativeSerial._
+
+ result match {
+ case E_PERMISSION => throw new AccessDeniedException(port)
+ case E_OPEN => throw new NoSuchPortException(port)
+ case E_BUSY => throw new PortInUseException(port)
+ case E_BAUD => throw new IllegalArgumentException(s"invalid baudrate ${baud}, use standard unix values")
+ case E_PIPE => throw new IOException("cannot create pipe")
+ case E_MALLOC => throw new IOException("cannot allocate memory for serial port")
+ case 0 => new Serial(port, pointer(0))
+ case error => throw new IOException(s"unknown error ${error}")
+ }
+ }
+
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/test.sc b/flow-main/src/main/scala/com/github/jodersky/flow/test.sc
new file mode 100644
index 0000000..88a2193
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/test.sc
@@ -0,0 +1,52 @@
+package com.github.jodersky.flow
+
+import akka.io.PipelineContext
+import akka.io.SymmetricPipePair
+import akka.io.SymmetricPipelineStage
+import akka.util.ByteString
+import java.nio.ByteOrder
+import scala.annotation.tailrec
+import java.nio.ByteBuffer
+import akka.io._
+import akka.actor.{IO=>_,_}
+import Serial._
+
+object test {
+
+ val StartByte = 0: Byte //> StartByte : Byte = 0
+ val StopByte = 1: Byte //> StopByte : Byte = 1
+ val EscapeByte = 2: Byte //> EscapeByte : Byte = 2
+
+ val ctx = new PipelineContext{} //> ctx : akka.io.PipelineContext = com.github.jodersky.flow.test$$anonfun$main
+ //| $1$$anon$1@32bf7190
+
+ val stages = new DelimitedFrame(StartByte, StopByte, EscapeByte)
+ //> stages : com.github.jodersky.flow.DelimitedFrame = com.github.jodersky.flow
+ //| .DelimitedFrame@36ff057f
+
+ val PipelinePorts(cmd, evt, mgmt) = PipelineFactory.buildFunctionTriple(ctx, stages)
+ //> cmd : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString]) = <function1>
+ //| evt : akka.util.ByteString => (Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString]) = <function1>
+ //| mgmt : PartialFunction[AnyRef,(Iterable[akka.util.ByteString], Iterable[akk
+ //| a.util.ByteString])] = <function1>
+
+ val injector = PipelineFactory.buildWithSinkFunctions(ctx, stages)(
+ t => println("sent command: " + t), // will receive messages of type Try[ByteString]
+ t => println("got event: " + t) // will receive messages of type Try[Message]
+ ) //> injector : akka.io.PipelineInjector[akka.util.ByteString,akka.util.ByteStri
+ //| ng] = akka.io.PipelineFactory$$anon$5@70cb6009
+
+
+ val bs = ByteString.fromArray(Array(0,4,2,1,1,6,1))
+ //> bs : akka.util.ByteString = ByteString(0, 4, 2, 1, 1, 6, 1)
+
+ injector.injectCommand(bs) //> sent command: Success(ByteString(0, 2, 0, 4, 2, 2, 2, 1, 2, 1, 6, 2, 1, 1))
+ injector.injectEvent(bs) //> got event: Success(ByteString(4, 1))
+
+ implicit val system = ActorSystem("flow")
+ //> system : akka.actor.ActorSystem = akka://flow|
+ //IO(Serial) ! Open("s", 9600)
+
+} \ No newline at end of file