diff options
author | Jakob Odersky <jakob@odersky.com> | 2016-01-24 20:21:17 -0800 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2016-02-03 20:46:28 -0800 |
commit | f865a76c2f441f619b069505b73fcbd1cba1a67c (patch) | |
tree | 3f53c519f4575037bdebf8c8399ca25d50649543 /flow-main/src/main/scala/com | |
parent | 46c30908f827e27b58166f56efa4f15917c1af4f (diff) | |
download | akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.gz akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.bz2 akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.zip |
Add support for Akka streams
Diffstat (limited to 'flow-main/src/main/scala/com')
12 files changed, 0 insertions, 672 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala deleted file mode 100644 index 04d64a9..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.github.jodersky.flow - -/** Specifies available parities used in serial communication. */ -object Parity extends Enumeration { - type Parity = Value - val None = Value(0) - val Odd = Value(1) - val Even = Value(2) -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala deleted file mode 100644 index d3b8873..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala +++ /dev/null @@ -1,124 +0,0 @@ -package com.github.jodersky.flow - -import akka.actor.ExtensionKey -import akka.util.ByteString - -/** Defines messages used by flow's serial IO layer. */ -object Serial extends ExtensionKey[SerialExt] { - - /** Base trait for any flow-related messages. */ - sealed trait Message - - /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ - trait Command extends Message - - /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ - trait Event extends Message - - /** A command has failed. */ - case class CommandFailed(command: Command, reason: Throwable) extends Event - - /** - * Open a new serial port. - * - * Send this command to the serial manager to request the opening of a serial port. The manager will - * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. - * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. - * - * In case the port is successfully opened, the operator will respond with an `Opened` message. - * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. - * - * @param port name of serial port to open - * @param settings settings of serial port to open - * @param bufferSize maximum read and write buffer sizes - */ - case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command - - /** - * A port has been successfully opened. - * - * Event sent by a port operator, indicating that a serial port was successfully opened. The sender - * of this message is the operator associated to the given serial port. - * - * @param port name of opened serial port - */ - case class Opened(port: String) extends Event - - /** - * Data has been received. - * - * Event sent by an operator, indicating that data was received on the operator's serial port. - * - * @param data data received on the port - */ - case class Received(data: ByteString) extends Event - - /** - * Write data to a serial port. - * - * Send this command to an operator to write the given data to its associated serial port. - * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. - * Note that a successful write does not guarantee the actual transmission of data through the serial port, - * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to - * be transmitted. - * - * @param data data to be written to port - * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment - * is a function 'number of bytes written => event') - */ - case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command - - /** - * Special type of acknowledgment that is not sent back. - */ - case object NoAck extends Function1[Int, Event] { - def apply(length: Int) = sys.error("cannot apply NoAck") - } - - /** - * Request closing of port. - * - * Send this command to an operator to close its associated port. The operator will respond - * with a `Closed` message upon closing the serial port. - */ - case object Close extends Command - - /** - * A port has been closed. - * - * Event sent from operator, indicating that its port has been closed. - */ - case object Closed extends Event - - /** - * Watch a directory for new ports. - * - * Send this command to the manager to get notifications when a new port (i.e. file) is created in - * the given directory. - * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. - * - * Note: the sender is also notified of currently existing ports. - * - * @param directory the directory to watch - * @param skipInitial don't get notified of already existing ports - * - * @see Unwatch - * @see Connected - */ - case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command - - /** - * Stop receiving notifications about a previously watched directory. - * - * @param directory the directory to unwatch - */ - case class Unwatch(directory: String = "/dev") extends Command - - /** - * A new port (i.e. file) has been detected. - * - * @param port the absolute file name of the connected port - */ - case class Connected(port: String) extends Event - -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala deleted file mode 100644 index 5c1ddf6..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala +++ /dev/null @@ -1,9 +0,0 @@ -package com.github.jodersky.flow - -import akka.actor.{ ExtendedActorSystem, Props } -import akka.io.IO - -/** Provides the serial IO manager. */ -class SerialExt(system: ExtendedActorSystem) extends IO.Extension { - lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala deleted file mode 100644 index d163b0a..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ /dev/null @@ -1,48 +0,0 @@ -package com.github.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } -import akka.actor.SupervisorStrategy.{ Escalate, Stop } -import internal.{ SerialConnection, Watcher } -import scala.util.{ Failure, Success, Try } - -/** - * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to - * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. - * @see SerialOperator - */ -class SerialManager extends Actor with ActorLogging { - import SerialManager._ - import context._ - - override val supervisorStrategy = OneForOneStrategy() { - case _: Exception if sender == watcher => Escalate - case _: Exception => Stop - } - - private val watcher = actorOf(Watcher(self), "watcher") - - def receive = { - - case open @ Serial.Open(port, settings, bufferSize) => Try { - SerialConnection.open(port, settings) - } match { - case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) - case Failure(err) => sender ! Serial.CommandFailed(open, err) - } - - case w: Serial.Watch => watcher.forward(w) - - case u: Serial.Unwatch => watcher.forward(u) - - } - -} - -object SerialManager { - - private def escapePortString(port: String) = port collect { - case '/' => '-' - case c => c - } - -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala deleted file mode 100644 index ec0ee27..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ /dev/null @@ -1,54 +0,0 @@ -package com.github.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } -import internal.{ Reader, SerialConnection, ThreadDied } -import java.nio.ByteBuffer - -/** - * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. - * @see SerialManager - */ -class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging { - import SerialOperator._ - import context._ - - val readBuffer = ByteBuffer.allocateDirect(bufferSize) - val reader = new Reader(connection, readBuffer, self, client) - val writeBuffer = ByteBuffer.allocateDirect(bufferSize) - - context.watch(client) - client ! Serial.Opened(connection.port) - reader.start() - - override def postStop = { - connection.close() - } - - def receive: Receive = { - - case Serial.Write(data, ack) => { - writeBuffer.clear() - data.copyToBuffer(writeBuffer) - val sent = connection.write(writeBuffer) - if (ack != Serial.NoAck) sender ! ack(sent) - } - - case Serial.Close => { - client ! Serial.Closed - context stop self - } - - case Terminated(`client`) => { - context stop self - } - - //go down with reader thread - case ThreadDied(`reader`, ex) => throw ex - - } - -} - -object SerialOperator { - def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala deleted file mode 100644 index 087fa6e..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala +++ /dev/null @@ -1,10 +0,0 @@ -package com.github.jodersky.flow - -/** - * Groups settings used in communication over a serial port. - * @param baud baud rate to use with serial port - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - */ -case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala deleted file mode 100644 index ebc0e65..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala +++ /dev/null @@ -1,19 +0,0 @@ -package com.github.jodersky.flow - -/** The requested port could not be found. */ -class NoSuchPortException(message: String) extends Exception(message) - -/** The requested port is in use by someone else. */ -class PortInUseException(message: String) extends Exception(message) - -/** Permissions are not sufficient to open a serial port. */ -class AccessDeniedException(message: String) extends Exception(message) - -/** The settings specified are invalid. */ -class InvalidSettingsException(message: String) extends Exception(message) - -/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ -class PortInterruptedException(message: String) extends Exception(message) - -/** The specified port has been closed. */ -class PortClosedException(message: String) extends Exception(message) diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala deleted file mode 100644 index a5fdc40..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala +++ /dev/null @@ -1,58 +0,0 @@ -package com.github.jodersky.flow -package internal - -import java.io.{ File, FileOutputStream, InputStream, OutputStream } - -/** Handles loading of the current platform's native library for flow. */ -object NativeLoader { - - private final val BufferSize = 4096 - - private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "") - - private def arch = System.getProperty("os.arch").toLowerCase - - /** Extract a resource from this class loader to a temporary file. */ - private def extract(path: String, prefix: String): Option[File] = { - var in: Option[InputStream] = None - var out: Option[OutputStream] = None - - try { - in = Option(NativeLoader.getClass.getResourceAsStream(path)) - if (in.isEmpty) return None - - val file = File.createTempFile(prefix, "") - out = Some(new FileOutputStream(file)) - - val buffer = new Array[Byte](BufferSize) - var length = -1; - do { - length = in.get.read(buffer) - if (length != -1) out.get.write(buffer, 0, length) - } while (length != -1) - - Some(file) - } finally { - in.foreach(_.close) - out.foreach(_.close) - } - } - - private def loadFromJar(library: String) = { - val fqlib = System.mapLibraryName(library) //fully qualified library name - val path = s"/native/${os}-${arch}/${fqlib}" - extract(path, fqlib) match { - case Some(file) => System.load(file.getAbsolutePath) - case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " + - "the native library does not exist for your specific architecture/OS combination." + - "Could not find " + path + ".") - } - } - - def load(library: String) = try { - System.loadLibrary(library) - } catch { - case ex: UnsatisfiedLinkError => loadFromJar(library) - } - -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala deleted file mode 100644 index 59ad575..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.github.jodersky.flow -package internal - -import akka.actor.{ Actor, ActorRef } -import akka.util.ByteString -import java.nio.ByteBuffer - -class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread { - def readLoop() = { - var stop = false - while (!serial.isClosed && !stop) { - try { - buffer.clear() - val length = serial.read(buffer) - buffer.limit(length) - val data = ByteString.fromByteBuffer(buffer) - client.tell(Serial.Received(data), operator) - } catch { - - //don't do anything if port is interrupted - case ex: PortInterruptedException => {} - - //stop and tell operator on other exception - case ex: Exception => { - stop = true - operator.tell(ThreadDied(this, ex), Actor.noSender) - } - } - } - } - - override def run() { - this.setName("flow-reader " + serial.port) - readLoop() - } -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala deleted file mode 100644 index 73416e3..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala +++ /dev/null @@ -1,158 +0,0 @@ -package com.github.jodersky.flow -package internal - -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicBoolean - -/** - * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast - * to the latter, this class encapsulates and secures any pointers used to communicate with the native - * backend and is thread-safe. - * - * The underlying serial port is assumed open when this class is initialized. - */ -class SerialConnection private ( - val port: String, - val settings: SerialSettings, - private val pointer: Long -) { - - import SerialConnection._ - - private var reading: Boolean = false - private val readLock = new Object - - private var writing: Boolean = false - private val writeLock = new Object - - private val closed = new AtomicBoolean(false) - - /** - * Checks if this serial port is closed. - */ - def isClosed = closed.get() - - /** - * Closes the underlying serial connection. Any callers blocked on read or write will return. - * A call of this method has no effect if the serial port is already closed. - * @throws IOException on IO error - */ - def close(): Unit = this.synchronized { - if (!closed.get) { - closed.set(true) - NativeSerial.cancelRead(pointer) - readLock.synchronized { - while (reading) this.wait() - } - writeLock.synchronized { - while (writing) this.wait() - } - NativeSerial.close(pointer) - } - } - - /** - * Reads data from underlying serial connection into a ByteBuffer. - * Note that data is read into the buffer's memory, its attributes - * such as position and limit are not modified. - * - * A call to this method is blocking, however it is interrupted - * if the connection is closed. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer into which data is read - * @return the actual number of bytes read - * @throws PortInterruptedException if port is closed while reading - * @throws IOException on IO error - */ - def read(buffer: ByteBuffer): Int = readLock.synchronized { - if (!closed.get) { - reading = true - try { - transfer( - b => NativeSerial.readDirect(pointer, b), - b => NativeSerial.read(pointer, b.array()) - )(buffer) - } finally { - reading = false - if (closed.get) readLock.notify() - } - } else { - throw new PortClosedException(s"port ${port} is closed") - } - } - - /** - * Writes data from a ByteBuffer to underlying serial connection. - * Note that data is read from the buffer's memory, its attributes - * such as position and limit are not modified. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer from which data is taken - * @return the actual number of bytes written - * @throws IOException on IO error - */ - def write(buffer: ByteBuffer): Int = writeLock.synchronized { - if (!closed.get) { - writing = true - try { - transfer( - b => NativeSerial.writeDirect(pointer, b, b.position()), - b => NativeSerial.write(pointer, b.array(), b.position()) - )(buffer) - } finally { - writing = false - if (closed.get) writeLock.notify() - } - } else { - throw new PortClosedException(s"port ${port} is closed") - } - } - - private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) { - direct(buffer) - } else if (buffer.hasArray()) { - indirect(buffer) - } else { - throw new IllegalArgumentException("buffer is not direct and has no array"); - } - -} - -object SerialConnection { - import NativeSerial._ - - /** - * Opens a new connection to a serial port. - * This method acts as a factory to creating serial connections. - * - * @param port name of serial port to open - * @param settings settings with which to initialize the connection - * @return an instance of the open serial connection - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - def open(port: String, settings: SerialSettings): SerialConnection = synchronized { - val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id) - new SerialConnection(port, settings, pointer) - } - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - def debug(value: Boolean) = NativeSerial.debug(value) - -} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala deleted file mode 100644 index 1470aa5..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala +++ /dev/null @@ -1,4 +0,0 @@ -package com.github.jodersky.flow -package internal - -case class ThreadDied(thread: Thread, reason: Exception) diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala deleted file mode 100644 index 8d89fb4..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala +++ /dev/null @@ -1,143 +0,0 @@ -package com.github.jodersky.flow -package internal - -import akka.actor.{ Actor, ActorRef, Props, Terminated } -import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } -import java.nio.file.StandardWatchEventKinds._ -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } -import scala.util.{ Failure, Success, Try } - -class Watcher(from: Option[ActorRef]) extends Actor { - - private val watcher = new Watcher.WatcherThread(self) - - //directory -> subscribers - private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] - - //directory -> watchkey - private val keys: Map[String, WatchKey] = Map.empty - - def subscribe(directory: String, client: ActorRef): WatchKey = { - val normal = Paths.get(directory).toAbsolutePath - val index = normal.toString - val key = keys.getOrElseUpdate(index, watcher.register(normal)) - clients addBinding (index, client) - key - } - - def unsubscribe(directory: String, client: ActorRef): Unit = { - val index = Paths.get(directory).toAbsolutePath.toString - - clients removeBinding (index, sender) - - if (clients.get(index).isEmpty && keys.get(index).isDefined) { - keys(index).cancel() - keys -= index - } - } - - def reply(msg: Any, sender: ActorRef) = { - val origin = from match { - case Some(ref) => ref - case None => self - } - sender.tell(msg, origin) - } - - override def preStart() = { - watcher.setDaemon(true) - watcher.setName("flow-watcher") - watcher.start() - } - - def receive = { - - case w @ Serial.Watch(directory, skipInitial) => - val normalPath = Paths.get(directory).toAbsolutePath - val normal = normalPath.toString - - Try { - subscribe(directory, sender) - } match { - case Failure(err) => reply(Serial.CommandFailed(w, err), sender) - case Success(key) => - context watch sender - if (!skipInitial) { - Files.newDirectoryStream(normalPath) foreach { path => - if (!Files.isDirectory(path)) { - reply(Serial.Connected(path.toString), sender) - } - } - } - } - - case u @ Serial.Unwatch(directory) => - val normal = Paths.get(directory).toAbsolutePath.toString - - clients.removeBinding(normal, sender) - - if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { - keys(normal).cancel() - keys -= normal - } - - case Terminated(client) => - for ((directory, c) <- clients if c == client) { - unsubscribe(directory, client) - } - - case Watcher.NewFile(directory, file) => - val normal = directory.toAbsolutePath - val absFile = normal resolve file - clients.getOrElse(normal.toString, Set.empty) foreach { client => - reply(Serial.Connected(absFile.toString), client) - } - - case ThreadDied(`watcher`, err) => throw err //go down with watcher thread - - } - - override def postStop() = { - watcher.close() - } - -} - -object Watcher { - private case class NewFile(directory: Path, file: Path) - - private class WatcherThread(actor: ActorRef) extends Thread { - - private val service = FileSystems.getDefault().newWatchService() - - def register(directory: Path) = directory.register(service, ENTRY_CREATE) - - override def run(): Unit = { - var stop = false - while (!stop) { - try { - val key = service.take() - key.pollEvents() foreach { ev => - val event = ev.asInstanceOf[WatchEvent[Path]] - if (event.kind == ENTRY_CREATE) { - val directory = key.watchable().asInstanceOf[Path] - val file = event.context() - actor.tell(NewFile(directory, file), Actor.noSender) - } - } - key.reset() - } catch { - case _: InterruptedException => stop = true - case _: ClosedWatchServiceException => stop = true - case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender) - } - } - } - - def close() = service.close //causes the service to throw a ClosedWatchServiceException - } - - def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) - -} |