diff options
author | Jakob Odersky <jakob@odersky.com> | 2017-01-08 21:16:25 +0100 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2017-01-21 17:22:10 -0800 |
commit | 23959966760174477a6b0fcbf9dd1e8ef37c643b (patch) | |
tree | 9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /sync | |
parent | 6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff) | |
download | akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2 akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip |
Rename project to akka-serial
Diffstat (limited to 'sync')
-rw-r--r-- | sync/build.sbt | 5 | ||||
-rw-r--r-- | sync/src/main/scala/akka/serial/Parity.scala | 9 | ||||
-rw-r--r-- | sync/src/main/scala/akka/serial/SerialSettings.scala | 10 | ||||
-rw-r--r-- | sync/src/main/scala/akka/serial/exceptions.scala | 19 | ||||
-rw-r--r-- | sync/src/main/scala/akka/serial/sync/SerialConnection.scala | 142 | ||||
-rw-r--r-- | sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala | 109 | ||||
-rw-r--r-- | sync/src/test/scala/akka/serial/PseudoTerminal.scala | 43 | ||||
-rw-r--r-- | sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala | 101 |
8 files changed, 438 insertions, 0 deletions
diff --git a/sync/build.sbt b/sync/build.sbt new file mode 100644 index 0000000..547a3fd --- /dev/null +++ b/sync/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.scalatest % "test" + +target in javah := (baseDirectory in ThisBuild).value / "native" / "src" / "include" diff --git a/sync/src/main/scala/akka/serial/Parity.scala b/sync/src/main/scala/akka/serial/Parity.scala new file mode 100644 index 0000000..a1e1d04 --- /dev/null +++ b/sync/src/main/scala/akka/serial/Parity.scala @@ -0,0 +1,9 @@ +package akka.serial + +/** Specifies available parities used in serial communication. */ +object Parity extends Enumeration { + type Parity = Value + val None = Value(0) + val Odd = Value(1) + val Even = Value(2) +} diff --git a/sync/src/main/scala/akka/serial/SerialSettings.scala b/sync/src/main/scala/akka/serial/SerialSettings.scala new file mode 100644 index 0000000..56345ae --- /dev/null +++ b/sync/src/main/scala/akka/serial/SerialSettings.scala @@ -0,0 +1,10 @@ +package akka.serial + +/** + * Groups settings used in communication over a serial port. + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + */ +case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/sync/src/main/scala/akka/serial/exceptions.scala b/sync/src/main/scala/akka/serial/exceptions.scala new file mode 100644 index 0000000..26dfceb --- /dev/null +++ b/sync/src/main/scala/akka/serial/exceptions.scala @@ -0,0 +1,19 @@ +package akka.serial + +/** The requested port could not be found. */ +class NoSuchPortException(message: String) extends Exception(message) + +/** The requested port is in use by someone else. */ +class PortInUseException(message: String) extends Exception(message) + +/** Permissions are not sufficient to open a serial port. */ +class AccessDeniedException(message: String) extends Exception(message) + +/** The settings specified are invalid. */ +class InvalidSettingsException(message: String) extends Exception(message) + +/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ +class PortInterruptedException(message: String) extends Exception(message) + +/** The specified port has been closed. */ +class PortClosedException(message: String) extends Exception(message) diff --git a/sync/src/main/scala/akka/serial/sync/SerialConnection.scala b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala new file mode 100644 index 0000000..7d80a4c --- /dev/null +++ b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala @@ -0,0 +1,142 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In + * contrast to the latter, this class encapsulates and secures any pointers used to communicate with + * the native backend and is thread-safe. + * + * The underlying serial port is assumed open when this class is initialized. + */ +class SerialConnection private ( + unsafe: UnsafeSerial, + val port: String +) { + + private var reading: Boolean = false + private val readLock = new Object + + private var writing: Boolean = false + private val writeLock = new Object + + private val closed = new AtomicBoolean(false) + + /** + * Checks if this serial port is closed. + */ + def isClosed = closed.get() + + /** + * Closes the underlying serial connection. Any callers blocked on read or write will return. + * A call of this method has no effect if the serial port is already closed. + * @throws IOException on IO error + */ + def close(): Unit = this.synchronized { + if (!closed.get) { + closed.set(true) + unsafe.cancelRead() + readLock.synchronized { + while (reading) this.wait() + } + writeLock.synchronized { + while (writing) this.wait() + } + unsafe.close() + } + } + + /** + * Reads data from underlying serial connection into a ByteBuffer. + * Note that data is read into the buffer's memory, starting at the + * first position. The buffer's limit is set to the number of bytes + * read. + * + * A call to this method is blocking, however it is interrupted + * if the connection is closed. + * + * This method works only for direct buffers. + * + * @param buffer a ByteBuffer into which data is read + * @return the actual number of bytes read + * @throws PortInterruptedException if port is closed while reading + * @throws IOException on IO error + */ + def read(buffer: ByteBuffer): Int = readLock.synchronized { + if (!closed.get) { + try { + reading = true + val n = unsafe.read(buffer) + buffer.limit(n) + n + } finally { + reading = false + if (closed.get) readLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + + /** + * Writes data from a ByteBuffer to underlying serial connection. + * Note that data is read from the buffer's memory, its attributes + * such as position and limit are not modified. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * This method works only for direct buffers. + * + * @param buffer a ByteBuffer from which data is taken + * @return the actual number of bytes written + * @throws IOException on IO error + */ + def write(buffer: ByteBuffer): Int = writeLock.synchronized { + if (!closed.get) { + try { + writing = true + unsafe.write(buffer, buffer.position) + } finally { + writing = false + if (closed.get) writeLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + +} + +object SerialConnection { + + /** + * Opens a new connection to a serial port. + * This method acts as a factory to creating serial connections. + * + * @param port name of serial port to open + * @param settings settings with which to initialize the connection + * @return an instance of the open serial connection + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + def open( + port: String, + settings: SerialSettings + ): SerialConnection = synchronized { + val pointer = UnsafeSerial.open( + port, + settings.baud, + settings.characterSize, + settings.twoStopBits, + settings.parity.id + ) + new SerialConnection(new UnsafeSerial(pointer), port) + } + +} diff --git a/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala new file mode 100644 index 0000000..dcca960 --- /dev/null +++ b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala @@ -0,0 +1,109 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer + +import ch.jodersky.jni.nativeLoader + +/** + * Low-level wrapper of native serial backend. + * + * WARNING: Methods in this class allocate native structures and deal with pointers. These + * pointers are handled as longs by java and are NOT checked for correctness, therefore passing + * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. + * + * See SerialConnection for a higher-level, more secured wrapper + * of serial communication. + * + * @param serialAddr address of natively allocated serial configuration structure + */ +@nativeLoader("akkaserial1") +private[serial] class UnsafeSerial(final val serialAddr: Long) { + + final val ParityNone: Int = 0 + final val ParityOdd: Int = 1 + final val ParityEven: Int = 2 + + /** + * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only + * read into the buffer's allocated memory, its position or limit are not changed. + * + * The read is blocking, however it may be interrupted by calling cancelRead() on the given + * serial port. + * + * @param buffer direct ByteBuffer to read into + * @return number of bytes actually read + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws PortInterruptedException if the call to this function was interrupted + * @throws IOException on IO error + */ + @native def read(buffer: ByteBuffer): Int + + /** + * Cancels a read (any caller to read or readDirect will return with a + * PortInterruptedException). This function may be called from any thread. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def cancelRead(): Unit + + /** + * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is + * only taken from the buffer's allocated memory, its position or limit are not changed. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * @param serial address of natively allocated serial configuration structure + * @param buffer direct ByteBuffer from which data is taken + * @param length actual amount of data that should be taken from the buffer (this is needed since the native + * backend does not provide a way to query the buffer's current limit) + * @return number of bytes actually written + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws IOException on IO error + */ + @native def write(buffer: ByteBuffer, length: Int): Int + + /** + * Closes an previously open serial port. Natively allocated resources are freed and the serial + * pointer becomes invalid, therefore this function should only be called ONCE per open serial + * port. + * + * A port should not be closed while it is used (by a read or write) as this + * results in undefined behaviour. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def close(): Unit + +} + +private[serial] object UnsafeSerial { + + /** + * Opens a serial port. + * + * @param port name of serial port to open + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + * @return address of natively allocated serial configuration structure + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + @native def debug(value: Boolean): Unit + +} diff --git a/sync/src/test/scala/akka/serial/PseudoTerminal.scala b/sync/src/test/scala/akka/serial/PseudoTerminal.scala new file mode 100644 index 0000000..3e9e9fe --- /dev/null +++ b/sync/src/test/scala/akka/serial/PseudoTerminal.scala @@ -0,0 +1,43 @@ +package akka.serial + +import java.io.{File, IOException} +import java.nio.file.Files + +import scala.concurrent.duration._ +import scala.sys.process._ +import scala.util.control.NonFatal + +trait PseudoTerminal { + + final val SetupTimeout = 100.milliseconds + + def withEcho[A](action: (String, SerialSettings) => A): A = { + val dir = Files.createTempDirectory("akka-serial-pty").toFile + val pty = new File(dir, "pty") + + val socat = try { + val s = Seq( + "socat", + "-d -d", + s"exec:cat,pty,raw,b115200,echo=0", + s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}" + ).run(ProcessLogger(println(_)), false) + Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up + s + } catch { + case NonFatal(ex) => + throw new IOException( + "Error running echo service, make sure the program 'socat' is installed", ex) + } + + try { + val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200)) + Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys + result + } finally { + socat.destroy() + dir.delete() + } + } + +} diff --git a/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala new file mode 100644 index 0000000..24069d7 --- /dev/null +++ b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala @@ -0,0 +1,101 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer +import org.scalatest._ + +class SerialConnectionSpec extends WordSpec with PseudoTerminal { + + def withEchoConnection[A](action: SerialConnection => A): A = { + withEcho { (port, settings) => + val connection = SerialConnection.open(port, settings) + try { + action(connection) + } finally { + connection.close() + } + } + } + + "A SerialConnection" should { + + "open a valid port" in { + withEcho { (port, settings) => + SerialConnection.open(port, settings) + } + } + + "throw an exception on an invalid port" in { + val settings = SerialSettings(baud = 115200) + intercept[NoSuchPortException] { + SerialConnection.open("/dev/nonexistant", settings) + } + } + + "read the same data it writes to an echo pty" in { + withEchoConnection { conn => + /* Note: this test assumes that all data will be written and read + * within single write and read calls. This in turn assumes that + * internal operating system buffers have enough capacity to + * store all data. */ + val bufferSize = 64 + + val outString = "hello world" + val outBuffer = ByteBuffer.allocateDirect(bufferSize) + val outData = outString.getBytes + outBuffer.put(outData) + conn.write(outBuffer) + + val inBuffer = ByteBuffer.allocateDirect(bufferSize) + conn.read(inBuffer) + val inData = new Array[Byte](inBuffer.remaining()) + inBuffer.get(inData) + val inString = new String(inData) + + assert(inString == outString) + } + } + + "interrupt a read when closing a port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + + val closer = new Thread { + override def run(): Unit = { + Thread.sleep(100) + conn.close() + } + } + closer.start() + intercept[PortInterruptedException]{ + conn.read(buffer) + } + closer.join() + } + } + + "throw an exception when reading from a closed port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + conn.close() + + intercept[PortClosedException]{ + conn.read(buffer) + } + } + } + + "throw an exception when writing to a closed port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + conn.close() + + intercept[PortClosedException]{ + conn.write(buffer) + } + } + } + + } + +} |