diff options
author | Jakob Odersky <jodersky@gmail.com> | 2015-05-06 10:51:29 +0200 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2015-05-06 12:13:11 +0200 |
commit | 5e611b35a891ab35d15a5eb8fcb8fcee6de1bb08 (patch) | |
tree | af3d48f72ac401fae995b8c9e31f4c96c038c7b2 /flow-main/src/main/scala | |
parent | 8a5734ed1bf73838a274409a929d1728bbd8cd40 (diff) | |
download | akka-serial-5e611b35a891ab35d15a5eb8fcb8fcee6de1bb08.tar.gz akka-serial-5e611b35a891ab35d15a5eb8fcb8fcee6de1bb08.tar.bz2 akka-serial-5e611b35a891ab35d15a5eb8fcb8fcee6de1bb08.zip |
refactor build
Diffstat (limited to 'flow-main/src/main/scala')
11 files changed, 515 insertions, 0 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala new file mode 100644 index 0000000..9bf52a6 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/Parity.scala @@ -0,0 +1,9 @@ +package com.github.jodersky.flow + +/** Specifies available parities used in serial communication. */ +object Parity extends Enumeration { + type Parity = Value + val None = Value(0) + val Odd = Value(1) + val Even = Value(2) +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala new file mode 100644 index 0000000..b6e9d62 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -0,0 +1,93 @@ +package com.github.jodersky.flow + +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by flow's serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + /** Base trait for any flow-related messages. */ + sealed trait Message + + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ + trait Command extends Message + + /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ + trait Event extends Message + + /** A command has failed. */ + case class CommandFailed(command: Command, reason: Throwable) extends Event + + /** + * Open a new serial port. + * + * Send this command to the serial manager to request the opening of a serial port. The manager will + * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. + * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. + * + * In case the port is successfully opened, the operator will respond with an `Opened` message. + * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. + * + * @param port name of serial port to open + * @param settings settings of serial port to open + * @param bufferSize maximum read and write buffer sizes + */ + case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command + + /** + * A port has been successfully opened. + * + * Event sent by a port operator, indicating that a serial port was successfully opened. The sender + * of this message is the operator associated to the given serial port. + * + * @param port name of opened serial port + */ + case class Opened(port: String) extends Event + + /** + * Data has been received. + * + * Event sent by an operator, indicating that data was received on the operator's serial port. + * + * @param data data received on the port + */ + case class Received(data: ByteString) extends Event + + /** + * Write data to a serial port. + * + * Send this command to an operator to write the given data to its associated serial port. + * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. + * Note that a successful write does not guarantee the actual transmission of data through the serial port, + * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to + * be transmitted. + * + * @param data data to be written to port + * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment + * is a function 'number of bytes written => event') + */ + case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command + + /** + * Special type of acknowledgment that is not sent back. + */ + case object NoAck extends Function1[Int, Event] { + def apply(length: Int) = sys.error("cannot apply NoAck") + } + + /** + * Request closing of port. + * + * Send this command to an operator to close its associated port. The operator will respond + * with a `Closed` message upon closing the serial port. + */ + case object Close extends Command + + /** + * A port has been closed. + * + * Event sent from operator, indicating that its port has been closed. + */ + case object Closed extends Event + +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala new file mode 100644 index 0000000..38e140a --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialExt.scala @@ -0,0 +1,10 @@ +package com.github.jodersky.flow + +import akka.actor.ExtendedActorSystem +import akka.actor.Props +import akka.io.IO + +/** Provides the serial IO manager. */ +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala new file mode 100644 index 0000000..eb8c44e --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -0,0 +1,48 @@ +package com.github.jodersky.flow + +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import com.github.jodersky.flow.internal.SerialConnection + +import Serial.CommandFailed +import Serial.Open +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.OneForOneStrategy +import akka.actor.SupervisorStrategy.Stop +import akka.actor.actorRef2Scala + +/** + * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to + * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. + * @see SerialOperator + */ +class SerialManager extends Actor with ActorLogging { + import SerialManager._ + import context._ + + override val supervisorStrategy = OneForOneStrategy() { + case _: Exception => Stop + } + + def receive = { + case open @ Open(port, settings, bufferSize) => Try { + SerialConnection.open(port, settings) + } match { + case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) + case Failure(err) => sender ! CommandFailed(open, err) + } + } + +} + +object SerialManager { + + private def escapePortString(port: String) = port collect { + case '/' => '-' + case c => c + } + +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala new file mode 100644 index 0000000..5524125 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -0,0 +1,68 @@ +package com.github.jodersky.flow + +import java.nio.ByteBuffer + +import com.github.jodersky.flow.internal.Reader +import com.github.jodersky.flow.internal.ReaderDied +import com.github.jodersky.flow.internal.SerialConnection + +import Serial.Close +import Serial.Closed +import Serial.NoAck +import Serial.Opened +import Serial.Write +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Props +import akka.actor.Terminated +import akka.actor.actorRef2Scala + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging { + import SerialOperator._ + import context._ + + val readBuffer = ByteBuffer.allocateDirect(bufferSize) + val reader = new Reader(connection, readBuffer, self, client) + val writeBuffer = ByteBuffer.allocateDirect(bufferSize) + + context.watch(client) + client ! Opened(connection.port) + reader.start() + + override def postStop = { + connection.close() + } + + def receive: Receive = { + + case Write(data, ack) => { + writeBuffer.clear() + data.copyToBuffer(writeBuffer) + val sent = connection.write(writeBuffer) + if (ack != NoAck) sender ! ack(sent) + } + + case Close => { + client ! Closed + context stop self + } + + case Terminated(`client`) => { + context stop self + } + + //go down with reader thread + case ReaderDied(ex) => throw ex + + } + +} + +object SerialOperator { + def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala new file mode 100644 index 0000000..08a5556 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialSettings.scala @@ -0,0 +1,10 @@ +package com.github.jodersky.flow + +/** + * Groups settings used in communication over a serial port. + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + */ +case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None)
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala new file mode 100644 index 0000000..adefcea --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/exceptions.scala @@ -0,0 +1,19 @@ +package com.github.jodersky.flow + +/** The requested port could not be found. */ +class NoSuchPortException(message: String) extends Exception(message) + +/** The requested port is in use by someone else. */ +class PortInUseException(message: String) extends Exception(message) + +/** Permissions are not sufficient to open a serial port. */ +class AccessDeniedException(message: String) extends Exception(message) + +/** The settings specified are invalid. */ +class InvalidSettingsException(message: String) extends Exception(message) + +/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ +class PortInterruptedException(message: String) extends Exception(message) + +/** The specified port has been closed. */ +class PortClosedException(message: String) extends Exception(message)
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala new file mode 100644 index 0000000..2526293 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala @@ -0,0 +1,58 @@ +package com.github.jodersky.flow.internal + +import java.io.File +import java.io.FileOutputStream +import java.io.InputStream +import java.io.OutputStream + +/** Handles loading of the current platform's native library for flow. */ +object NativeLoader { + + private final val BufferSize = 4096 + + private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "") + + private def arch = System.getProperty("os.arch").toLowerCase + + /** Extract a resource from this class loader to a temporary file. */ + private def extract(path: String, prefix: String): Option[File] = { + var in: Option[InputStream] = None + var out: Option[OutputStream] = None + + try { + in = Option(NativeLoader.getClass.getResourceAsStream(path)) + if (in.isEmpty) return None + + val file = File.createTempFile(prefix, "") + out = Some(new FileOutputStream(file)) + + val buffer = new Array[Byte](BufferSize) + var length = -1; + do { + length = in.get.read(buffer) + if (length != -1) out.get.write(buffer, 0, length) + } while (length != -1) + + Some(file) + } finally { + in.foreach(_.close) + out.foreach(_.close) + } + } + + private def loadFromJar(library: String) = { + val fqlib = System.mapLibraryName(library) //fully qualified library name + extract(s"/native/${os}-${arch}/${fqlib}", fqlib) match { + case Some(file) => System.load(file.getAbsolutePath) + case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " + + "the native library may not exist for your specific architecture/OS combination.") + } + } + + def load(library: String) = try { + System.loadLibrary(library) + } catch { + case ex: UnsatisfiedLinkError => loadFromJar(library) + } + +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala new file mode 100644 index 0000000..7b3f2ef --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala @@ -0,0 +1,40 @@ +package com.github.jodersky.flow.internal + +import java.nio.ByteBuffer + +import com.github.jodersky.flow.PortInterruptedException +import com.github.jodersky.flow.Serial.Received + +import akka.actor.Actor +import akka.actor.ActorRef +import akka.util.ByteString + +class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread { + def readLoop() = { + var stop = false + while (!serial.isClosed && !stop) { + try { + buffer.clear() + val length = serial.read(buffer) + buffer.limit(length) + val data = ByteString.fromByteBuffer(buffer) + client.tell(Received(data), operator) + } catch { + + //don't do anything if port is interrupted + case ex: PortInterruptedException => {} + + //stop and tell operator on other exception + case ex: Exception => { + stop = true + operator.tell(ReaderDied(ex), Actor.noSender) + } + } + } + } + + override def run() { + this.setName("flow-reader " + serial.port) + readLoop() + } +}
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala new file mode 100644 index 0000000..7dd9954 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala @@ -0,0 +1,3 @@ +package com.github.jodersky.flow.internal + +case class ReaderDied(reason: Exception)
\ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala new file mode 100644 index 0000000..b4242fe --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala @@ -0,0 +1,157 @@ +package com.github.jodersky.flow.internal + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean + +import com.github.jodersky.flow.PortClosedException +import com.github.jodersky.flow.SerialSettings + +/** + * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast + * to the latter, this class encapsulates and secures any pointers used to communicate with the native + * backend and is thread-safe. + * + * The underlying serial port is assumed open when this class is initialized. + */ +class SerialConnection private ( + val port: String, + val settings: SerialSettings, + private val pointer: Long) { + + import SerialConnection._ + + private var reading: Boolean = false + private val readLock = new Object + + private var writing: Boolean = false + private val writeLock = new Object + + private val closed = new AtomicBoolean(false) + + /** + * Checks if this serial port is closed. + */ + def isClosed = closed.get() + + /** + * Closes the underlying serial connection. Any callers blocked on read or write will return. + * A call of this method has no effect if the serial port is already closed. + * @throws IOException on IO error + */ + def close(): Unit = this.synchronized { + if (!closed.get) { + closed.set(true) + NativeSerial.cancelRead(pointer) + readLock.synchronized { + while (reading) this.wait() + } + writeLock.synchronized { + while (writing) this.wait() + } + NativeSerial.close(pointer) + } + } + + /** + * Reads data from underlying serial connection into a ByteBuffer. + * Note that data is read into the buffer's memory, its attributes + * such as position and limit are not modified. + * + * A call to this method is blocking, however it is interrupted + * if the connection is closed. + * + * This method works for direct and indirect buffers but is optimized + * for the former. + * + * @param buffer a ByteBuffer into which data is read + * @return the actual number of bytes read + * @throws PortInterruptedException if port is closed while reading + * @throws IOException on IO error + */ + def read(buffer: ByteBuffer): Int = readLock.synchronized { + if (!closed.get) { + reading = true + try { + transfer( + b => NativeSerial.readDirect(pointer, b), + b => NativeSerial.read(pointer, b.array()))(buffer) + } finally { + reading = false + if (closed.get) readLock.notify() + } + } else { + throw new PortClosedException(s"port ${port} is closed") + } + } + + /** + * Writes data from a ByteBuffer to underlying serial connection. + * Note that data is read from the buffer's memory, its attributes + * such as position and limit are not modified. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * This method works for direct and indirect buffers but is optimized + * for the former. + * + * @param buffer a ByteBuffer from which data is taken + * @return the actual number of bytes written + * @throws IOException on IO error + */ + def write(buffer: ByteBuffer): Int = writeLock.synchronized { + if (!closed.get) { + writing = true + try { + transfer( + b => NativeSerial.writeDirect(pointer, b, b.position()), + b => NativeSerial.write(pointer, b.array(), b.position()))(buffer) + } finally { + writing = false + if (closed.get) writeLock.notify() + } + } else { + throw new PortClosedException(s"port ${port} is closed") + } + } + + private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) { + direct(buffer) + } else if (buffer.hasArray()) { + indirect(buffer) + } else { + throw new IllegalArgumentException("buffer is not direct and has no array"); + } + +} + +object SerialConnection { + import NativeSerial._ + + /** + * Opens a new connection to a serial port. + * This method acts as a factory to creating serial connections. + * + * @param port name of serial port to open + * @param settings settings with which to initialize the connection + * @return an instance of the open serial connection + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + def open(port: String, settings: SerialSettings): SerialConnection = synchronized { + val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id) + new SerialConnection(port, settings, pointer) + } + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + def debug(value: Boolean) = NativeSerial.debug(value) + +}
\ No newline at end of file |