aboutsummaryrefslogtreecommitdiff
path: root/sync
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-01-08 21:16:25 +0100
committerJakob Odersky <jakob@odersky.com>2017-01-21 17:22:10 -0800
commit23959966760174477a6b0fcbf9dd1e8ef37c643b (patch)
tree9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /sync
parent6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff)
downloadakka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip
Rename project to akka-serial
Diffstat (limited to 'sync')
-rw-r--r--sync/build.sbt5
-rw-r--r--sync/src/main/scala/akka/serial/Parity.scala9
-rw-r--r--sync/src/main/scala/akka/serial/SerialSettings.scala10
-rw-r--r--sync/src/main/scala/akka/serial/exceptions.scala19
-rw-r--r--sync/src/main/scala/akka/serial/sync/SerialConnection.scala142
-rw-r--r--sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala109
-rw-r--r--sync/src/test/scala/akka/serial/PseudoTerminal.scala43
-rw-r--r--sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala101
8 files changed, 438 insertions, 0 deletions
diff --git a/sync/build.sbt b/sync/build.sbt
new file mode 100644
index 0000000..547a3fd
--- /dev/null
+++ b/sync/build.sbt
@@ -0,0 +1,5 @@
+import akkaserial.Dependencies
+
+libraryDependencies += Dependencies.scalatest % "test"
+
+target in javah := (baseDirectory in ThisBuild).value / "native" / "src" / "include"
diff --git a/sync/src/main/scala/akka/serial/Parity.scala b/sync/src/main/scala/akka/serial/Parity.scala
new file mode 100644
index 0000000..a1e1d04
--- /dev/null
+++ b/sync/src/main/scala/akka/serial/Parity.scala
@@ -0,0 +1,9 @@
+package akka.serial
+
+/** Specifies available parities used in serial communication. */
+object Parity extends Enumeration {
+ type Parity = Value
+ val None = Value(0)
+ val Odd = Value(1)
+ val Even = Value(2)
+}
diff --git a/sync/src/main/scala/akka/serial/SerialSettings.scala b/sync/src/main/scala/akka/serial/SerialSettings.scala
new file mode 100644
index 0000000..56345ae
--- /dev/null
+++ b/sync/src/main/scala/akka/serial/SerialSettings.scala
@@ -0,0 +1,10 @@
+package akka.serial
+
+/**
+ * Groups settings used in communication over a serial port.
+ * @param baud baud rate to use with serial port
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ */
+case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None)
diff --git a/sync/src/main/scala/akka/serial/exceptions.scala b/sync/src/main/scala/akka/serial/exceptions.scala
new file mode 100644
index 0000000..26dfceb
--- /dev/null
+++ b/sync/src/main/scala/akka/serial/exceptions.scala
@@ -0,0 +1,19 @@
+package akka.serial
+
+/** The requested port could not be found. */
+class NoSuchPortException(message: String) extends Exception(message)
+
+/** The requested port is in use by someone else. */
+class PortInUseException(message: String) extends Exception(message)
+
+/** Permissions are not sufficient to open a serial port. */
+class AccessDeniedException(message: String) extends Exception(message)
+
+/** The settings specified are invalid. */
+class InvalidSettingsException(message: String) extends Exception(message)
+
+/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */
+class PortInterruptedException(message: String) extends Exception(message)
+
+/** The specified port has been closed. */
+class PortClosedException(message: String) extends Exception(message)
diff --git a/sync/src/main/scala/akka/serial/sync/SerialConnection.scala b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala
new file mode 100644
index 0000000..7d80a4c
--- /dev/null
+++ b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala
@@ -0,0 +1,142 @@
+package akka.serial
+package sync
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
+
+/**
+ * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In
+ * contrast to the latter, this class encapsulates and secures any pointers used to communicate with
+ * the native backend and is thread-safe.
+ *
+ * The underlying serial port is assumed open when this class is initialized.
+ */
+class SerialConnection private (
+ unsafe: UnsafeSerial,
+ val port: String
+) {
+
+ private var reading: Boolean = false
+ private val readLock = new Object
+
+ private var writing: Boolean = false
+ private val writeLock = new Object
+
+ private val closed = new AtomicBoolean(false)
+
+ /**
+ * Checks if this serial port is closed.
+ */
+ def isClosed = closed.get()
+
+ /**
+ * Closes the underlying serial connection. Any callers blocked on read or write will return.
+ * A call of this method has no effect if the serial port is already closed.
+ * @throws IOException on IO error
+ */
+ def close(): Unit = this.synchronized {
+ if (!closed.get) {
+ closed.set(true)
+ unsafe.cancelRead()
+ readLock.synchronized {
+ while (reading) this.wait()
+ }
+ writeLock.synchronized {
+ while (writing) this.wait()
+ }
+ unsafe.close()
+ }
+ }
+
+ /**
+ * Reads data from underlying serial connection into a ByteBuffer.
+ * Note that data is read into the buffer's memory, starting at the
+ * first position. The buffer's limit is set to the number of bytes
+ * read.
+ *
+ * A call to this method is blocking, however it is interrupted
+ * if the connection is closed.
+ *
+ * This method works only for direct buffers.
+ *
+ * @param buffer a ByteBuffer into which data is read
+ * @return the actual number of bytes read
+ * @throws PortInterruptedException if port is closed while reading
+ * @throws IOException on IO error
+ */
+ def read(buffer: ByteBuffer): Int = readLock.synchronized {
+ if (!closed.get) {
+ try {
+ reading = true
+ val n = unsafe.read(buffer)
+ buffer.limit(n)
+ n
+ } finally {
+ reading = false
+ if (closed.get) readLock.notify()
+ }
+ } else {
+ throw new PortClosedException(s"${port} is closed")
+ }
+ }
+
+ /**
+ * Writes data from a ByteBuffer to underlying serial connection.
+ * Note that data is read from the buffer's memory, its attributes
+ * such as position and limit are not modified.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * This method works only for direct buffers.
+ *
+ * @param buffer a ByteBuffer from which data is taken
+ * @return the actual number of bytes written
+ * @throws IOException on IO error
+ */
+ def write(buffer: ByteBuffer): Int = writeLock.synchronized {
+ if (!closed.get) {
+ try {
+ writing = true
+ unsafe.write(buffer, buffer.position)
+ } finally {
+ writing = false
+ if (closed.get) writeLock.notify()
+ }
+ } else {
+ throw new PortClosedException(s"${port} is closed")
+ }
+ }
+
+}
+
+object SerialConnection {
+
+ /**
+ * Opens a new connection to a serial port.
+ * This method acts as a factory to creating serial connections.
+ *
+ * @param port name of serial port to open
+ * @param settings settings with which to initialize the connection
+ * @return an instance of the open serial connection
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ def open(
+ port: String,
+ settings: SerialSettings
+ ): SerialConnection = synchronized {
+ val pointer = UnsafeSerial.open(
+ port,
+ settings.baud,
+ settings.characterSize,
+ settings.twoStopBits,
+ settings.parity.id
+ )
+ new SerialConnection(new UnsafeSerial(pointer), port)
+ }
+
+}
diff --git a/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala
new file mode 100644
index 0000000..dcca960
--- /dev/null
+++ b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala
@@ -0,0 +1,109 @@
+package akka.serial
+package sync
+
+import java.nio.ByteBuffer
+
+import ch.jodersky.jni.nativeLoader
+
+/**
+ * Low-level wrapper of native serial backend.
+ *
+ * WARNING: Methods in this class allocate native structures and deal with pointers. These
+ * pointers are handled as longs by java and are NOT checked for correctness, therefore passing
+ * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM.
+ *
+ * See SerialConnection for a higher-level, more secured wrapper
+ * of serial communication.
+ *
+ * @param serialAddr address of natively allocated serial configuration structure
+ */
+@nativeLoader("akkaserial1")
+private[serial] class UnsafeSerial(final val serialAddr: Long) {
+
+ final val ParityNone: Int = 0
+ final val ParityOdd: Int = 1
+ final val ParityEven: Int = 2
+
+ /**
+ * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only
+ * read into the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given
+ * serial port.
+ *
+ * @param buffer direct ByteBuffer to read into
+ * @return number of bytes actually read
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ @native def read(buffer: ByteBuffer): Int
+
+ /**
+ * Cancels a read (any caller to read or readDirect will return with a
+ * PortInterruptedException). This function may be called from any thread.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ @native def cancelRead(): Unit
+
+ /**
+ * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is
+ * only taken from the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer from which data is taken
+ * @param length actual amount of data that should be taken from the buffer (this is needed since the native
+ * backend does not provide a way to query the buffer's current limit)
+ * @return number of bytes actually written
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws IOException on IO error
+ */
+ @native def write(buffer: ByteBuffer, length: Int): Int
+
+ /**
+ * Closes an previously open serial port. Natively allocated resources are freed and the serial
+ * pointer becomes invalid, therefore this function should only be called ONCE per open serial
+ * port.
+ *
+ * A port should not be closed while it is used (by a read or write) as this
+ * results in undefined behaviour.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ @native def close(): Unit
+
+}
+
+private[serial] object UnsafeSerial {
+
+ /**
+ * Opens a serial port.
+ *
+ * @param port name of serial port to open
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ * @return address of natively allocated serial configuration structure
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ @native def debug(value: Boolean): Unit
+
+}
diff --git a/sync/src/test/scala/akka/serial/PseudoTerminal.scala b/sync/src/test/scala/akka/serial/PseudoTerminal.scala
new file mode 100644
index 0000000..3e9e9fe
--- /dev/null
+++ b/sync/src/test/scala/akka/serial/PseudoTerminal.scala
@@ -0,0 +1,43 @@
+package akka.serial
+
+import java.io.{File, IOException}
+import java.nio.file.Files
+
+import scala.concurrent.duration._
+import scala.sys.process._
+import scala.util.control.NonFatal
+
+trait PseudoTerminal {
+
+ final val SetupTimeout = 100.milliseconds
+
+ def withEcho[A](action: (String, SerialSettings) => A): A = {
+ val dir = Files.createTempDirectory("akka-serial-pty").toFile
+ val pty = new File(dir, "pty")
+
+ val socat = try {
+ val s = Seq(
+ "socat",
+ "-d -d",
+ s"exec:cat,pty,raw,b115200,echo=0",
+ s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}"
+ ).run(ProcessLogger(println(_)), false)
+ Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up
+ s
+ } catch {
+ case NonFatal(ex) =>
+ throw new IOException(
+ "Error running echo service, make sure the program 'socat' is installed", ex)
+ }
+
+ try {
+ val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200))
+ Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys
+ result
+ } finally {
+ socat.destroy()
+ dir.delete()
+ }
+ }
+
+}
diff --git a/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala
new file mode 100644
index 0000000..24069d7
--- /dev/null
+++ b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala
@@ -0,0 +1,101 @@
+package akka.serial
+package sync
+
+import java.nio.ByteBuffer
+import org.scalatest._
+
+class SerialConnectionSpec extends WordSpec with PseudoTerminal {
+
+ def withEchoConnection[A](action: SerialConnection => A): A = {
+ withEcho { (port, settings) =>
+ val connection = SerialConnection.open(port, settings)
+ try {
+ action(connection)
+ } finally {
+ connection.close()
+ }
+ }
+ }
+
+ "A SerialConnection" should {
+
+ "open a valid port" in {
+ withEcho { (port, settings) =>
+ SerialConnection.open(port, settings)
+ }
+ }
+
+ "throw an exception on an invalid port" in {
+ val settings = SerialSettings(baud = 115200)
+ intercept[NoSuchPortException] {
+ SerialConnection.open("/dev/nonexistant", settings)
+ }
+ }
+
+ "read the same data it writes to an echo pty" in {
+ withEchoConnection { conn =>
+ /* Note: this test assumes that all data will be written and read
+ * within single write and read calls. This in turn assumes that
+ * internal operating system buffers have enough capacity to
+ * store all data. */
+ val bufferSize = 64
+
+ val outString = "hello world"
+ val outBuffer = ByteBuffer.allocateDirect(bufferSize)
+ val outData = outString.getBytes
+ outBuffer.put(outData)
+ conn.write(outBuffer)
+
+ val inBuffer = ByteBuffer.allocateDirect(bufferSize)
+ conn.read(inBuffer)
+ val inData = new Array[Byte](inBuffer.remaining())
+ inBuffer.get(inData)
+ val inString = new String(inData)
+
+ assert(inString == outString)
+ }
+ }
+
+ "interrupt a read when closing a port" in {
+ withEchoConnection { conn =>
+ val buffer = ByteBuffer.allocateDirect(64)
+
+ val closer = new Thread {
+ override def run(): Unit = {
+ Thread.sleep(100)
+ conn.close()
+ }
+ }
+ closer.start()
+ intercept[PortInterruptedException]{
+ conn.read(buffer)
+ }
+ closer.join()
+ }
+ }
+
+ "throw an exception when reading from a closed port" in {
+ withEchoConnection { conn =>
+ val buffer = ByteBuffer.allocateDirect(64)
+ conn.close()
+
+ intercept[PortClosedException]{
+ conn.read(buffer)
+ }
+ }
+ }
+
+ "throw an exception when writing to a closed port" in {
+ withEchoConnection { conn =>
+ val buffer = ByteBuffer.allocateDirect(64)
+ conn.close()
+
+ intercept[PortClosedException]{
+ conn.write(buffer)
+ }
+ }
+ }
+
+ }
+
+}