diff options
author | Jakob Odersky <jodersky@gmail.com> | 2013-06-27 19:27:57 +0200 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2013-06-27 19:27:57 +0200 |
commit | 719978035732a55261b753bbc33570d3c1f53785 (patch) | |
tree | de110fafce8b2ca11f7061f4c8f4b93a4baf548a /flow-main/src/main | |
parent | f21d2de9405d5dd36f108a380f558cab930c1205 (diff) | |
download | akka-serial-719978035732a55261b753bbc33570d3c1f53785.tar.gz akka-serial-719978035732a55261b753bbc33570d3c1f53785.tar.bz2 akka-serial-719978035732a55261b753bbc33570d3c1f53785.zip |
refactor build to a more generic structure
Diffstat (limited to 'flow-main/src/main')
8 files changed, 390 insertions, 0 deletions
diff --git a/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java new file mode 100644 index 0000000..dba3f44 --- /dev/null +++ b/flow-main/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java @@ -0,0 +1,68 @@ +package com.github.jodersky.flow.internal; + +import com.github.jodersky.flow.internal.NativeLoader; + +/** Thin layer on top of native code. */ +class NativeSerial { + + static { + NativeLoader.load(); + } + + final static int E_IO = -1; + final static int E_ACCESS_DENIED = -2; + final static int E_BUSY = -3; + final static int E_INVALID_BAUD = -4; + final static int E_INTERRUPT = -5; + final static int E_NO_PORT = -6; + + /**Opens a serial port and allocates memory for storing configuration. Note: if this function fails, + * any internally allocated resources will be freed. + * @param port_name name of port + * @param baud baud rate + * @param serial pointer to memory that will be allocated with a serial structure + * @return 0 on success + * @return E_NO_PORT if the given port does not exist + * @return E_ACCESS_DENIED if permissions are not sufficient to open port + * @return E_BUSY if port is already in use + * @return E_INVALID_BAUD if specified baudrate is non-standard + * @return E_IO on other error */ + native static int open(String device, int baud, long[] serial); + + /**Starts a blocking read from a previously opened serial port. The read is blocking, however it may be + * interrupted by calling 'serial_interrupt' on the given serial port. + * @param serial pointer to serial configuration from which to read + * @param buffer buffer into which data is read + * @param size maximum buffer size + * @return n>0 the number of bytes read into buffer + * @return E_INTERRUPT if the call to this function was interrupted + * @return E_IO on IO error */ + native static int read(long serial, byte[] buffer); + + /**Writes data to a previously opened serial port. + * @param serial pointer to serial configuration to which to write + * @param data data to write + * @param size number of bytes to write from data + * @return n>0 the number of bytes written + * @return E_IO on IO error */ + native static int write(long serial, byte[] buffer); + + /**Interrupts a blocked read call. + * @param serial_config the serial port to interrupt + * @return 0 on success + * @return E_IO on error */ + native static int interrupt(long serial); + + /**Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to + * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT + * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per + * close manual page, close should not be called on a file descriptor that is in use by another thread). + * @param serial pointer to serial configuration that is to be closed (and freed) + * @return 0 on success + * @return E_IO on error */ + native static int close(long serial); + + /**Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. */ + native static void debug(boolean value); + +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala new file mode 100644 index 0000000..6e4aefb --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -0,0 +1,25 @@ +package com.github.jodersky.flow + +import akka.actor.ActorRef +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + trait Command + trait Event + + case class Open(handler: ActorRef, port: String, baud: Int) extends Command + case class Opened(port: String) extends Event + case class OpenFailed(port: String, reason: Throwable) extends Event + + case class Received(data: ByteString) extends Event + + case class Write(data: ByteString, ack: Boolean = false) extends Command + case class Wrote(data: ByteString) extends Event + + case object Close extends Command + case class Closed(error: Option[Exception]) extends Event + +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala new file mode 100644 index 0000000..080ddd3 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala @@ -0,0 +1,9 @@ +package com.github.jodersky.flow + +import akka.actor.ExtendedActorSystem +import akka.actor.Props +import akka.io.IO + +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala new file mode 100644 index 0000000..688ae3c --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -0,0 +1,55 @@ +package com.github.jodersky.flow + +import java.io.IOException + +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import com.github.jodersky.flow.internal.InternalSerial + +import Serial.Open +import Serial.OpenFailed +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.SupervisorStrategy.Escalate +import akka.actor.SupervisorStrategy.Stop +import akka.actor.actorRef2Scala + +class SerialManager extends Actor with ActorLogging { + import SerialManager._ + import context._ + + override val supervisorStrategy = + OneForOneStrategy() { + case _: IOException => Stop + case _: Exception => Escalate + } + + def receive = { + case Open(handler, port, baud) => Try { InternalSerial.open(port, baud) } match { + case Failure(t) => { + log.debug(s"failed to open low serial port at ${port}, baud ${baud}, reason: " + t.getMessage()) + handler ! OpenFailed(port, t) + } + + case Success(serial) => { + log.debug(s"opened low-level serial port at ${port}, baud ${baud}") + context.actorOf(Props(classOf[SerialOperator], handler, serial), name = escapePortString(port)) + } + + } + } + +} + +object SerialManager { + + private def escapePortString(port: String) = port collect { + case '/' => '-' + case c => c + } + +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala new file mode 100644 index 0000000..604dc74 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -0,0 +1,99 @@ +package com.github.jodersky.flow + +import java.io.IOException + +import com.github.jodersky.flow.internal.InternalSerial + +import Serial.Close +import Serial.Closed +import Serial.Opened +import Serial.Received +import Serial.Write +import Serial.Wrote +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Terminated +import akka.actor.actorRef2Scala +import akka.util.ByteString + +class SerialOperator(handler: ActorRef, serial: InternalSerial) extends Actor with ActorLogging { + import context._ + + case class ReadException(ex: Exception) + + object Reader extends Thread { + + def enterReadLoop() = { + var continueReading = true + while (continueReading) { + try { + val data = ByteString(serial.read()) + handler ! Received(data) + } catch { + + //port is closing, stop thread gracefully + case ex: PortInterruptedException => { + continueReading = false + } + + //something else went wrong stop and tell actor + case ex: Exception => { + continueReading = false + self ! ReadException(ex) + } + } + } + } + + def name = this.getName() + + override def run() { + this.setName("flow-reader " + serial.port) + log.debug(name + ": started reader thread") + enterReadLoop() + log.debug(name + ": exiting") + } + + } + + override def preStart() = { + context watch handler + handler ! Opened(serial.port) + Reader.start() + } + + override def postStop = { + serial.close() + } + + def receive: Receive = { + + case Write(data, ack) => { + try { + val sent = serial.write(data.toArray) + if (ack) sender ! Wrote(ByteString(sent)) + } catch { + case ex: IOException => { + handler ! Closed(Some(ex)) + context stop self + } + } + } + + case Close => { + handler ! Closed(None) + context stop self + } + + case Terminated(`handler`) => context.stop(self) + + //go down with reader thread + case ReadException(ex) => { + handler ! Closed(Some(ex)) + context stop self + } + + } + +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala new file mode 100644 index 0000000..0422018 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala @@ -0,0 +1,10 @@ +package com.github.jodersky.flow + +import java.io.IOException + +class NoSuchPortException(message: String) extends Exception(message) +class PortInUseException(message: String) extends Exception(message) +class AccessDeniedException(message: String) extends Exception(message) +class IllegalBaudRateException(message: String) extends Exception(message) +class PortInterruptedException(message: String) extends Exception(message) +class PortClosedException(message: String) extends Exception(message)
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala new file mode 100644 index 0000000..315f395 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala @@ -0,0 +1,88 @@ +package com.github.jodersky.flow.internal + +import java.io.IOException +import com.github.jodersky.flow._ +import java.util.concurrent.atomic.AtomicBoolean + +/** Wraps NativeSerial in a more object-oriented style, still quite low level. */ +class InternalSerial private (val port: String, private val pointer: Long) { + import InternalSerial._ + + private val reading = new AtomicBoolean(false) + private val writing = new AtomicBoolean(false) + private val closed = new AtomicBoolean(false) + + /** Closes the underlying serial connection. Any threads blocking on read or write will return. */ + def close(): Unit = synchronized { + if (!closed.get()) { + closed.set(true) + except(NativeSerial.interrupt(pointer), port) + if (writing.get()) wait() // if reading, wait for read to finish + if (reading.get()) wait() + except(NativeSerial.close(pointer), port) + } + } + + /** Read data from underlying serial connection. + * @throws PortInterruptedException if port is closed from another thread */ + def read(): Array[Byte] = if (!closed.get) { + reading.set(true) + try { + val buffer = new Array[Byte](100) + val bytesRead = except(NativeSerial.read(pointer, buffer), port) + buffer take bytesRead + } finally { + synchronized { + reading.set(false) + if (closed.get) notify(); //read was interrupted by close + } + } + } else { + throw new PortClosedException(s"port ${port} is already closed") + } + + /** Write data to underlying serial connection. + * @throws PortInterruptedException if port is closed from another thread */ + def write(data: Array[Byte]): Array[Byte] = if (!closed.get) { + writing.set(true) + try { + val bytesWritten = except(NativeSerial.write(pointer, data), port) + data take bytesWritten + } finally { + synchronized { + writing.set(false) + if (closed.get) notify() + } + } + } else { + throw new PortClosedException(s"port ${port} is already closed") + } + +} + +object InternalSerial { + import NativeSerial._ + + /** Transform error code to exception if necessary. */ + private def except(result: Int, port: String): Int = result match { + case E_IO => throw new IOException(port) + case E_ACCESS_DENIED => throw new AccessDeniedException(port) + case E_BUSY => throw new PortInUseException(port) + case E_INVALID_BAUD => throw new IllegalBaudRateException("use standard baud rate") + case E_INTERRUPT => throw new PortInterruptedException(port) + case E_NO_PORT => throw new NoSuchPortException(port) + case error if error < 0 => throw new IOException(s"unknown error code: ${error}") + case success => success + } + + /** Open a new connection to a serial port. */ + def open(port: String, baud: Int): InternalSerial = synchronized { + val pointer = new Array[Long](1) + except(NativeSerial.open(port, baud, pointer), port) + new InternalSerial(port, pointer(0)) + } + + /** Set debugging for all serial connections. Debugging results in printing extra messages (from the native library) in case of errors. */ + def debug(value: Boolean) = NativeSerial.debug(value) + +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala new file mode 100644 index 0000000..aebbe3f --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala @@ -0,0 +1,36 @@ +package com.github.jodersky.flow.internal + +import java.io.File +import java.io.FileOutputStream + +/**Loads the current system's native library for flow. */ +object NativeLoader { + + def load = { + val os = System.getProperty("os.name").toLowerCase + val arch = System.getProperty("os.arch").toLowerCase + + val in = NativeLoader.getClass().getResourceAsStream("/native/" + os + "/" + arch + "/" + "libflow.so") + val temp = File.createTempFile("flow" + os + arch, ".so"); + temp.deleteOnExit() + val out = new FileOutputStream(temp); + + try { + var read: Int = 0; ; + val buffer = new Array[Byte](4096); + do { + read = in.read(buffer) + if (read != -1) { + out.write(buffer, 0, read); + } + } while (read != -1) + } finally { + in.close() + out.close + } + + System.load(temp.getAbsolutePath()) + + } + +}
\ No newline at end of file |