aboutsummaryrefslogtreecommitdiff
path: root/flow-main
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2013-06-27 19:27:57 +0200
committerJakob Odersky <jodersky@gmail.com>2013-06-27 19:27:57 +0200
commit719978035732a55261b753bbc33570d3c1f53785 (patch)
treede110fafce8b2ca11f7061f4c8f4b93a4baf548a /flow-main
parentf21d2de9405d5dd36f108a380f558cab930c1205 (diff)
downloadakka-serial-719978035732a55261b753bbc33570d3c1f53785.tar.gz
akka-serial-719978035732a55261b753bbc33570d3c1f53785.tar.bz2
akka-serial-719978035732a55261b753bbc33570d3c1f53785.zip
refactor build to a more generic structure
Diffstat (limited to 'flow-main')
-rw-r--r--flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java68
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala25
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala9
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala55
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala99
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala10
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala88
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala36
8 files changed, 390 insertions, 0 deletions
diff --git a/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
new file mode 100644
index 0000000..dba3f44
--- /dev/null
+++ b/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
@@ -0,0 +1,68 @@
+package com.github.jodersky.flow.internal;
+
+import com.github.jodersky.flow.internal.NativeLoader;
+
+/** Thin layer on top of native code. */
+class NativeSerial {
+
+ static {
+ NativeLoader.load();
+ }
+
+ final static int E_IO = -1;
+ final static int E_ACCESS_DENIED = -2;
+ final static int E_BUSY = -3;
+ final static int E_INVALID_BAUD = -4;
+ final static int E_INTERRUPT = -5;
+ final static int E_NO_PORT = -6;
+
+ /**Opens a serial port and allocates memory for storing configuration. Note: if this function fails,
+ * any internally allocated resources will be freed.
+ * @param port_name name of port
+ * @param baud baud rate
+ * @param serial pointer to memory that will be allocated with a serial structure
+ * @return 0 on success
+ * @return E_NO_PORT if the given port does not exist
+ * @return E_ACCESS_DENIED if permissions are not sufficient to open port
+ * @return E_BUSY if port is already in use
+ * @return E_INVALID_BAUD if specified baudrate is non-standard
+ * @return E_IO on other error */
+ native static int open(String device, int baud, long[] serial);
+
+ /**Starts a blocking read from a previously opened serial port. The read is blocking, however it may be
+ * interrupted by calling 'serial_interrupt' on the given serial port.
+ * @param serial pointer to serial configuration from which to read
+ * @param buffer buffer into which data is read
+ * @param size maximum buffer size
+ * @return n>0 the number of bytes read into buffer
+ * @return E_INTERRUPT if the call to this function was interrupted
+ * @return E_IO on IO error */
+ native static int read(long serial, byte[] buffer);
+
+ /**Writes data to a previously opened serial port.
+ * @param serial pointer to serial configuration to which to write
+ * @param data data to write
+ * @param size number of bytes to write from data
+ * @return n>0 the number of bytes written
+ * @return E_IO on IO error */
+ native static int write(long serial, byte[] buffer);
+
+ /**Interrupts a blocked read call.
+ * @param serial_config the serial port to interrupt
+ * @return 0 on success
+ * @return E_IO on error */
+ native static int interrupt(long serial);
+
+ /**Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to
+ * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT
+ * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per
+ * close manual page, close should not be called on a file descriptor that is in use by another thread).
+ * @param serial pointer to serial configuration that is to be closed (and freed)
+ * @return 0 on success
+ * @return E_IO on error */
+ native static int close(long serial);
+
+ /**Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. */
+ native static void debug(boolean value);
+
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
new file mode 100644
index 0000000..6e4aefb
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -0,0 +1,25 @@
+package com.github.jodersky.flow
+
+import akka.actor.ActorRef
+import akka.actor.ExtensionKey
+import akka.util.ByteString
+
+/** Defines messages used by serial IO layer. */
+object Serial extends ExtensionKey[SerialExt] {
+
+ trait Command
+ trait Event
+
+ case class Open(handler: ActorRef, port: String, baud: Int) extends Command
+ case class Opened(port: String) extends Event
+ case class OpenFailed(port: String, reason: Throwable) extends Event
+
+ case class Received(data: ByteString) extends Event
+
+ case class Write(data: ByteString, ack: Boolean = false) extends Command
+ case class Wrote(data: ByteString) extends Event
+
+ case object Close extends Command
+ case class Closed(error: Option[Exception]) extends Event
+
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala
new file mode 100644
index 0000000..080ddd3
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala
@@ -0,0 +1,9 @@
+package com.github.jodersky.flow
+
+import akka.actor.ExtendedActorSystem
+import akka.actor.Props
+import akka.io.IO
+
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
new file mode 100644
index 0000000..688ae3c
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -0,0 +1,55 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+import com.github.jodersky.flow.internal.InternalSerial
+
+import Serial.Open
+import Serial.OpenFailed
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.OneForOneStrategy
+import akka.actor.Props
+import akka.actor.SupervisorStrategy.Escalate
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor.actorRef2Scala
+
+class SerialManager extends Actor with ActorLogging {
+ import SerialManager._
+ import context._
+
+ override val supervisorStrategy =
+ OneForOneStrategy() {
+ case _: IOException => Stop
+ case _: Exception => Escalate
+ }
+
+ def receive = {
+ case Open(handler, port, baud) => Try { InternalSerial.open(port, baud) } match {
+ case Failure(t) => {
+ log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage())
+ handler ! OpenFailed(port, t)
+ }
+
+ case Success(serial) => {
+ log.debug(s"opened low-level serial port at ${port}, baud ${baud}")
+ context.actorOf(Props(classOf[SerialOperator], handler, serial), name = escapePortString(port))
+ }
+
+ }
+ }
+
+}
+
+object SerialManager {
+
+ private def escapePortString(port: String) = port collect {
+ case '/' => '-'
+ case c => c
+ }
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
new file mode 100644
index 0000000..604dc74
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -0,0 +1,99 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+import com.github.jodersky.flow.internal.InternalSerial
+
+import Serial.Close
+import Serial.Closed
+import Serial.Opened
+import Serial.Received
+import Serial.Write
+import Serial.Wrote
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Terminated
+import akka.actor.actorRef2Scala
+import akka.util.ByteString
+
+class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging {
+ import context._
+
+ case class ReadException(ex: Exception)
+
+ object Reader extends Thread {
+
+ def enterReadLoop() = {
+ var continueReading = true
+ while (continueReading) {
+ try {
+ val data = ByteString(serial.read())
+ handler ! Received(data)
+ } catch {
+
+ //port is closing, stop thread gracefully
+ case ex: PortInterruptedException => {
+ continueReading = false
+ }
+
+ //something else went wrong stop and tell actor
+ case ex: Exception => {
+ continueReading = false
+ self ! ReadException(ex)
+ }
+ }
+ }
+ }
+
+ def name = this.getName()
+
+ override def run() {
+ this.setName("flow-reader " + serial.port)
+ log.debug(name + ": started reader thread")
+ enterReadLoop()
+ log.debug(name + ": exiting")
+ }
+
+ }
+
+ override def preStart() = {
+ context watch handler
+ handler ! Opened(serial.port)
+ Reader.start()
+ }
+
+ override def postStop = {
+ serial.close()
+ }
+
+ def receive: Receive = {
+
+ case Write(data, ack) => {
+ try {
+ val sent = serial.write(data.toArray)
+ if (ack) sender ! Wrote(ByteString(sent))
+ } catch {
+ case ex: IOException => {
+ handler ! Closed(Some(ex))
+ context stop self
+ }
+ }
+ }
+
+ case Close => {
+ handler ! Closed(None)
+ context stop self
+ }
+
+ case Terminated(`handler`) => context.stop(self)
+
+ //go down with reader thread
+ case ReadException(ex) => {
+ handler ! Closed(Some(ex))
+ context stop self
+ }
+
+ }
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala
new file mode 100644
index 0000000..0422018
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala
@@ -0,0 +1,10 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+class NoSuchPortException(message: String) extends Exception(message)
+class PortInUseException(message: String) extends Exception(message)
+class AccessDeniedException(message: String) extends Exception(message)
+class IllegalBaudRateException(message: String) extends Exception(message)
+class PortInterruptedException(message: String) extends Exception(message)
+class PortClosedException(message: String) extends Exception(message) \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
new file mode 100644
index 0000000..315f395
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
@@ -0,0 +1,88 @@
+package com.github.jodersky.flow.internal
+
+import java.io.IOException
+import com.github.jodersky.flow._
+import java.util.concurrent.atomic.AtomicBoolean
+
+/** Wraps NativeSerial in a more object-oriented style, still quite low level. */
+class InternalSerial private (val port: String, private val pointer: Long) {
+ import InternalSerial._
+
+ private val reading = new AtomicBoolean(false)
+ private val writing = new AtomicBoolean(false)
+ private val closed = new AtomicBoolean(false)
+
+ /** Closes the underlying serial connection. Any threads blocking on read or write will return. */
+ def close(): Unit = synchronized {
+ if (!closed.get()) {
+ closed.set(true)
+ except(NativeSerial.interrupt(pointer), port)
+ if (writing.get()) wait() // if reading, wait for read to finish
+ if (reading.get()) wait()
+ except(NativeSerial.close(pointer), port)
+ }
+ }
+
+ /** Read data from underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread */
+ def read(): Array[Byte] = if (!closed.get) {
+ reading.set(true)
+ try {
+ val buffer = new Array[Byte](100)
+ val bytesRead = except(NativeSerial.read(pointer, buffer), port)
+ buffer take bytesRead
+ } finally {
+ synchronized {
+ reading.set(false)
+ if (closed.get) notify(); //read was interrupted by close
+ }
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is already closed")
+ }
+
+ /** Write data to underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread */
+ def write(data: Array[Byte]): Array[Byte] = if (!closed.get) {
+ writing.set(true)
+ try {
+ val bytesWritten = except(NativeSerial.write(pointer, data), port)
+ data take bytesWritten
+ } finally {
+ synchronized {
+ writing.set(false)
+ if (closed.get) notify()
+ }
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is already closed")
+ }
+
+}
+
+object InternalSerial {
+ import NativeSerial._
+
+ /** Transform error code to exception if necessary. */
+ private def except(result: Int, port: String): Int = result match {
+ case E_IO => throw new IOException(port)
+ case E_ACCESS_DENIED => throw new AccessDeniedException(port)
+ case E_BUSY => throw new PortInUseException(port)
+ case E_INVALID_BAUD => throw new IllegalBaudRateException("use standard baud rate")
+ case E_INTERRUPT => throw new PortInterruptedException(port)
+ case E_NO_PORT => throw new NoSuchPortException(port)
+ case error if error < 0 => throw new IOException(s"unknown error code: ${error}")
+ case success => success
+ }
+
+ /** Open a new connection to a serial port. */
+ def open(port: String, baud: Int): InternalSerial = synchronized {
+ val pointer = new Array[Long](1)
+ except(NativeSerial.open(port, baud, pointer), port)
+ new InternalSerial(port, pointer(0))
+ }
+
+ /** Set debugging for all serial connections. Debugging results in printing extra messages (from the native library) in case of errors. */
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
new file mode 100644
index 0000000..aebbe3f
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
@@ -0,0 +1,36 @@
+package com.github.jodersky.flow.internal
+
+import java.io.File
+import java.io.FileOutputStream
+
+/**Loads the current system's native library for flow. */
+object NativeLoader {
+
+ def load = {
+ val os = System.getProperty("os.name").toLowerCase
+ val arch = System.getProperty("os.arch").toLowerCase
+
+ val in = NativeLoader.getClass().getResourceAsStream("/native/" + os + "/" + arch + "/" + "libflow.so")
+ val temp = File.createTempFile("flow" + os + arch, ".so");
+ temp.deleteOnExit()
+ val out = new FileOutputStream(temp);
+
+ try {
+ var read: Int = 0; ;
+ val buffer = new Array[Byte](4096);
+ do {
+ read = in.read(buffer)
+ if (read != -1) {
+ out.write(buffer, 0, read);
+ }
+ } while (read != -1)
+ } finally {
+ in.close()
+ out.close
+ }
+
+ System.load(temp.getAbsolutePath())
+
+ }
+
+} \ No newline at end of file