diff options
author | Jakob Odersky <jodersky@gmail.com> | 2014-01-13 17:40:34 +0100 |
---|---|---|
committer | Jakob Odersky <jodersky@gmail.com> | 2014-01-14 15:43:00 +0100 |
commit | 5c94ee017051d51f51e06a61a8bc4e70a15e36da (patch) | |
tree | 2e48701ebb4177557234dbbd22b3b7058cad36de /flow/src/main/scala | |
parent | 84f640208ed6e4e155e87ebcb73a82c021174ea7 (diff) | |
download | akka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.tar.gz akka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.tar.bz2 akka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.zip |
enable easier cross-compilation
Diffstat (limited to 'flow/src/main/scala')
9 files changed, 445 insertions, 0 deletions
diff --git a/flow/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow/src/main/scala/com/github/jodersky/flow/Parity.scala new file mode 100644 index 0000000..9bf52a6 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/Parity.scala @@ -0,0 +1,9 @@ +package com.github.jodersky.flow + +/** Specifies available parities used in serial communication. */ +object Parity extends Enumeration { + type Parity = Value + val None = Value(0) + val Odd = Value(1) + val Even = Value(2) +}
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala new file mode 100644 index 0000000..abc8f39 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -0,0 +1,108 @@ +package com.github.jodersky.flow + +import akka.actor.ActorRef +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by flow's serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ + trait Command + + /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ + trait Event + + /** A command has failed. */ + case class CommandFailed(command: Command, reason: Throwable) extends Event + + /** + * Open a new serial port. + * + * Send this command to the serial manager to request the opening of a serial port. The manager will + * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. + * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. + * + * In case the port is successfully opened, the operator will respond with an `Opened` message. + * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. + * + * @param settings settings of serial port to open + */ + case class Open(settings: SerialSettings) extends Command + + /** + * A port has been successfully opened. + * + * Event sent by a port operator, indicating that a serial port was successfully opened. The sender + * of this message is the operator associated to the given serial port. Furthermore, an additional reference + * to the operator is provided in this class' `operator` field. + * + * @param settings settings of port that was opened + * @param operator operator associated with the serial port + */ + case class Opened(settings: SerialSettings, operator: ActorRef) extends Event + + /** + * Register an actor to receive events. + * + * Send this command to a serial operator to register an actor for notification on the reception of data on the operator's associated port. + * Upon reception, data will be sent by the operator to registered actors in form of `Received` events. + * + * @param receiver actor to register + */ + case class Register(receiver: ActorRef) extends Command + + /** + * Unregister an actor from receiving events. + * + * Send this command to a serial operator to unregister an actor for notification on the reception of data on the operator's associated port. + * + * @param receiver actor to unregister + */ + case class Unregister(receiver: ActorRef) extends Command + + /** + * Data has been received. + * + * Event sent by an operator, indicating that data was received on the operator's serial port. + * Clients must register (see `Register`) with a serial operator to receive these events. + * + * @param data data received on the port + */ + case class Received(data: ByteString) extends Event + + /** + * Write data to a serial port. + * + * Send this command to an operator to write the given data to its associated serial port. + * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. + * Note that a successful write does not guarantee the actual transmission of data through the serial port, + * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to + * be written. + * + * @param data data to be written to port + * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending + */ + case class Write(data: ByteString, ack: Event = NoAck) extends Command + + /** + * Special type of acknowledgment that is not sent back. + */ + case object NoAck extends Event + + /** + * Request closing of port. + * + * Send this command to an operator to close its associated port. The operator will respond + * with a `Closed` message upon closing the serial port. + */ + case object Close extends Command + + /** + * A port has been closed. + * + * Event sent from operator, indicating that its port has been closed. + */ + case object Closed extends Event + +} diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala new file mode 100644 index 0000000..826a4e9 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala @@ -0,0 +1,10 @@ +package com.github.jodersky.flow + +import akka.actor.ExtendedActorSystem +import akka.actor.Props +import akka.io.IO + +/** Provides the serial IO manager. */ +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +}
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala new file mode 100644 index 0000000..b3128ac --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -0,0 +1,55 @@ +package com.github.jodersky.flow + +import java.io.IOException + +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +import com.github.jodersky.flow.internal.InternalSerial + +import Serial._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.OneForOneStrategy +import akka.actor.Props +import akka.actor.SupervisorStrategy.Escalate +import akka.actor.SupervisorStrategy.Stop +import akka.actor.actorRef2Scala + +/** + * Actor that manages serial port creation. Once opened, a serial port is handed over to + * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. + * @see SerialOperator + */ +class SerialManager extends Actor with ActorLogging { + import SerialManager._ + import context._ + + override val supervisorStrategy = + OneForOneStrategy() { + case _: IOException => Stop + case _: Exception => Escalate + } + + def receive = { + case c @ Open(s) => Try { InternalSerial.open(s.port, s.baud, s.characterSize, s.twoStopBits, s.parity.id) } match { + case Failure(t) => sender ! CommandFailed(c, t) + case Success(serial) => { + val operator = context.actorOf(SerialOperator(serial), name = escapePortString(s.port)) + val settings = SerialSettings(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity)) + sender.tell(Opened(settings, operator), operator) + } + } + } + +} + +object SerialManager { + + private def escapePortString(port: String) = port collect { + case '/' => '-' + case c => c + } + +}
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala new file mode 100644 index 0000000..3ac50c0 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -0,0 +1,101 @@ +package com.github.jodersky.flow + +import java.io.IOException +import com.github.jodersky.flow.internal.InternalSerial +import Serial._ +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.actor.Terminated +import akka.actor.actorRef2Scala +import akka.util.ByteString +import scala.collection.mutable.HashSet +import akka.actor.Props + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging { + import SerialOperator._ + import context._ + + private val receivers = new HashSet[ActorRef] + private val receiversLock = new Object + private def tellAllReceivers(msg: Any) = receiversLock.synchronized { + receivers.foreach { receiver => + receiver ! msg + } + } + + private object Reader extends Thread { + def readLoop() = { + var continueReading = true + while (continueReading) { + try { + val data = ByteString(serial.read()) + tellAllReceivers(Received(data)) + } catch { + + //port is closing, stop thread gracefully + case ex: PortInterruptedException => { + continueReading = false + } + + //something else went wrong stop and tell actor + case ex: Exception => { + continueReading = false + self ! ReadException(ex) + } + } + } + } + + def name = this.getName() + + override def run() { + this.setName("flow-reader " + serial.port) + readLoop() + } + } + + override def preStart() = { + Reader.start() + } + + override def postStop = { + serial.close() + } + + def receive: Receive = { + + case Register(actor) => receiversLock.synchronized { + receivers += actor + } + + case Unregister(actor) => receiversLock.synchronized { + receivers -= actor + } + + case Write(data, ack) => { + val sent = serial.write(data.toArray) + if (ack != NoAck) sender ! ack + } + + case Close => { + tellAllReceivers(Closed) + context stop self + } + + //go down with reader thread + case ReadException(ex) => throw ex + + } + +} + +object SerialOperator { + private case class ReadException(ex: Exception) + + def apply(serial: InternalSerial) = Props(classOf[SerialOperator], serial) +}
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala new file mode 100644 index 0000000..a3bc5e4 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala @@ -0,0 +1,11 @@ +package com.github.jodersky.flow + +/** + * Groups settings used in communication over a serial port. + * @param port name of serial port + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + */ +case class SerialSettings(port: String, baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None)
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala new file mode 100644 index 0000000..5923ba6 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala @@ -0,0 +1,21 @@ +package com.github.jodersky.flow + +import java.io.IOException + +/** The requested port could not be found. */ +class NoSuchPortException(message: String) extends Exception(message) + +/** The requested port is in use by someone else. */ +class PortInUseException(message: String) extends Exception(message) + +/** Permissions are not sufficient to open a serial port. */ +class AccessDeniedException(message: String) extends Exception(message) + +/** The settings specified are invalid. */ +class InvalidSettingsException(message: String) extends Exception(message) + +/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ +class PortInterruptedException(message: String) extends Exception(message) + +/** The specified port has been closed. */ +class PortClosedException(message: String) extends Exception(message)
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala new file mode 100644 index 0000000..e69f91a --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala @@ -0,0 +1,92 @@ +package com.github.jodersky.flow.internal + +import java.io.IOException +import com.github.jodersky.flow._ +import java.util.concurrent.atomic.AtomicBoolean + +/** Wraps `NativeSerial` in a more object-oriented style, still quite low level. */ +class InternalSerial private (val port: String, val baud: Int, val characterSize: Int, val twoStopBits: Boolean, val parity: Int, private val pointer: Long) { + import InternalSerial._ + + private val reading = new AtomicBoolean(false) + private val writing = new AtomicBoolean(false) + private val closed = new AtomicBoolean(false) + + /** Closes the underlying serial connection. Any threads blocking on read or write will return. */ + def close(): Unit = synchronized { + if (!closed.get()) { + closed.set(true) + except(NativeSerial.interrupt(pointer), port) + if (writing.get()) wait() + if (reading.get()) wait() + except(NativeSerial.close(pointer), port) + } + } + + /** + * Read data from underlying serial connection. + * @throws PortInterruptedException if port is closed from another thread + */ + def read(): Array[Byte] = if (!closed.get) { + reading.set(true) + try { + val buffer = new Array[Byte](100) + val bytesRead = except(NativeSerial.read(pointer, buffer), port) + buffer take bytesRead + } finally { + synchronized { + reading.set(false) + if (closed.get) notify() + } + } + } else { + throw new PortClosedException(s"port ${port} is already closed") + } + + /** + * Write data to underlying serial connection. + * @throws PortInterruptedException if port is closed from another thread + */ + def write(data: Array[Byte]): Array[Byte] = if (!closed.get) { + writing.set(true) + try { + val bytesWritten = except(NativeSerial.write(pointer, data), port) + data take bytesWritten + } finally { + synchronized { + writing.set(false) + if (closed.get) notify() + } + } + } else { + throw new PortClosedException(s"port ${port} is already closed") + } + +} + +object InternalSerial { + import NativeSerial._ + + /** Transform error code to exception if necessary. */ + private def except(result: Int, port: String): Int = result match { + case E_IO => throw new IOException(port) + case E_ACCESS_DENIED => throw new AccessDeniedException(port) + case E_BUSY => throw new PortInUseException(port) + case E_INVALID_SETTINGS => throw new InvalidSettingsException("the provided settings are invalid: be sure to use standard baud rate, character size and parity.") + case E_INTERRUPT => throw new PortInterruptedException(port) + case E_NO_PORT => throw new NoSuchPortException(port) + case error if error < 0 => throw new IOException(s"unknown error code: ${error}") + case success => success + } + + /** Open a new connection to a serial port. */ + def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): InternalSerial = synchronized { + val pointer = new Array[Long](1) + except(NativeSerial.open(port, baud, characterSize, twoStopBits, parity, pointer), port) + new InternalSerial(port, baud, characterSize, twoStopBits, parity, pointer(0)) + } + + /** Set debugging for all serial connections. Debugging results in printing extra messages from the native library in case of errors. */ + def debug(value: Boolean) = NativeSerial.debug(value) + +}
\ No newline at end of file diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala new file mode 100644 index 0000000..034be96 --- /dev/null +++ b/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala @@ -0,0 +1,38 @@ +package com.github.jodersky.flow.internal + +import java.io.File +import java.io.FileOutputStream +import scalax.file.Path +import scalax.io.Resource +import scala.util.Try + +/** Handles loading of the current platform's native library for flow. */ +object NativeLoader { + + def extract(): Option[File] = { + val os = System.getProperty("os.name").toLowerCase.filter(_ != ' ') + val arch = System.getProperty("os.arch").toLowerCase + val fqlib = System.mapLibraryName("flow") //fully qualified library name + + val in = NativeLoader.getClass().getResourceAsStream(s"/native/${arch}-${os}/${fqlib}") + if (in == null) return None + + val temp = Path.createTempFile() + Resource.fromInputStream(in).copyDataTo(temp) + temp.fileOption + } + + def loadFromJar() = extract() match { + case Some(file) => System.load(file.getAbsolutePath) + case None => throw new UnsatisfiedLinkError("cannot extract native library, the native library may not exist for your specific os/architecture combination") + } + + def load = { + try { + System.loadLibrary("flow") + } catch { + case ex: UnsatisfiedLinkError => loadFromJar() + } + } + +}
\ No newline at end of file |