aboutsummaryrefslogtreecommitdiff
path: root/flow/src/main/scala
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2014-01-13 17:40:34 +0100
committerJakob Odersky <jodersky@gmail.com>2014-01-14 15:43:00 +0100
commit5c94ee017051d51f51e06a61a8bc4e70a15e36da (patch)
tree2e48701ebb4177557234dbbd22b3b7058cad36de /flow/src/main/scala
parent84f640208ed6e4e155e87ebcb73a82c021174ea7 (diff)
downloadakka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.tar.gz
akka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.tar.bz2
akka-serial-5c94ee017051d51f51e06a61a8bc4e70a15e36da.zip
enable easier cross-compilation
Diffstat (limited to 'flow/src/main/scala')
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/Parity.scala9
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/Serial.scala108
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala10
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala55
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala101
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala11
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/exceptions.scala21
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala92
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala38
9 files changed, 445 insertions, 0 deletions
diff --git a/flow/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow/src/main/scala/com/github/jodersky/flow/Parity.scala
new file mode 100644
index 0000000..9bf52a6
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/Parity.scala
@@ -0,0 +1,9 @@
+package com.github.jodersky.flow
+
+/** Specifies available parities used in serial communication. */
+object Parity extends Enumeration {
+ type Parity = Value
+ val None = Value(0)
+ val Odd = Value(1)
+ val Even = Value(2)
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala
new file mode 100644
index 0000000..abc8f39
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -0,0 +1,108 @@
+package com.github.jodersky.flow
+
+import akka.actor.ActorRef
+import akka.actor.ExtensionKey
+import akka.util.ByteString
+
+/** Defines messages used by flow's serial IO layer. */
+object Serial extends ExtensionKey[SerialExt] {
+
+ /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */
+ trait Command
+
+ /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */
+ trait Event
+
+ /** A command has failed. */
+ case class CommandFailed(command: Command, reason: Throwable) extends Event
+
+ /**
+ * Open a new serial port.
+ *
+ * Send this command to the serial manager to request the opening of a serial port. The manager will
+ * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port.
+ * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages.
+ *
+ * In case the port is successfully opened, the operator will respond with an `Opened` message.
+ * In case the port cannot be opened, the manager will respond with a `CommandFailed` message.
+ *
+ * @param settings settings of serial port to open
+ */
+ case class Open(settings: SerialSettings) extends Command
+
+ /**
+ * A port has been successfully opened.
+ *
+ * Event sent by a port operator, indicating that a serial port was successfully opened. The sender
+ * of this message is the operator associated to the given serial port. Furthermore, an additional reference
+ * to the operator is provided in this class' `operator` field.
+ *
+ * @param settings settings of port that was opened
+ * @param operator operator associated with the serial port
+ */
+ case class Opened(settings: SerialSettings, operator: ActorRef) extends Event
+
+ /**
+ * Register an actor to receive events.
+ *
+ * Send this command to a serial operator to register an actor for notification on the reception of data on the operator's associated port.
+ * Upon reception, data will be sent by the operator to registered actors in form of `Received` events.
+ *
+ * @param receiver actor to register
+ */
+ case class Register(receiver: ActorRef) extends Command
+
+ /**
+ * Unregister an actor from receiving events.
+ *
+ * Send this command to a serial operator to unregister an actor for notification on the reception of data on the operator's associated port.
+ *
+ * @param receiver actor to unregister
+ */
+ case class Unregister(receiver: ActorRef) extends Command
+
+ /**
+ * Data has been received.
+ *
+ * Event sent by an operator, indicating that data was received on the operator's serial port.
+ * Clients must register (see `Register`) with a serial operator to receive these events.
+ *
+ * @param data data received on the port
+ */
+ case class Received(data: ByteString) extends Event
+
+ /**
+ * Write data to a serial port.
+ *
+ * Send this command to an operator to write the given data to its associated serial port.
+ * An acknowledgment may be set, in which case it is sent back to the sender on a successful write.
+ * Note that a successful write does not guarantee the actual transmission of data through the serial port,
+ * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to
+ * be written.
+ *
+ * @param data data to be written to port
+ * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending
+ */
+ case class Write(data: ByteString, ack: Event = NoAck) extends Command
+
+ /**
+ * Special type of acknowledgment that is not sent back.
+ */
+ case object NoAck extends Event
+
+ /**
+ * Request closing of port.
+ *
+ * Send this command to an operator to close its associated port. The operator will respond
+ * with a `Closed` message upon closing the serial port.
+ */
+ case object Close extends Command
+
+ /**
+ * A port has been closed.
+ *
+ * Event sent from operator, indicating that its port has been closed.
+ */
+ case object Closed extends Event
+
+}
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala
new file mode 100644
index 0000000..826a4e9
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialExt.scala
@@ -0,0 +1,10 @@
+package com.github.jodersky.flow
+
+import akka.actor.ExtendedActorSystem
+import akka.actor.Props
+import akka.io.IO
+
+/** Provides the serial IO manager. */
+class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
+ lazy val manager = system.actorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala
new file mode 100644
index 0000000..b3128ac
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -0,0 +1,55 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Try
+
+import com.github.jodersky.flow.internal.InternalSerial
+
+import Serial._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.OneForOneStrategy
+import akka.actor.Props
+import akka.actor.SupervisorStrategy.Escalate
+import akka.actor.SupervisorStrategy.Stop
+import akka.actor.actorRef2Scala
+
+/**
+ * Actor that manages serial port creation. Once opened, a serial port is handed over to
+ * a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
+ * @see SerialOperator
+ */
+class SerialManager extends Actor with ActorLogging {
+ import SerialManager._
+ import context._
+
+ override val supervisorStrategy =
+ OneForOneStrategy() {
+ case _: IOException => Stop
+ case _: Exception => Escalate
+ }
+
+ def receive = {
+ case c @ Open(s) => Try { InternalSerial.open(s.port, s.baud, s.characterSize, s.twoStopBits, s.parity.id) } match {
+ case Failure(t) => sender ! CommandFailed(c, t)
+ case Success(serial) => {
+ val operator = context.actorOf(SerialOperator(serial), name = escapePortString(s.port))
+ val settings = SerialSettings(serial.port, serial.baud, serial.characterSize, serial.twoStopBits, Parity(serial.parity))
+ sender.tell(Opened(settings, operator), operator)
+ }
+ }
+ }
+
+}
+
+object SerialManager {
+
+ private def escapePortString(port: String) = port collect {
+ case '/' => '-'
+ case c => c
+ }
+
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
new file mode 100644
index 0000000..3ac50c0
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -0,0 +1,101 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+import com.github.jodersky.flow.internal.InternalSerial
+import Serial._
+import akka.actor.Actor
+import akka.actor.ActorLogging
+import akka.actor.ActorRef
+import akka.actor.Terminated
+import akka.actor.actorRef2Scala
+import akka.util.ByteString
+import scala.collection.mutable.HashSet
+import akka.actor.Props
+
+/**
+ * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
+ * @see SerialManager
+ */
+class SerialOperator(serial: InternalSerial) extends Actor with ActorLogging {
+ import SerialOperator._
+ import context._
+
+ private val receivers = new HashSet[ActorRef]
+ private val receiversLock = new Object
+ private def tellAllReceivers(msg: Any) = receiversLock.synchronized {
+ receivers.foreach { receiver =>
+ receiver ! msg
+ }
+ }
+
+ private object Reader extends Thread {
+ def readLoop() = {
+ var continueReading = true
+ while (continueReading) {
+ try {
+ val data = ByteString(serial.read())
+ tellAllReceivers(Received(data))
+ } catch {
+
+ //port is closing, stop thread gracefully
+ case ex: PortInterruptedException => {
+ continueReading = false
+ }
+
+ //something else went wrong stop and tell actor
+ case ex: Exception => {
+ continueReading = false
+ self ! ReadException(ex)
+ }
+ }
+ }
+ }
+
+ def name = this.getName()
+
+ override def run() {
+ this.setName("flow-reader " + serial.port)
+ readLoop()
+ }
+ }
+
+ override def preStart() = {
+ Reader.start()
+ }
+
+ override def postStop = {
+ serial.close()
+ }
+
+ def receive: Receive = {
+
+ case Register(actor) => receiversLock.synchronized {
+ receivers += actor
+ }
+
+ case Unregister(actor) => receiversLock.synchronized {
+ receivers -= actor
+ }
+
+ case Write(data, ack) => {
+ val sent = serial.write(data.toArray)
+ if (ack != NoAck) sender ! ack
+ }
+
+ case Close => {
+ tellAllReceivers(Closed)
+ context stop self
+ }
+
+ //go down with reader thread
+ case ReadException(ex) => throw ex
+
+ }
+
+}
+
+object SerialOperator {
+ private case class ReadException(ex: Exception)
+
+ def apply(serial: InternalSerial) = Props(classOf[SerialOperator], serial)
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
new file mode 100644
index 0000000..a3bc5e4
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
@@ -0,0 +1,11 @@
+package com.github.jodersky.flow
+
+/**
+ * Groups settings used in communication over a serial port.
+ * @param port name of serial port
+ * @param baud baud rate to use with serial port
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ */
+case class SerialSettings(port: String, baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala
new file mode 100644
index 0000000..5923ba6
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/exceptions.scala
@@ -0,0 +1,21 @@
+package com.github.jodersky.flow
+
+import java.io.IOException
+
+/** The requested port could not be found. */
+class NoSuchPortException(message: String) extends Exception(message)
+
+/** The requested port is in use by someone else. */
+class PortInUseException(message: String) extends Exception(message)
+
+/** Permissions are not sufficient to open a serial port. */
+class AccessDeniedException(message: String) extends Exception(message)
+
+/** The settings specified are invalid. */
+class InvalidSettingsException(message: String) extends Exception(message)
+
+/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */
+class PortInterruptedException(message: String) extends Exception(message)
+
+/** The specified port has been closed. */
+class PortClosedException(message: String) extends Exception(message) \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
new file mode 100644
index 0000000..e69f91a
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
@@ -0,0 +1,92 @@
+package com.github.jodersky.flow.internal
+
+import java.io.IOException
+import com.github.jodersky.flow._
+import java.util.concurrent.atomic.AtomicBoolean
+
+/** Wraps `NativeSerial` in a more object-oriented style, still quite low level. */
+class InternalSerial private (val port: String, val baud: Int, val characterSize: Int, val twoStopBits: Boolean, val parity: Int, private val pointer: Long) {
+ import InternalSerial._
+
+ private val reading = new AtomicBoolean(false)
+ private val writing = new AtomicBoolean(false)
+ private val closed = new AtomicBoolean(false)
+
+ /** Closes the underlying serial connection. Any threads blocking on read or write will return. */
+ def close(): Unit = synchronized {
+ if (!closed.get()) {
+ closed.set(true)
+ except(NativeSerial.interrupt(pointer), port)
+ if (writing.get()) wait()
+ if (reading.get()) wait()
+ except(NativeSerial.close(pointer), port)
+ }
+ }
+
+ /**
+ * Read data from underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread
+ */
+ def read(): Array[Byte] = if (!closed.get) {
+ reading.set(true)
+ try {
+ val buffer = new Array[Byte](100)
+ val bytesRead = except(NativeSerial.read(pointer, buffer), port)
+ buffer take bytesRead
+ } finally {
+ synchronized {
+ reading.set(false)
+ if (closed.get) notify()
+ }
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is already closed")
+ }
+
+ /**
+ * Write data to underlying serial connection.
+ * @throws PortInterruptedException if port is closed from another thread
+ */
+ def write(data: Array[Byte]): Array[Byte] = if (!closed.get) {
+ writing.set(true)
+ try {
+ val bytesWritten = except(NativeSerial.write(pointer, data), port)
+ data take bytesWritten
+ } finally {
+ synchronized {
+ writing.set(false)
+ if (closed.get) notify()
+ }
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is already closed")
+ }
+
+}
+
+object InternalSerial {
+ import NativeSerial._
+
+ /** Transform error code to exception if necessary. */
+ private def except(result: Int, port: String): Int = result match {
+ case E_IO => throw new IOException(port)
+ case E_ACCESS_DENIED => throw new AccessDeniedException(port)
+ case E_BUSY => throw new PortInUseException(port)
+ case E_INVALID_SETTINGS => throw new InvalidSettingsException("the provided settings are invalid: be sure to use standard baud rate, character size and parity.")
+ case E_INTERRUPT => throw new PortInterruptedException(port)
+ case E_NO_PORT => throw new NoSuchPortException(port)
+ case error if error < 0 => throw new IOException(s"unknown error code: ${error}")
+ case success => success
+ }
+
+ /** Open a new connection to a serial port. */
+ def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): InternalSerial = synchronized {
+ val pointer = new Array[Long](1)
+ except(NativeSerial.open(port, baud, characterSize, twoStopBits, parity, pointer), port)
+ new InternalSerial(port, baud, characterSize, twoStopBits, parity, pointer(0))
+ }
+
+ /** Set debugging for all serial connections. Debugging results in printing extra messages from the native library in case of errors. */
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
new file mode 100644
index 0000000..034be96
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
@@ -0,0 +1,38 @@
+package com.github.jodersky.flow.internal
+
+import java.io.File
+import java.io.FileOutputStream
+import scalax.file.Path
+import scalax.io.Resource
+import scala.util.Try
+
+/** Handles loading of the current platform's native library for flow. */
+object NativeLoader {
+
+ def extract(): Option[File] = {
+ val os = System.getProperty("os.name").toLowerCase.filter(_ != ' ')
+ val arch = System.getProperty("os.arch").toLowerCase
+ val fqlib = System.mapLibraryName("flow") //fully qualified library name
+
+ val in = NativeLoader.getClass().getResourceAsStream(s"/native/${arch}-${os}/${fqlib}")
+ if (in == null) return None
+
+ val temp = Path.createTempFile()
+ Resource.fromInputStream(in).copyDataTo(temp)
+ temp.fileOption
+ }
+
+ def loadFromJar() = extract() match {
+ case Some(file) => System.load(file.getAbsolutePath)
+ case None => throw new UnsatisfiedLinkError("cannot extract native library, the native library may not exist for your specific os/architecture combination")
+ }
+
+ def load = {
+ try {
+ System.loadLibrary("flow")
+ } catch {
+ case ex: UnsatisfiedLinkError => loadFromJar()
+ }
+ }
+
+} \ No newline at end of file