aboutsummaryrefslogtreecommitdiff
path: root/flow-core
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-01-24 20:21:17 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-03 20:46:28 -0800
commitf865a76c2f441f619b069505b73fcbd1cba1a67c (patch)
tree3f53c519f4575037bdebf8c8399ca25d50649543 /flow-core
parent46c30908f827e27b58166f56efa4f15917c1af4f (diff)
downloadakka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.gz
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.bz2
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.zip
Add support for Akka streams
Diffstat (limited to 'flow-core')
-rw-r--r--flow-core/build.sbt12
-rw-r--r--flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java147
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala9
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala124
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala9
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala48
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala54
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala10
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala19
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala58
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala36
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala158
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala4
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala143
14 files changed, 831 insertions, 0 deletions
diff --git a/flow-core/build.sbt b/flow-core/build.sbt
new file mode 100644
index 0000000..445b7cc
--- /dev/null
+++ b/flow-core/build.sbt
@@ -0,0 +1,12 @@
+import flow.{FlowBuild, Dependencies}
+
+FlowBuild.commonSettings
+
+libraryDependencies += Dependencies.akkaActor
+
+//there are also java sources in this project
+compileOrder in Compile := CompileOrder.Mixed
+
+enablePlugins(JniLoading)
+
+target in javah in Compile := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "src"
diff --git a/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
new file mode 100644
index 0000000..6fac8da
--- /dev/null
+++ b/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
@@ -0,0 +1,147 @@
+package com.github.jodersky.flow.internal;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import ch.jodersky.jni.NativeLoader;
+
+import com.github.jodersky.flow.AccessDeniedException;
+import com.github.jodersky.flow.InvalidSettingsException;
+import com.github.jodersky.flow.NoSuchPortException;
+import com.github.jodersky.flow.PortInUseException;
+import com.github.jodersky.flow.PortInterruptedException;
+
+/**
+ * Low-level wrapper on top of native serial backend.
+ *
+ * WARNING: Methods in this class allocate native structures and deal with pointers.
+ * These pointers are handled as longs by java and are NOT checked for correctness,
+ * therefore passing invalid pointers may have unexpected results, including but not
+ * limited to crashing the VM.
+ *
+ * See SerialConnection for a higher level, more secured wrapper
+ * of serial communication.
+ *
+ * @see com.github.jodersky.flow.internal.SerialConnection
+ */
+final class NativeSerial {
+
+ static {
+ NativeLoader.load("/com/github/jodersky/flow", "flow3");
+ }
+
+ final static int PARITY_NONE = 0;
+ final static int PARITY_ODD = 1;
+ final static int PARITY_EVEN = 2;
+
+ /**
+ * Opens a serial port.
+ *
+ * @param port name of serial port to open
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ * @return address of natively allocated serial configuration structure
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ native static long open(String port, int baud, int characterSize, boolean twoStopBits, int parity)
+ throws NoSuchPortException, AccessDeniedException, PortInUseException, InvalidSettingsException, IOException;
+
+ /**
+ * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only read into the
+ * buffer's allocated memory, its position or limit are not changed.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer to read into
+ * @return number of bytes actually read
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ native static int readDirect(long serial, ByteBuffer buffer)
+ throws IllegalArgumentException, PortInterruptedException, IOException;
+
+ /**
+ * Reads data from a previously opened serial port into an array.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer array to read data into
+ * @return number of bytes actually read
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ native static int read(long serial, byte[] buffer)
+ throws PortInterruptedException, IOException;
+
+ /**
+ * Cancels a read (any caller to read or readDirect will return with a PortInterruptedException). This function may be called from any thread.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ native static void cancelRead(long serial)
+ throws IOException;
+
+ /**
+ * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is only taken from
+ * the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer from which data is taken
+ * @param length actual amount of data that should be taken from the buffer (this is needed since the native
+ * backend does not provide a way to query the buffer's current limit)
+ * @return number of bytes actually written
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws IOException on IO error
+ */
+ native static int writeDirect(long serial, ByteBuffer buffer, int length)
+ throws IllegalArgumentException, IOException;
+
+ /**
+ * Writes data from an array to a previously opened serial port.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer array from which data is taken
+ * @param length actual amount of data that should be taken from the buffer
+ * @return number of bytes actually written
+ * @throws IOException on IO error
+ */
+ native static int write(long serial, byte[] buffer, int length)
+ throws IOException;
+
+ /**
+ * Closes an previously open serial port. Natively allocated resources are freed and the serial pointer becomes invalid,
+ * therefore this function should only be called ONCE per open serial port.
+ *
+ * A port should not be closed while it is used (by a read or write) as this
+ * results in undefined behaviour.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ native static void close(long serial)
+ throws IOException;
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ native static void debug(boolean value);
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala
new file mode 100644
index 0000000..04d64a9
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala
@@ -0,0 +1,9 @@
+package com.github.jodersky.flow
+
+/** Specifies available parities used in serial communication. */
+object Parity extends Enumeration {
+ type Parity = Value
+ val None = Value(0)
+ val Odd = Value(1)
+ val Even = Value(2)
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala
new file mode 100644
index 0000000..d3b8873
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -0,0 +1,124 @@
+package com.github.jodersky.flow
+
+import akka.actor.ExtensionKey
+import akka.util.ByteString
+
+/** Defines messages used by flow's serial IO layer. */
+object Serial extends ExtensionKey[SerialExt] {
+
+ /** Base trait for any flow-related messages. */
+ sealed trait Message
+
+ /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */
+ trait Command extends Message
+
+ /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */
+ trait Event extends Message
+
+ /** A command has failed. */
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
+
+ /**
+ * Open a new serial port.
+ *
+ * Send this command to the serial manager to request the opening of a serial port. The manager will
+ * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port.
+ * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages.
+ *
+ * In case the port is successfully opened, the operator will respond with an `Opened` message.
+ * In case the port cannot be opened, the manager will respond with a `CommandFailed` message.
+ *
+ * @param port name of serial port to open
+ * @param settings settings of serial port to open
+ * @param bufferSize maximum read and write buffer sizes
+ */
+ case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command
+
+ /**
+ * A port has been successfully opened.
+ *
+ * Event sent by a port operator, indicating that a serial port was successfully opened. The sender
+ * of this message is the operator associated to the given serial port.
+ *
+ * @param port name of opened serial port
+ */
+ case class Opened(port: String) extends Event
+
+ /**
+ * Data has been received.
+ *
+ * Event sent by an operator, indicating that data was received on the operator's serial port.
+ *
+ * @param data data received on the port
+ */
+ case class Received(data: ByteString) extends Event
+
+ /**
+ * Write data to a serial port.
+ *
+ * Send this command to an operator to write the given data to its associated serial port.
+ * An acknowledgment may be set, in which case it is sent back to the sender on a successful write.
+ * Note that a successful write does not guarantee the actual transmission of data through the serial port,
+ * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to
+ * be transmitted.
+ *
+ * @param data data to be written to port
+ * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment
+ * is a function 'number of bytes written => event')
+ */
+ case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command
+
+ /**
+ * Special type of acknowledgment that is not sent back.
+ */
+ case object NoAck extends Function1[Int, Event] {
+ def apply(length: Int) = sys.error("cannot apply NoAck")
+ }
+
+ /**
+ * Request closing of port.
+ *
+ * Send this command to an operator to close its associated port. The operator will respond
+ * with a `Closed` message upon closing the serial port.
+ */
+ case object Close extends Command
+
+ /**
+ * A port has been closed.
+ *
+ * Event sent from operator, indicating that its port has been closed.
+ */
+ case object Closed extends Event
+
+ /**
+ * Watch a directory for new ports.
+ *
+ * Send this command to the manager to get notifications when a new port (i.e. file) is created in
+ * the given directory.
+ * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message.
+ *
+ * Note: the sender is also notified of currently existing ports.
+ *
+ * @param directory the directory to watch
+ * @param skipInitial don't get notified of already existing ports
+ *
+ * @see Unwatch
+ * @see Connected
+ */
+ case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command
+
+ /**
+ * Stop receiving notifications about a previously watched directory.
+ *
+ * @param directory the directory to unwatch
+ */
+ case class Unwatch(directory: String = "/dev") extends Command
+
+ /**
+ * A new port (i.e. file) has been detected.
+ *
+ * @param port the absolute file name of the connected port
+ */
+ case class Connected(port: String) extends Event
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala
new file mode 100644
index 0000000..5c1ddf6
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala
@@ -0,0 +1,9 @@
+package com.github.jodersky.flow
+
+import akka.actor.{ ExtendedActorSystem, Props }
+import akka.io.IO
+
+/** Provides the serial IO manager. */
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala
new file mode 100644
index 0000000..d163b0a
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -0,0 +1,48 @@
+package com.github.jodersky.flow
+
+import akka.actor.{ Actor, ActorLogging, OneForOneStrategy }
+import akka.actor.SupervisorStrategy.{ Escalate, Stop }
+import internal.{ SerialConnection, Watcher }
+import scala.util.{ Failure, Success, Try }
+
+/**
+ * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to
+ * a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
+ * @see SerialOperator
+ */
+class SerialManager extends Actor with ActorLogging {
+ import SerialManager._
+ import context._
+
+ override val supervisorStrategy = OneForOneStrategy() {
+ case _: Exception if sender == watcher => Escalate
+ case _: Exception => Stop
+ }
+
+ private val watcher = actorOf(Watcher(self), "watcher")
+
+ def receive = {
+
+ case open @ Serial.Open(port, settings, bufferSize) => Try {
+ SerialConnection.open(port, settings)
+ } match {
+ case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
+ case Failure(err) => sender ! Serial.CommandFailed(open, err)
+ }
+
+ case w: Serial.Watch => watcher.forward(w)
+
+ case u: Serial.Unwatch => watcher.forward(u)
+
+ }
+
+}
+
+object SerialManager {
+
+ private def escapePortString(port: String) = port collect {
+ case '/' => '-'
+ case c => c
+ }
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
new file mode 100644
index 0000000..ec0ee27
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -0,0 +1,54 @@
+package com.github.jodersky.flow
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala }
+import internal.{ Reader, SerialConnection, ThreadDied }
+import java.nio.ByteBuffer
+
+/**
+ * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
+ * @see SerialManager
+ */
+class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging {
+ import SerialOperator._
+ import context._
+
+ val readBuffer = ByteBuffer.allocateDirect(bufferSize)
+ val reader = new Reader(connection, readBuffer, self, client)
+ val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
+
+ context.watch(client)
+ client ! Serial.Opened(connection.port)
+ reader.start()
+
+ override def postStop = {
+ connection.close()
+ }
+
+ def receive: Receive = {
+
+ case Serial.Write(data, ack) => {
+ writeBuffer.clear()
+ data.copyToBuffer(writeBuffer)
+ val sent = connection.write(writeBuffer)
+ if (ack != Serial.NoAck) sender ! ack(sent)
+ }
+
+ case Serial.Close => {
+ client ! Serial.Closed
+ context stop self
+ }
+
+ case Terminated(`client`) => {
+ context stop self
+ }
+
+ //go down with reader thread
+ case ThreadDied(`reader`, ex) => throw ex
+
+ }
+
+}
+
+object SerialOperator {
+ def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
new file mode 100644
index 0000000..087fa6e
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
@@ -0,0 +1,10 @@
+package com.github.jodersky.flow
+
+/**
+ * Groups settings used in communication over a serial port.
+ * @param baud baud rate to use with serial port
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ */
+case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala
new file mode 100644
index 0000000..ebc0e65
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala
@@ -0,0 +1,19 @@
+package com.github.jodersky.flow
+
+/** The requested port could not be found. */
+class NoSuchPortException(message: String) extends Exception(message)
+
+/** The requested port is in use by someone else. */
+class PortInUseException(message: String) extends Exception(message)
+
+/** Permissions are not sufficient to open a serial port. */
+class AccessDeniedException(message: String) extends Exception(message)
+
+/** The settings specified are invalid. */
+class InvalidSettingsException(message: String) extends Exception(message)
+
+/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */
+class PortInterruptedException(message: String) extends Exception(message)
+
+/** The specified port has been closed. */
+class PortClosedException(message: String) extends Exception(message)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
new file mode 100644
index 0000000..a5fdc40
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
@@ -0,0 +1,58 @@
+package com.github.jodersky.flow
+package internal
+
+import java.io.{ File, FileOutputStream, InputStream, OutputStream }
+
+/** Handles loading of the current platform's native library for flow. */
+object NativeLoader {
+
+ private final val BufferSize = 4096
+
+ private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "")
+
+ private def arch = System.getProperty("os.arch").toLowerCase
+
+ /** Extract a resource from this class loader to a temporary file. */
+ private def extract(path: String, prefix: String): Option[File] = {
+ var in: Option[InputStream] = None
+ var out: Option[OutputStream] = None
+
+ try {
+ in = Option(NativeLoader.getClass.getResourceAsStream(path))
+ if (in.isEmpty) return None
+
+ val file = File.createTempFile(prefix, "")
+ out = Some(new FileOutputStream(file))
+
+ val buffer = new Array[Byte](BufferSize)
+ var length = -1;
+ do {
+ length = in.get.read(buffer)
+ if (length != -1) out.get.write(buffer, 0, length)
+ } while (length != -1)
+
+ Some(file)
+ } finally {
+ in.foreach(_.close)
+ out.foreach(_.close)
+ }
+ }
+
+ private def loadFromJar(library: String) = {
+ val fqlib = System.mapLibraryName(library) //fully qualified library name
+ val path = s"/native/${os}-${arch}/${fqlib}"
+ extract(path, fqlib) match {
+ case Some(file) => System.load(file.getAbsolutePath)
+ case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " +
+ "the native library does not exist for your specific architecture/OS combination." +
+ "Could not find " + path + ".")
+ }
+ }
+
+ def load(library: String) = try {
+ System.loadLibrary(library)
+ } catch {
+ case ex: UnsatisfiedLinkError => loadFromJar(library)
+ }
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
new file mode 100644
index 0000000..59ad575
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
@@ -0,0 +1,36 @@
+package com.github.jodersky.flow
+package internal
+
+import akka.actor.{ Actor, ActorRef }
+import akka.util.ByteString
+import java.nio.ByteBuffer
+
+class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread {
+ def readLoop() = {
+ var stop = false
+ while (!serial.isClosed && !stop) {
+ try {
+ buffer.clear()
+ val length = serial.read(buffer)
+ buffer.limit(length)
+ val data = ByteString.fromByteBuffer(buffer)
+ client.tell(Serial.Received(data), operator)
+ } catch {
+
+ //don't do anything if port is interrupted
+ case ex: PortInterruptedException => {}
+
+ //stop and tell operator on other exception
+ case ex: Exception => {
+ stop = true
+ operator.tell(ThreadDied(this, ex), Actor.noSender)
+ }
+ }
+ }
+ }
+
+ override def run() {
+ this.setName("flow-reader " + serial.port)
+ readLoop()
+ }
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
new file mode 100644
index 0000000..73416e3
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
@@ -0,0 +1,158 @@
+package com.github.jodersky.flow
+package internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast
+ * to the latter, this class encapsulates and secures any pointers used to communicate with the native
+ * backend and is thread-safe.
+ *
+ * The underlying serial port is assumed open when this class is initialized.
+ */
+class SerialConnection private (
+ val port: String,
+ val settings: SerialSettings,
+ private val pointer: Long
+) {
+
+ import SerialConnection._
+
+ private var reading: Boolean = false
+ private val readLock = new Object
+
+ private var writing: Boolean = false
+ private val writeLock = new Object
+
+ private val closed = new AtomicBoolean(false)
+
+ /**
+ * Checks if this serial port is closed.
+ */
+ def isClosed = closed.get()
+
+ /**
+ * Closes the underlying serial connection. Any callers blocked on read or write will return.
+ * A call of this method has no effect if the serial port is already closed.
+ * @throws IOException on IO error
+ */
+ def close(): Unit = this.synchronized {
+ if (!closed.get) {
+ closed.set(true)
+ NativeSerial.cancelRead(pointer)
+ readLock.synchronized {
+ while (reading) this.wait()
+ }
+ writeLock.synchronized {
+ while (writing) this.wait()
+ }
+ NativeSerial.close(pointer)
+ }
+ }
+
+ /**
+ * Reads data from underlying serial connection into a ByteBuffer.
+ * Note that data is read into the buffer's memory, its attributes
+ * such as position and limit are not modified.
+ *
+ * A call to this method is blocking, however it is interrupted
+ * if the connection is closed.
+ *
+ * This method works for direct and indirect buffers but is optimized
+ * for the former.
+ *
+ * @param buffer a ByteBuffer into which data is read
+ * @return the actual number of bytes read
+ * @throws PortInterruptedException if port is closed while reading
+ * @throws IOException on IO error
+ */
+ def read(buffer: ByteBuffer): Int = readLock.synchronized {
+ if (!closed.get) {
+ reading = true
+ try {
+ transfer(
+ b => NativeSerial.readDirect(pointer, b),
+ b => NativeSerial.read(pointer, b.array())
+ )(buffer)
+ } finally {
+ reading = false
+ if (closed.get) readLock.notify()
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is closed")
+ }
+ }
+
+ /**
+ * Writes data from a ByteBuffer to underlying serial connection.
+ * Note that data is read from the buffer's memory, its attributes
+ * such as position and limit are not modified.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * This method works for direct and indirect buffers but is optimized
+ * for the former.
+ *
+ * @param buffer a ByteBuffer from which data is taken
+ * @return the actual number of bytes written
+ * @throws IOException on IO error
+ */
+ def write(buffer: ByteBuffer): Int = writeLock.synchronized {
+ if (!closed.get) {
+ writing = true
+ try {
+ transfer(
+ b => NativeSerial.writeDirect(pointer, b, b.position()),
+ b => NativeSerial.write(pointer, b.array(), b.position())
+ )(buffer)
+ } finally {
+ writing = false
+ if (closed.get) writeLock.notify()
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is closed")
+ }
+ }
+
+ private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) {
+ direct(buffer)
+ } else if (buffer.hasArray()) {
+ indirect(buffer)
+ } else {
+ throw new IllegalArgumentException("buffer is not direct and has no array");
+ }
+
+}
+
+object SerialConnection {
+ import NativeSerial._
+
+ /**
+ * Opens a new connection to a serial port.
+ * This method acts as a factory to creating serial connections.
+ *
+ * @param port name of serial port to open
+ * @param settings settings with which to initialize the connection
+ * @return an instance of the open serial connection
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ def open(port: String, settings: SerialSettings): SerialConnection = synchronized {
+ val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id)
+ new SerialConnection(port, settings, pointer)
+ }
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
new file mode 100644
index 0000000..1470aa5
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
@@ -0,0 +1,4 @@
+package com.github.jodersky.flow
+package internal
+
+case class ThreadDied(thread: Thread, reason: Exception)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
new file mode 100644
index 0000000..8d89fb4
--- /dev/null
+++ b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
@@ -0,0 +1,143 @@
+package com.github.jodersky.flow
+package internal
+
+import akka.actor.{ Actor, ActorRef, Props, Terminated }
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
+import java.nio.file.StandardWatchEventKinds._
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
+import scala.util.{ Failure, Success, Try }
+
+class Watcher(from: Option[ActorRef]) extends Actor {
+
+ private val watcher = new Watcher.WatcherThread(self)
+
+ //directory -> subscribers
+ private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
+
+ //directory -> watchkey
+ private val keys: Map[String, WatchKey] = Map.empty
+
+ def subscribe(directory: String, client: ActorRef): WatchKey = {
+ val normal = Paths.get(directory).toAbsolutePath
+ val index = normal.toString
+ val key = keys.getOrElseUpdate(index, watcher.register(normal))
+ clients addBinding (index, client)
+ key
+ }
+
+ def unsubscribe(directory: String, client: ActorRef): Unit = {
+ val index = Paths.get(directory).toAbsolutePath.toString
+
+ clients removeBinding (index, sender)
+
+ if (clients.get(index).isEmpty && keys.get(index).isDefined) {
+ keys(index).cancel()
+ keys -= index
+ }
+ }
+
+ def reply(msg: Any, sender: ActorRef) = {
+ val origin = from match {
+ case Some(ref) => ref
+ case None => self
+ }
+ sender.tell(msg, origin)
+ }
+
+ override def preStart() = {
+ watcher.setDaemon(true)
+ watcher.setName("flow-watcher")
+ watcher.start()
+ }
+
+ def receive = {
+
+ case w @ Serial.Watch(directory, skipInitial) =>
+ val normalPath = Paths.get(directory).toAbsolutePath
+ val normal = normalPath.toString
+
+ Try {
+ subscribe(directory, sender)
+ } match {
+ case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
+ case Success(key) =>
+ context watch sender
+ if (!skipInitial) {
+ Files.newDirectoryStream(normalPath) foreach { path =>
+ if (!Files.isDirectory(path)) {
+ reply(Serial.Connected(path.toString), sender)
+ }
+ }
+ }
+ }
+
+ case u @ Serial.Unwatch(directory) =>
+ val normal = Paths.get(directory).toAbsolutePath.toString
+
+ clients.removeBinding(normal, sender)
+
+ if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
+ keys(normal).cancel()
+ keys -= normal
+ }
+
+ case Terminated(client) =>
+ for ((directory, c) <- clients if c == client) {
+ unsubscribe(directory, client)
+ }
+
+ case Watcher.NewFile(directory, file) =>
+ val normal = directory.toAbsolutePath
+ val absFile = normal resolve file
+ clients.getOrElse(normal.toString, Set.empty) foreach { client =>
+ reply(Serial.Connected(absFile.toString), client)
+ }
+
+ case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
+
+ }
+
+ override def postStop() = {
+ watcher.close()
+ }
+
+}
+
+object Watcher {
+ private case class NewFile(directory: Path, file: Path)
+
+ private class WatcherThread(actor: ActorRef) extends Thread {
+
+ private val service = FileSystems.getDefault().newWatchService()
+
+ def register(directory: Path) = directory.register(service, ENTRY_CREATE)
+
+ override def run(): Unit = {
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ actor.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
+ }
+ }
+ }
+
+ def close() = service.close //causes the service to throw a ClosedWatchServiceException
+ }
+
+ def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
+
+}