From 5acd9c7939381606b438a22565d860facc57ef2b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 8 Jan 2017 21:16:25 +0100 Subject: Refactor project layout --- Documentation/manual.md | 6 +- build.sbt | 20 +- core/build.sbt | 7 + .../scala/ch/jodersky/akka/serial/Parity.scala | 9 + .../scala/ch/jodersky/akka/serial/Serial.scala | 132 +++++++ .../ch/jodersky/akka/serial/SerialConnection.scala | 140 +++++++ .../scala/ch/jodersky/akka/serial/SerialExt.scala | 9 + .../ch/jodersky/akka/serial/SerialManager.scala | 47 +++ .../ch/jodersky/akka/serial/SerialOperator.scala | 83 ++++ .../ch/jodersky/akka/serial/SerialSettings.scala | 10 + .../ch/jodersky/akka/serial/UnsafeSerial.scala | 108 ++++++ .../scala/ch/jodersky/akka/serial/Watcher.scala | 143 +++++++ .../scala/ch/jodersky/akka/serial/exceptions.scala | 19 + .../scala/ch/jodersky/flow/PseudoTerminal.scala | 43 +++ .../scala/ch/jodersky/flow/SerialManagerSpec.scala | 38 ++ .../ch/jodersky/flow/SerialOperatorSpec.scala | 49 +++ flow-core/build.sbt | 7 - .../src/main/scala/ch/jodersky/flow/Parity.scala | 9 - .../src/main/scala/ch/jodersky/flow/Serial.scala | 132 ------- .../scala/ch/jodersky/flow/SerialConnection.scala | 140 ------- .../main/scala/ch/jodersky/flow/SerialExt.scala | 9 - .../scala/ch/jodersky/flow/SerialManager.scala | 47 --- .../scala/ch/jodersky/flow/SerialOperator.scala | 83 ---- .../scala/ch/jodersky/flow/SerialSettings.scala | 10 - .../main/scala/ch/jodersky/flow/UnsafeSerial.scala | 108 ------ .../src/main/scala/ch/jodersky/flow/Watcher.scala | 143 ------- .../main/scala/ch/jodersky/flow/exceptions.scala | 19 - .../scala/ch/jodersky/flow/PseudoTerminal.scala | 43 --- .../scala/ch/jodersky/flow/SerialManagerSpec.scala | 38 -- .../ch/jodersky/flow/SerialOperatorSpec.scala | 49 --- flow-native/build.sbt | 8 - flow-native/lib_native/armv7l-linux/libflow4.so | Bin 12852 -> 0 bytes flow-native/lib_native/i686-linux/libflow4.so | Bin 14735 -> 0 bytes .../lib_native/x86_64-darwin/libflow4.dylib | Bin 18336 -> 0 bytes flow-native/lib_native/x86_64-linux/libflow4.so | Bin 16791 -> 0 bytes flow-native/src/.gitignore | 12 - flow-native/src/CMakeLists.txt | 46 --- flow-native/src/flow_jni.c | 150 -------- .../src/include/ch_jodersky_flow_UnsafeSerial.h | 45 --- .../src/include/ch_jodersky_flow_UnsafeSerial__.h | 29 -- flow-native/src/include/flow.h | 103 ----- flow-native/src/platform/posix/flow.c | 263 ------------- flow-native/src/platform/windows/README | 1 - flow-native/src/platform/windows/flow.c.disabled | 416 --------------------- flow-native/src/readme.md | 3 - flow-samples/README.md | 3 - .../flow/samples/terminalstream/Main.scala | 66 ---- .../flow/samples/terminal/ConsoleReader.scala | 29 -- .../ch/jodersky/flow/samples/terminal/Main.scala | 30 -- .../jodersky/flow/samples/terminal/Terminal.scala | 75 ---- .../ch/jodersky/flow/samples/watcher/main.scala | 50 --- flow-stream/build.sbt | 5 - .../scala/ch/jodersky/flow/stream/Serial.scala | 67 ---- .../flow/stream/StreamSerialException.scala | 5 - .../flow/stream/StreamWatcherException.scala | 4 - .../flow/stream/impl/SerialConnectionLogic.scala | 172 --------- .../flow/stream/impl/SerialConnectionStage.scala | 49 --- .../jodersky/flow/stream/impl/WatcherLogic.scala | 65 ---- .../jodersky/flow/stream/impl/WatcherStage.scala | 38 -- .../scala/ch/jodersky/flow/stream/SerialSpec.scala | 51 --- native/build.sbt | 8 + native/lib_native/armv7l-linux/libflow4.so | Bin 0 -> 12852 bytes native/lib_native/i686-linux/libflow4.so | Bin 0 -> 14735 bytes native/lib_native/x86_64-darwin/libflow4.dylib | Bin 0 -> 18336 bytes native/lib_native/x86_64-linux/libflow4.so | Bin 0 -> 16791 bytes native/src/.gitignore | 12 + native/src/CMakeLists.txt | 46 +++ native/src/flow_jni.c | 150 ++++++++ native/src/include/ch_jodersky_flow_UnsafeSerial.h | 45 +++ .../src/include/ch_jodersky_flow_UnsafeSerial__.h | 29 ++ native/src/include/flow.h | 103 +++++ native/src/platform/posix/flow.c | 263 +++++++++++++ native/src/platform/windows/README | 1 + native/src/platform/windows/flow.c.disabled | 416 +++++++++++++++++++++ native/src/readme.md | 3 + samples/README.md | 3 + .../flow/samples/terminalstream/Main.scala | 66 ++++ .../flow/samples/terminal/ConsoleReader.scala | 29 ++ .../ch/jodersky/flow/samples/terminal/Main.scala | 30 ++ .../jodersky/flow/samples/terminal/Terminal.scala | 75 ++++ .../ch/jodersky/flow/samples/watcher/main.scala | 50 +++ stream/build.sbt | 5 + .../scala/ch/jodersky/flow/stream/Serial.scala | 67 ++++ .../flow/stream/StreamSerialException.scala | 5 + .../flow/stream/StreamWatcherException.scala | 4 + .../flow/stream/impl/SerialConnectionLogic.scala | 172 +++++++++ .../flow/stream/impl/SerialConnectionStage.scala | 49 +++ .../jodersky/flow/stream/impl/WatcherLogic.scala | 65 ++++ .../jodersky/flow/stream/impl/WatcherStage.scala | 38 ++ .../scala/ch/jodersky/flow/stream/SerialSpec.scala | 51 +++ 90 files changed, 2635 insertions(+), 2635 deletions(-) create mode 100644 core/build.sbt create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/Parity.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/Serial.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/SerialConnection.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/SerialExt.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/SerialManager.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/SerialOperator.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/SerialSettings.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/UnsafeSerial.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/Watcher.scala create mode 100644 core/src/main/scala/ch/jodersky/akka/serial/exceptions.scala create mode 100644 core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala create mode 100644 core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala create mode 100644 core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala delete mode 100644 flow-core/build.sbt delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Parity.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Serial.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala delete mode 100644 flow-native/build.sbt delete mode 100755 flow-native/lib_native/armv7l-linux/libflow4.so delete mode 100644 flow-native/lib_native/i686-linux/libflow4.so delete mode 100644 flow-native/lib_native/x86_64-darwin/libflow4.dylib delete mode 100755 flow-native/lib_native/x86_64-linux/libflow4.so delete mode 100644 flow-native/src/.gitignore delete mode 100644 flow-native/src/CMakeLists.txt delete mode 100644 flow-native/src/flow_jni.c delete mode 100644 flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h delete mode 100644 flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h delete mode 100644 flow-native/src/include/flow.h delete mode 100644 flow-native/src/platform/posix/flow.c delete mode 100644 flow-native/src/platform/windows/README delete mode 100644 flow-native/src/platform/windows/flow.c.disabled delete mode 100644 flow-native/src/readme.md delete mode 100644 flow-samples/README.md delete mode 100644 flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala delete mode 100644 flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala delete mode 100644 flow-stream/build.sbt delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala delete mode 100644 flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala create mode 100644 native/build.sbt create mode 100755 native/lib_native/armv7l-linux/libflow4.so create mode 100644 native/lib_native/i686-linux/libflow4.so create mode 100644 native/lib_native/x86_64-darwin/libflow4.dylib create mode 100755 native/lib_native/x86_64-linux/libflow4.so create mode 100644 native/src/.gitignore create mode 100644 native/src/CMakeLists.txt create mode 100644 native/src/flow_jni.c create mode 100644 native/src/include/ch_jodersky_flow_UnsafeSerial.h create mode 100644 native/src/include/ch_jodersky_flow_UnsafeSerial__.h create mode 100644 native/src/include/flow.h create mode 100644 native/src/platform/posix/flow.c create mode 100644 native/src/platform/windows/README create mode 100644 native/src/platform/windows/flow.c.disabled create mode 100644 native/src/readme.md create mode 100644 samples/README.md create mode 100644 samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala create mode 100644 samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala create mode 100644 samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala create mode 100644 samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala create mode 100644 samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala create mode 100644 stream/build.sbt create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala create mode 100644 stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala create mode 100644 stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala diff --git a/Documentation/manual.md b/Documentation/manual.md index d1bd335..0a65e42 100644 --- a/Documentation/manual.md +++ b/Documentation/manual.md @@ -57,7 +57,7 @@ Flow's API follows that of an actor based system, where each actor is assigned s 2. Serial "operators". Operators are created once per open serial port and serve as an intermediate between client code and native code dealing with serial data transmission and reception. They isolate the user from threading issues and enable the reactive dispatch of incoming data. A serial operator is said to be "associated" to its underlying open serial port. -The messages understood by flow's actors are all contained in the `ch.jodersky.flow.Serial` object. They are well documented and should serve as the entry point when searching the API documentation. +The messages understood by flow's actors are all contained in the `ch.jodersky.akka.serial.Serial` object. They are well documented and should serve as the entry point when searching the API documentation. ## Opening a Port A serial port is opened by sending an `Open` message to the serial manager. The response varies on the outcome of opening the underlying serial port. @@ -67,7 +67,7 @@ A serial port is opened by sending an `Open` message to the serial manager. The 2. In case of success, the sender is notified with an `Opened` message. This message is sent from an operator actor, spawned by the serial manager. It is useful to capture the sender (i.e. the operator) of this message as all further communication with the newly opened port must pass through the operator. ~~~scala -import ch.jodersky.flow.{ Serial, SerialSettings, AccessDeniedException } +import ch.jodersky.akka.serial.{ Serial, SerialSettings, AccessDeniedException } val port = "/dev/ttyXXX" val settings = SerialSettings( @@ -189,7 +189,7 @@ Flow provides support for Akka streams and thus can be interfaced with reactive- libraryDependencies += "ch.jodersky" %% "flow-stream" % "@version@" ~~~ -The main entry point for serial streaming is `ch.jodersky.flow.stream.Serial`. It's API is also well documented and should serve as the starting point when searching documentation on serial streaming. +The main entry point for serial streaming is `ch.jodersky.akka.serial.stream.Serial`. Its API is also well documented and should serve as the starting point when searching documentation on serial streaming. ## Opening a Port Connection is established by materializing a `Flow[ByteString, ByteString, Future[Connection]]` obtained by calling `Serial().open()` diff --git a/build.sbt b/build.sbt index 50d9f8a..0fe8e5c 100644 --- a/build.sbt +++ b/build.sbt @@ -36,24 +36,24 @@ pomExtra in ThisBuild := { lazy val root = (project in file(".")) .aggregate(core, native, stream) -lazy val core = (project in file("flow-core")) - .settings(name := "flow-core") +lazy val core = (project in file("core")) + .settings(name := "akka-serial-core") .dependsOn(native % "test->runtime") -lazy val native = (project in file("flow-native")) - .settings(name := "flow-native") +lazy val native = (project in file("native")) + .settings(name := "akka-serial-native") -lazy val stream = (project in file("flow-stream")) - .settings(name := "flow-stream") +lazy val stream = (project in file("stream")) + .settings(name := "akka-serial-stream") .dependsOn(core, core % "test->test", native % "test->runtime") -lazy val samplesTerminal = (project in file("flow-samples") / "terminal") +lazy val samplesTerminal = (project in file("samples") / "terminal") .dependsOn(core, native % Runtime) -lazy val samplesTerminalStream = (project in file("flow-samples") / "terminal-stream") +lazy val samplesTerminalStream = (project in file("samples") / "terminal-stream") .dependsOn(stream, native % Runtime) -lazy val samplesWatcher = (project in file("flow-samples") / "watcher") +lazy val samplesWatcher = (project in file("samples") / "watcher") .dependsOn(core, native % Runtime) // Root project settings @@ -83,7 +83,7 @@ scalacOptions in (ScalaUnidoc, doc) ++= Seq( ) ++ { val latestTag: String = "git describe --abbrev=0 --match v[0-9].*".!! Opts.doc.sourceUrl( - s"https://github.com/jodersky/flow/blob/$latestTag€{FILE_PATH}.scala" + s"https://github.com/jodersky/akka-serial/blob/$latestTag€{FILE_PATH}.scala" ) } siteMappings ++= (mappings in (ScalaUnidoc, packageDoc)).value.map{ case (file, path) => diff --git a/core/build.sbt b/core/build.sbt new file mode 100644 index 0000000..fdfcbab --- /dev/null +++ b/core/build.sbt @@ -0,0 +1,7 @@ +import flow.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaTestKit % "test" +libraryDependencies += Dependencies.scalatest % "test" + +target in javah := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include" diff --git a/core/src/main/scala/ch/jodersky/akka/serial/Parity.scala b/core/src/main/scala/ch/jodersky/akka/serial/Parity.scala new file mode 100644 index 0000000..4e90744 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/Parity.scala @@ -0,0 +1,9 @@ +package ch.jodersky.akka.serial + +/** Specifies available parities used in serial communication. */ +object Parity extends Enumeration { + type Parity = Value + val None = Value(0) + val Odd = Value(1) + val Even = Value(2) +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/Serial.scala b/core/src/main/scala/ch/jodersky/akka/serial/Serial.scala new file mode 100644 index 0000000..889d3b0 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/Serial.scala @@ -0,0 +1,132 @@ +package ch.jodersky.akka.serial + +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by flow's serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + /** Base trait for any flow-related messages. */ + sealed trait Message + + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ + trait Command extends Message + + /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ + trait Event extends Message + + /** A command has failed. */ + case class CommandFailed(command: Command, reason: Throwable) extends Event + + /** + * Open a new serial port. + * + * Send this command to the serial manager to request the opening of a serial port. The manager will + * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. + * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. + * + * In case the port is successfully opened, the operator will respond with an `Opened` message. + * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. + * + * @param port name of serial port to open + * @param settings settings of serial port to open + * @param bufferSize maximum read and write buffer sizes + */ + case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command + + /** + * A port has been successfully opened. + * + * Event sent by a port operator, indicating that a serial port was successfully opened. The sender + * of this message is the operator associated to the given serial port. + * + * @param port name of opened serial port + */ + case class Opened(port: String) extends Event + + /** + * Data has been received. + * + * Event sent by an operator, indicating that data was received on the operator's serial port. + * + * @param data data received on the port + */ + case class Received(data: ByteString) extends Event + + /** + * Write data to a serial port. + * + * Send this command to an operator to write the given data to its associated serial port. + * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. + * Note that a successful write does not guarantee the actual transmission of data through the serial port, + * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to + * be transmitted. + * + * @param data data to be written to port + * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment + * is a function 'number of bytes written => event') + */ + case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command + + /** + * Special type of acknowledgment that is not sent back. + */ + case object NoAck extends Function1[Int, Event] { + def apply(length: Int) = sys.error("cannot apply NoAck") + } + + /** + * Request closing of port. + * + * Send this command to an operator to close its associated port. The operator will respond + * with a `Closed` message upon closing the serial port. + */ + case object Close extends Command + + /** + * A port has been closed. + * + * Event sent from operator, indicating that its port has been closed. + */ + case object Closed extends Event + + /** + * Watch a directory for new ports. + * + * Send this command to the manager to get notifications when a new port (i.e. file) is created in + * the given directory. + * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. + * + * Note: the sender is also notified of currently existing ports. + * + * @param directory the directory to watch + * @param skipInitial don't get notified of already existing ports + * + * @see Unwatch + * @see Connected + */ + case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command + + /** + * Stop receiving notifications about a previously watched directory. + * + * @param directory the directory to unwatch + */ + case class Unwatch(directory: String = "/dev") extends Command + + /** + * A new port (i.e. file) has been detected. + * + * @param port the absolute file name of the connected port + */ + case class Connected(port: String) extends Event + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + def debug(value: Boolean) = UnsafeSerial.debug(value) + +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/SerialConnection.scala b/core/src/main/scala/ch/jodersky/akka/serial/SerialConnection.scala new file mode 100644 index 0000000..6481ee6 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/SerialConnection.scala @@ -0,0 +1,140 @@ +package ch.jodersky.akka.serial + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In + * contrast to the latter, this class encapsulates and secures any pointers used to communicate with + * the native backend and is thread-safe. + * + * The underlying serial port is assumed open when this class is initialized. + */ +private[serial] class SerialConnection private ( + unsafe: UnsafeSerial, + val port: String +) { + + private var reading: Boolean = false + private val readLock = new Object + + private var writing: Boolean = false + private val writeLock = new Object + + private val closed = new AtomicBoolean(false) + + /** + * Checks if this serial port is closed. + */ + def isClosed = closed.get() + + /** + * Closes the underlying serial connection. Any callers blocked on read or write will return. + * A call of this method has no effect if the serial port is already closed. + * @throws IOException on IO error + */ + def close(): Unit = this.synchronized { + if (!closed.get) { + closed.set(true) + unsafe.cancelRead() + readLock.synchronized { + while (reading) this.wait() + } + writeLock.synchronized { + while (writing) this.wait() + } + unsafe.close() + } + } + + /** + * Reads data from underlying serial connection into a ByteBuffer. + * Note that data is read into the buffer's memory, its attributes + * such as position and limit are not modified. + * + * A call to this method is blocking, however it is interrupted + * if the connection is closed. + * + * This method works for direct and indirect buffers but is optimized + * for the former. + * + * @param buffer a ByteBuffer into which data is read + * @return the actual number of bytes read + * @throws PortInterruptedException if port is closed while reading + * @throws IOException on IO error + */ + def read(buffer: ByteBuffer): Int = readLock.synchronized { + if (!closed.get) { + try { + reading = true + unsafe.read(buffer) + } finally { + reading = false + if (closed.get) readLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + + /** + * Writes data from a ByteBuffer to underlying serial connection. + * Note that data is read from the buffer's memory, its attributes + * such as position and limit are not modified. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * This method works for direct and indirect buffers but is optimized + * for the former. + * + * @param buffer a ByteBuffer from which data is taken + * @return the actual number of bytes written + * @throws IOException on IO error + */ + def write(buffer: ByteBuffer): Int = writeLock.synchronized { + if (!closed.get) { + try { + writing = true + unsafe.write(buffer, buffer.position) + } finally { + writing = false + if (closed.get) writeLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + +} + +private[serial] object SerialConnection { + + /** + * Opens a new connection to a serial port. + * This method acts as a factory to creating serial connections. + * + * @param port name of serial port to open + * @param settings settings with which to initialize the connection + * @return an instance of the open serial connection + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + def open( + port: String, + settings: SerialSettings + ): SerialConnection = synchronized { + val pointer = UnsafeSerial.open( + port, + settings.baud, + settings.characterSize, + settings.twoStopBits, + settings.parity.id + ) + new SerialConnection(new UnsafeSerial(pointer), port) + } + +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/SerialExt.scala b/core/src/main/scala/ch/jodersky/akka/serial/SerialExt.scala new file mode 100644 index 0000000..1b954cf --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/SerialExt.scala @@ -0,0 +1,9 @@ +package ch.jodersky.akka.serial + +import akka.actor.{ ExtendedActorSystem, Props } +import akka.io.IO + +/** Provides the serial IO manager. */ +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/SerialManager.scala b/core/src/main/scala/ch/jodersky/akka/serial/SerialManager.scala new file mode 100644 index 0000000..a6647cd --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/SerialManager.scala @@ -0,0 +1,47 @@ +package ch.jodersky.akka.serial + +import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } +import akka.actor.SupervisorStrategy.{ Escalate, Stop } +import scala.util.{ Failure, Success, Try } + +/** + * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to + * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. + * @see SerialOperator + */ +private[serial] class SerialManager extends Actor { + import SerialManager._ + import context._ + + override val supervisorStrategy = OneForOneStrategy() { + case _: Exception if sender == watcher => Escalate + case _: Exception => Stop + } + + private val watcher = actorOf(Watcher(self), "watcher") + + def receive = { + + case open @ Serial.Open(port, settings, bufferSize) => Try { + SerialConnection.open(port, settings) + } match { + case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) + case Failure(err) => sender ! Serial.CommandFailed(open, err) + } + + case w: Serial.Watch => watcher.forward(w) + + case u: Serial.Unwatch => watcher.forward(u) + + } + +} + +private[serial] object SerialManager { + + private def escapePortString(port: String) = port map { + case '/' => '-' + case c => c + } + +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/SerialOperator.scala b/core/src/main/scala/ch/jodersky/akka/serial/SerialOperator.scala new file mode 100644 index 0000000..1cfc703 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/SerialOperator.scala @@ -0,0 +1,83 @@ +package ch.jodersky.akka.serial + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import akka.util.ByteString +import java.nio.ByteBuffer + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +private[serial] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { + import SerialOperator._ + import context._ + + case class ReaderDied(ex: Throwable) + object Reader extends Thread { + val buffer = ByteBuffer.allocateDirect(bufferSize) + + def loop() = { + var stop = false + while (!connection.isClosed && !stop) { + try { + buffer.clear() + val length = connection.read(buffer) + buffer.limit(length) + val data = ByteString.fromByteBuffer(buffer) + client.tell(Serial.Received(data), self) + } catch { + // don't do anything if port is interrupted + case ex: PortInterruptedException => {} + + //stop and tell operator on other exception + case ex: Exception => + stop = true + self.tell(ReaderDied(ex), Actor.noSender) + } + } + } + + override def run() { + this.setName(s"serial-reader(${connection.port})") + loop() + } + + } + + val writeBuffer = ByteBuffer.allocateDirect(bufferSize) + + override def preStart() = { + context watch client + client ! Serial.Opened(connection.port) + Reader.start() + } + + override def receive: Receive = { + + case Serial.Write(data, ack) => + writeBuffer.clear() + data.copyToBuffer(writeBuffer) + val sent = connection.write(writeBuffer) + if (ack != Serial.NoAck) sender ! ack(sent) + + case Serial.Close => + client ! Serial.Closed + context stop self + + case Terminated(`client`) => + context stop self + + // go down with reader thread + case ReaderDied(ex) => throw ex + + } + + override def postStop() = { + connection.close() + } + +} + +private[serial] object SerialOperator { + def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/SerialSettings.scala b/core/src/main/scala/ch/jodersky/akka/serial/SerialSettings.scala new file mode 100644 index 0000000..e5ab797 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/SerialSettings.scala @@ -0,0 +1,10 @@ +package ch.jodersky.akka.serial + +/** + * Groups settings used in communication over a serial port. + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + */ +case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/core/src/main/scala/ch/jodersky/akka/serial/UnsafeSerial.scala b/core/src/main/scala/ch/jodersky/akka/serial/UnsafeSerial.scala new file mode 100644 index 0000000..91d8be6 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/UnsafeSerial.scala @@ -0,0 +1,108 @@ +package ch.jodersky.akka.serial + +import java.nio.ByteBuffer + +import ch.jodersky.jni.nativeLoader + +/** + * Low-level wrapper of native serial backend. + * + * WARNING: Methods in this class allocate native structures and deal with pointers. These + * pointers are handled as longs by java and are NOT checked for correctness, therefore passing + * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. + * + * See SerialConnection for a higher-level, more secured wrapper + * of serial communication. + * + * @param serialAddr address of natively allocated serial configuration structure + */ +@nativeLoader("flow4") +private[serial] class UnsafeSerial(final val serialAddr: Long) { + + final val ParityNone: Int = 0 + final val ParityOdd: Int = 1 + final val ParityEven: Int = 2 + + /** + * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only + * read into the buffer's allocated memory, its position or limit are not changed. + * + * The read is blocking, however it may be interrupted by calling cancelRead() on the given + * serial port. + * + * @param buffer direct ByteBuffer to read into + * @return number of bytes actually read + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws PortInterruptedException if the call to this function was interrupted + * @throws IOException on IO error + */ + @native def read(buffer: ByteBuffer): Int + + /** + * Cancels a read (any caller to read or readDirect will return with a + * PortInterruptedException). This function may be called from any thread. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def cancelRead(): Unit + + /** + * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is + * only taken from the buffer's allocated memory, its position or limit are not changed. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * @param serial address of natively allocated serial configuration structure + * @param buffer direct ByteBuffer from which data is taken + * @param length actual amount of data that should be taken from the buffer (this is needed since the native + * backend does not provide a way to query the buffer's current limit) + * @return number of bytes actually written + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws IOException on IO error + */ + @native def write(buffer: ByteBuffer, length: Int): Int + + /** + * Closes an previously open serial port. Natively allocated resources are freed and the serial + * pointer becomes invalid, therefore this function should only be called ONCE per open serial + * port. + * + * A port should not be closed while it is used (by a read or write) as this + * results in undefined behaviour. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def close(): Unit + +} + +private[serial] object UnsafeSerial { + + /** + * Opens a serial port. + * + * @param port name of serial port to open + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + * @return address of natively allocated serial configuration structure + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + @native def debug(value: Boolean): Unit + +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/Watcher.scala b/core/src/main/scala/ch/jodersky/akka/serial/Watcher.scala new file mode 100644 index 0000000..e079290 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/Watcher.scala @@ -0,0 +1,143 @@ +package ch.jodersky.akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } +import scala.util.{ Failure, Success, Try } + +private[serial] class Watcher(from: Option[ActorRef]) extends Actor { + + case class WatcherDied(reason: Throwable) + object WatcherThread extends Thread { + import Watcher.NewFile + + private val service = FileSystems.getDefault().newWatchService() + + def register(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + this.setName("serial-port-watcher") + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + self.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) + } + } + } + + def close() = service.close // causes the service to throw a ClosedWatchServiceException + } + + + // directory -> subscribers + private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] + + // directory -> watchkey + private val keys: Map[String, WatchKey] = Map.empty + + def subscribe(directory: String, client: ActorRef): WatchKey = { + val normal = Paths.get(directory).toAbsolutePath + val index = normal.toString + val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) + clients addBinding (index, client) + key + } + + def unsubscribe(directory: String, client: ActorRef): Unit = { + val index = Paths.get(directory).toAbsolutePath.toString + + clients removeBinding (index, sender) + + if (clients.get(index).isEmpty && keys.get(index).isDefined) { + keys(index).cancel() + keys -= index + } + } + + def reply(msg: Any, sender: ActorRef) = { + val origin = from match { + case Some(ref) => ref + case None => self + } + sender.tell(msg, origin) + } + + override def preStart() = { + WatcherThread.setDaemon(true) + WatcherThread.start() + } + + override def receive = { + + case w @ Serial.Watch(directory, skipInitial) => + val normalPath = Paths.get(directory).toAbsolutePath + val normal = normalPath.toString + + Try { + subscribe(directory, sender) + } match { + case Failure(err) => reply(Serial.CommandFailed(w, err), sender) + case Success(key) => + context watch sender + if (!skipInitial) { + Files.newDirectoryStream(normalPath) foreach { path => + if (!Files.isDirectory(path)) { + reply(Serial.Connected(path.toString), sender) + } + } + } + } + + case u @ Serial.Unwatch(directory) => + val normal = Paths.get(directory).toAbsolutePath.toString + + clients.removeBinding(normal, sender) + + if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { + keys(normal).cancel() + keys -= normal + } + + case Terminated(client) => + for ((directory, c) <- clients if c == client) { + unsubscribe(directory, client) + } + + case Watcher.NewFile(directory, file) => + val normal = directory.toAbsolutePath + val absFile = normal resolve file + clients.getOrElse(normal.toString, Set.empty) foreach { client => + reply(Serial.Connected(absFile.toString), client) + } + + case WatcherDied(err) => throw err // go down with watcher thread + + } + + override def postStop() = { + WatcherThread.close() + } + +} + +private[serial] object Watcher { + private case class NewFile(directory: Path, file: Path) + + def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) + +} diff --git a/core/src/main/scala/ch/jodersky/akka/serial/exceptions.scala b/core/src/main/scala/ch/jodersky/akka/serial/exceptions.scala new file mode 100644 index 0000000..47a59e4 --- /dev/null +++ b/core/src/main/scala/ch/jodersky/akka/serial/exceptions.scala @@ -0,0 +1,19 @@ +package ch.jodersky.akka.serial + +/** The requested port could not be found. */ +class NoSuchPortException(message: String) extends Exception(message) + +/** The requested port is in use by someone else. */ +class PortInUseException(message: String) extends Exception(message) + +/** Permissions are not sufficient to open a serial port. */ +class AccessDeniedException(message: String) extends Exception(message) + +/** The settings specified are invalid. */ +class InvalidSettingsException(message: String) extends Exception(message) + +/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ +class PortInterruptedException(message: String) extends Exception(message) + +/** The specified port has been closed. */ +class PortClosedException(message: String) extends Exception(message) diff --git a/core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala b/core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala new file mode 100644 index 0000000..86919d2 --- /dev/null +++ b/core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala @@ -0,0 +1,43 @@ +package ch.jodersky.akka.serial + +import java.io.{File, IOException} +import java.nio.file.Files + +import scala.concurrent.duration._ +import scala.sys.process._ +import scala.util.control.NonFatal + +trait PseudoTerminal { + + final val SetupTimeout = 100.milliseconds + + def withEcho[A](action: (String, SerialSettings) => A): A = { + val dir = Files.createTempDirectory("flow-pty").toFile + val pty = new File(dir, "pty") + + val socat = try { + val s = Seq( + "socat", + "-d -d", + s"exec:cat,pty,raw,b115200,echo=0", + s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}" + ).run(ProcessLogger(println(_)), false) + Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up + s + } catch { + case NonFatal(ex) => + throw new IOException( + "Error running echo service, make sure the program 'socat' is installed", ex) + } + + try { + val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200)) + Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys + result + } finally { + socat.destroy() + dir.delete() + } + } + +} diff --git a/core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala b/core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala new file mode 100644 index 0000000..cac07fa --- /dev/null +++ b/core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala @@ -0,0 +1,38 @@ +package ch.jodersky.akka.serial + +import akka.actor.ActorSystem +import akka.io.IO +import akka.testkit.{ImplicitSender, TestKit} +import org.scalatest._ + +class SerialManagerSpec + extends TestKit(ActorSystem("serial-manager")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "Serial manager" should { + val manager = IO(Serial) + + "open an existing port" in { + withEcho{ case (port, settings) => + manager ! Serial.Open(port, settings) + expectMsgType[Serial.Opened] + } + } + + "fail opening a non-existing port" in { + val cmd = Serial.Open("nonexistent", SerialSettings(115200)) + manager ! cmd + assert(expectMsgType[Serial.CommandFailed].command == cmd) + } + + } + +} diff --git a/core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala b/core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala new file mode 100644 index 0000000..5c9ca49 --- /dev/null +++ b/core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala @@ -0,0 +1,49 @@ +package ch.jodersky.akka.serial + +import scala.concurrent.duration._ + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit} +import akka.util.ByteString +import org.scalatest._ + +case class Ack(n: Int) extends Serial.Event + +class SerialOperatorSpec + extends TestKit(ActorSystem("serial-operator")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with SequentialNestedSuiteExecution + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + def withEchoOp[A](action: ActorRef => A): A = { + withEcho { case (port, settings) => + val connection = SerialConnection.open(port, settings) + val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor)) + action(operator) + } + } + + "Serial operator" should { + + "follow the correct protocol" in withEchoOp { op => + expectMsgType[Serial.Opened] + + val data = ByteString("hello world".getBytes("utf-8")) + op ! Serial.Write(data) + expectMsg(Serial.Received(data)) + + op ! Serial.Close + expectMsg(Serial.Closed) + + } + + } + +} diff --git a/flow-core/build.sbt b/flow-core/build.sbt deleted file mode 100644 index fdfcbab..0000000 --- a/flow-core/build.sbt +++ /dev/null @@ -1,7 +0,0 @@ -import flow.Dependencies - -libraryDependencies += Dependencies.akkaActor -libraryDependencies += Dependencies.akkaTestKit % "test" -libraryDependencies += Dependencies.scalatest % "test" - -target in javah := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include" diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala deleted file mode 100644 index 30596d2..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala +++ /dev/null @@ -1,9 +0,0 @@ -package ch.jodersky.flow - -/** Specifies available parities used in serial communication. */ -object Parity extends Enumeration { - type Parity = Value - val None = Value(0) - val Odd = Value(1) - val Even = Value(2) -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala deleted file mode 100644 index 43b1d19..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala +++ /dev/null @@ -1,132 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.ExtensionKey -import akka.util.ByteString - -/** Defines messages used by flow's serial IO layer. */ -object Serial extends ExtensionKey[SerialExt] { - - /** Base trait for any flow-related messages. */ - sealed trait Message - - /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ - trait Command extends Message - - /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ - trait Event extends Message - - /** A command has failed. */ - case class CommandFailed(command: Command, reason: Throwable) extends Event - - /** - * Open a new serial port. - * - * Send this command to the serial manager to request the opening of a serial port. The manager will - * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. - * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. - * - * In case the port is successfully opened, the operator will respond with an `Opened` message. - * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. - * - * @param port name of serial port to open - * @param settings settings of serial port to open - * @param bufferSize maximum read and write buffer sizes - */ - case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command - - /** - * A port has been successfully opened. - * - * Event sent by a port operator, indicating that a serial port was successfully opened. The sender - * of this message is the operator associated to the given serial port. - * - * @param port name of opened serial port - */ - case class Opened(port: String) extends Event - - /** - * Data has been received. - * - * Event sent by an operator, indicating that data was received on the operator's serial port. - * - * @param data data received on the port - */ - case class Received(data: ByteString) extends Event - - /** - * Write data to a serial port. - * - * Send this command to an operator to write the given data to its associated serial port. - * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. - * Note that a successful write does not guarantee the actual transmission of data through the serial port, - * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to - * be transmitted. - * - * @param data data to be written to port - * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment - * is a function 'number of bytes written => event') - */ - case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command - - /** - * Special type of acknowledgment that is not sent back. - */ - case object NoAck extends Function1[Int, Event] { - def apply(length: Int) = sys.error("cannot apply NoAck") - } - - /** - * Request closing of port. - * - * Send this command to an operator to close its associated port. The operator will respond - * with a `Closed` message upon closing the serial port. - */ - case object Close extends Command - - /** - * A port has been closed. - * - * Event sent from operator, indicating that its port has been closed. - */ - case object Closed extends Event - - /** - * Watch a directory for new ports. - * - * Send this command to the manager to get notifications when a new port (i.e. file) is created in - * the given directory. - * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. - * - * Note: the sender is also notified of currently existing ports. - * - * @param directory the directory to watch - * @param skipInitial don't get notified of already existing ports - * - * @see Unwatch - * @see Connected - */ - case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command - - /** - * Stop receiving notifications about a previously watched directory. - * - * @param directory the directory to unwatch - */ - case class Unwatch(directory: String = "/dev") extends Command - - /** - * A new port (i.e. file) has been detected. - * - * @param port the absolute file name of the connected port - */ - case class Connected(port: String) extends Event - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - def debug(value: Boolean) = UnsafeSerial.debug(value) - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala deleted file mode 100644 index 1cd1046..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala +++ /dev/null @@ -1,140 +0,0 @@ -package ch.jodersky.flow - -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicBoolean - -/** - * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In - * contrast to the latter, this class encapsulates and secures any pointers used to communicate with - * the native backend and is thread-safe. - * - * The underlying serial port is assumed open when this class is initialized. - */ -private[flow] class SerialConnection private ( - unsafe: UnsafeSerial, - val port: String -) { - - private var reading: Boolean = false - private val readLock = new Object - - private var writing: Boolean = false - private val writeLock = new Object - - private val closed = new AtomicBoolean(false) - - /** - * Checks if this serial port is closed. - */ - def isClosed = closed.get() - - /** - * Closes the underlying serial connection. Any callers blocked on read or write will return. - * A call of this method has no effect if the serial port is already closed. - * @throws IOException on IO error - */ - def close(): Unit = this.synchronized { - if (!closed.get) { - closed.set(true) - unsafe.cancelRead() - readLock.synchronized { - while (reading) this.wait() - } - writeLock.synchronized { - while (writing) this.wait() - } - unsafe.close() - } - } - - /** - * Reads data from underlying serial connection into a ByteBuffer. - * Note that data is read into the buffer's memory, its attributes - * such as position and limit are not modified. - * - * A call to this method is blocking, however it is interrupted - * if the connection is closed. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer into which data is read - * @return the actual number of bytes read - * @throws PortInterruptedException if port is closed while reading - * @throws IOException on IO error - */ - def read(buffer: ByteBuffer): Int = readLock.synchronized { - if (!closed.get) { - try { - reading = true - unsafe.read(buffer) - } finally { - reading = false - if (closed.get) readLock.notify() - } - } else { - throw new PortClosedException(s"${port} is closed") - } - } - - /** - * Writes data from a ByteBuffer to underlying serial connection. - * Note that data is read from the buffer's memory, its attributes - * such as position and limit are not modified. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer from which data is taken - * @return the actual number of bytes written - * @throws IOException on IO error - */ - def write(buffer: ByteBuffer): Int = writeLock.synchronized { - if (!closed.get) { - try { - writing = true - unsafe.write(buffer, buffer.position) - } finally { - writing = false - if (closed.get) writeLock.notify() - } - } else { - throw new PortClosedException(s"${port} is closed") - } - } - -} - -private[flow] object SerialConnection { - - /** - * Opens a new connection to a serial port. - * This method acts as a factory to creating serial connections. - * - * @param port name of serial port to open - * @param settings settings with which to initialize the connection - * @return an instance of the open serial connection - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - def open( - port: String, - settings: SerialSettings - ): SerialConnection = synchronized { - val pointer = UnsafeSerial.open( - port, - settings.baud, - settings.characterSize, - settings.twoStopBits, - settings.parity.id - ) - new SerialConnection(new UnsafeSerial(pointer), port) - } - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala deleted file mode 100644 index 4ed3e2e..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala +++ /dev/null @@ -1,9 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ ExtendedActorSystem, Props } -import akka.io.IO - -/** Provides the serial IO manager. */ -class SerialExt(system: ExtendedActorSystem) extends IO.Extension { - lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala deleted file mode 100644 index 7967087..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala +++ /dev/null @@ -1,47 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } -import akka.actor.SupervisorStrategy.{ Escalate, Stop } -import scala.util.{ Failure, Success, Try } - -/** - * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to - * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. - * @see SerialOperator - */ -private[flow] class SerialManager extends Actor { - import SerialManager._ - import context._ - - override val supervisorStrategy = OneForOneStrategy() { - case _: Exception if sender == watcher => Escalate - case _: Exception => Stop - } - - private val watcher = actorOf(Watcher(self), "watcher") - - def receive = { - - case open @ Serial.Open(port, settings, bufferSize) => Try { - SerialConnection.open(port, settings) - } match { - case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) - case Failure(err) => sender ! Serial.CommandFailed(open, err) - } - - case w: Serial.Watch => watcher.forward(w) - - case u: Serial.Unwatch => watcher.forward(u) - - } - -} - -private[flow] object SerialManager { - - private def escapePortString(port: String) = port map { - case '/' => '-' - case c => c - } - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala deleted file mode 100644 index d5c131c..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala +++ /dev/null @@ -1,83 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import akka.util.ByteString -import java.nio.ByteBuffer - -/** - * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. - * @see SerialManager - */ -private[flow] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { - import SerialOperator._ - import context._ - - case class ReaderDied(ex: Throwable) - object Reader extends Thread { - val buffer = ByteBuffer.allocateDirect(bufferSize) - - def loop() = { - var stop = false - while (!connection.isClosed && !stop) { - try { - buffer.clear() - val length = connection.read(buffer) - buffer.limit(length) - val data = ByteString.fromByteBuffer(buffer) - client.tell(Serial.Received(data), self) - } catch { - // don't do anything if port is interrupted - case ex: PortInterruptedException => {} - - //stop and tell operator on other exception - case ex: Exception => - stop = true - self.tell(ReaderDied(ex), Actor.noSender) - } - } - } - - override def run() { - this.setName(s"serial-reader(${connection.port})") - loop() - } - - } - - val writeBuffer = ByteBuffer.allocateDirect(bufferSize) - - override def preStart() = { - context watch client - client ! Serial.Opened(connection.port) - Reader.start() - } - - override def receive: Receive = { - - case Serial.Write(data, ack) => - writeBuffer.clear() - data.copyToBuffer(writeBuffer) - val sent = connection.write(writeBuffer) - if (ack != Serial.NoAck) sender ! ack(sent) - - case Serial.Close => - client ! Serial.Closed - context stop self - - case Terminated(`client`) => - context stop self - - // go down with reader thread - case ReaderDied(ex) => throw ex - - } - - override def postStop() = { - connection.close() - } - -} - -private[flow] object SerialOperator { - def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala deleted file mode 100644 index 2d3a6ed..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala +++ /dev/null @@ -1,10 +0,0 @@ -package ch.jodersky.flow - -/** - * Groups settings used in communication over a serial port. - * @param baud baud rate to use with serial port - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - */ -case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala deleted file mode 100644 index 3126618..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala +++ /dev/null @@ -1,108 +0,0 @@ -package ch.jodersky.flow - -import java.nio.ByteBuffer - -import ch.jodersky.jni.nativeLoader - -/** - * Low-level wrapper of native serial backend. - * - * WARNING: Methods in this class allocate native structures and deal with pointers. These - * pointers are handled as longs by java and are NOT checked for correctness, therefore passing - * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. - * - * See SerialConnection for a higher-level, more secured wrapper - * of serial communication. - * - * @param serialAddr address of natively allocated serial configuration structure - */ -@nativeLoader("flow4") -private[flow] class UnsafeSerial(final val serialAddr: Long) { - - final val ParityNone: Int = 0 - final val ParityOdd: Int = 1 - final val ParityEven: Int = 2 - - /** - * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only - * read into the buffer's allocated memory, its position or limit are not changed. - * - * The read is blocking, however it may be interrupted by calling cancelRead() on the given - * serial port. - * - * @param buffer direct ByteBuffer to read into - * @return number of bytes actually read - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws PortInterruptedException if the call to this function was interrupted - * @throws IOException on IO error - */ - @native def read(buffer: ByteBuffer): Int - - /** - * Cancels a read (any caller to read or readDirect will return with a - * PortInterruptedException). This function may be called from any thread. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - @native def cancelRead(): Unit - - /** - * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is - * only taken from the buffer's allocated memory, its position or limit are not changed. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer direct ByteBuffer from which data is taken - * @param length actual amount of data that should be taken from the buffer (this is needed since the native - * backend does not provide a way to query the buffer's current limit) - * @return number of bytes actually written - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws IOException on IO error - */ - @native def write(buffer: ByteBuffer, length: Int): Int - - /** - * Closes an previously open serial port. Natively allocated resources are freed and the serial - * pointer becomes invalid, therefore this function should only be called ONCE per open serial - * port. - * - * A port should not be closed while it is used (by a read or write) as this - * results in undefined behaviour. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - @native def close(): Unit - -} - -private[flow] object UnsafeSerial { - - /** - * Opens a serial port. - * - * @param port name of serial port to open - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - * @return address of natively allocated serial configuration structure - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - @native def debug(value: Boolean): Unit - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala deleted file mode 100644 index 9fa519b..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala +++ /dev/null @@ -1,143 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorRef, Props, Terminated } -import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } -import java.nio.file.StandardWatchEventKinds._ -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } -import scala.util.{ Failure, Success, Try } - -private[flow] class Watcher(from: Option[ActorRef]) extends Actor { - - case class WatcherDied(reason: Throwable) - object WatcherThread extends Thread { - import Watcher.NewFile - - private val service = FileSystems.getDefault().newWatchService() - - def register(directory: Path) = directory.register(service, ENTRY_CREATE) - - override def run(): Unit = { - this.setName("serial-port-watcher") - var stop = false - while (!stop) { - try { - val key = service.take() - key.pollEvents() foreach { ev => - val event = ev.asInstanceOf[WatchEvent[Path]] - if (event.kind == ENTRY_CREATE) { - val directory = key.watchable().asInstanceOf[Path] - val file = event.context() - self.tell(NewFile(directory, file), Actor.noSender) - } - } - key.reset() - } catch { - case _: InterruptedException => stop = true - case _: ClosedWatchServiceException => stop = true - case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) - } - } - } - - def close() = service.close // causes the service to throw a ClosedWatchServiceException - } - - - // directory -> subscribers - private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] - - // directory -> watchkey - private val keys: Map[String, WatchKey] = Map.empty - - def subscribe(directory: String, client: ActorRef): WatchKey = { - val normal = Paths.get(directory).toAbsolutePath - val index = normal.toString - val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) - clients addBinding (index, client) - key - } - - def unsubscribe(directory: String, client: ActorRef): Unit = { - val index = Paths.get(directory).toAbsolutePath.toString - - clients removeBinding (index, sender) - - if (clients.get(index).isEmpty && keys.get(index).isDefined) { - keys(index).cancel() - keys -= index - } - } - - def reply(msg: Any, sender: ActorRef) = { - val origin = from match { - case Some(ref) => ref - case None => self - } - sender.tell(msg, origin) - } - - override def preStart() = { - WatcherThread.setDaemon(true) - WatcherThread.start() - } - - override def receive = { - - case w @ Serial.Watch(directory, skipInitial) => - val normalPath = Paths.get(directory).toAbsolutePath - val normal = normalPath.toString - - Try { - subscribe(directory, sender) - } match { - case Failure(err) => reply(Serial.CommandFailed(w, err), sender) - case Success(key) => - context watch sender - if (!skipInitial) { - Files.newDirectoryStream(normalPath) foreach { path => - if (!Files.isDirectory(path)) { - reply(Serial.Connected(path.toString), sender) - } - } - } - } - - case u @ Serial.Unwatch(directory) => - val normal = Paths.get(directory).toAbsolutePath.toString - - clients.removeBinding(normal, sender) - - if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { - keys(normal).cancel() - keys -= normal - } - - case Terminated(client) => - for ((directory, c) <- clients if c == client) { - unsubscribe(directory, client) - } - - case Watcher.NewFile(directory, file) => - val normal = directory.toAbsolutePath - val absFile = normal resolve file - clients.getOrElse(normal.toString, Set.empty) foreach { client => - reply(Serial.Connected(absFile.toString), client) - } - - case WatcherDied(err) => throw err // go down with watcher thread - - } - - override def postStop() = { - WatcherThread.close() - } - -} - -private[flow] object Watcher { - private case class NewFile(directory: Path, file: Path) - - def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala deleted file mode 100644 index ee087a8..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala +++ /dev/null @@ -1,19 +0,0 @@ -package ch.jodersky.flow - -/** The requested port could not be found. */ -class NoSuchPortException(message: String) extends Exception(message) - -/** The requested port is in use by someone else. */ -class PortInUseException(message: String) extends Exception(message) - -/** Permissions are not sufficient to open a serial port. */ -class AccessDeniedException(message: String) extends Exception(message) - -/** The settings specified are invalid. */ -class InvalidSettingsException(message: String) extends Exception(message) - -/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ -class PortInterruptedException(message: String) extends Exception(message) - -/** The specified port has been closed. */ -class PortClosedException(message: String) extends Exception(message) diff --git a/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala b/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala deleted file mode 100644 index 8e891ae..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala +++ /dev/null @@ -1,43 +0,0 @@ -package ch.jodersky.flow - -import java.io.{File, IOException} -import java.nio.file.Files - -import scala.concurrent.duration._ -import scala.sys.process._ -import scala.util.control.NonFatal - -trait PseudoTerminal { - - final val SetupTimeout = 100.milliseconds - - def withEcho[A](action: (String, SerialSettings) => A): A = { - val dir = Files.createTempDirectory("flow-pty").toFile - val pty = new File(dir, "pty") - - val socat = try { - val s = Seq( - "socat", - "-d -d", - s"exec:cat,pty,raw,b115200,echo=0", - s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}" - ).run(ProcessLogger(println(_)), false) - Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up - s - } catch { - case NonFatal(ex) => - throw new IOException( - "Error running echo service, make sure the program 'socat' is installed", ex) - } - - try { - val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200)) - Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys - result - } finally { - socat.destroy() - dir.delete() - } - } - -} diff --git a/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala b/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala deleted file mode 100644 index 59af305..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.ActorSystem -import akka.io.IO -import akka.testkit.{ImplicitSender, TestKit} -import org.scalatest._ - -class SerialManagerSpec - extends TestKit(ActorSystem("serial-manager")) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with PseudoTerminal { - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "Serial manager" should { - val manager = IO(Serial) - - "open an existing port" in { - withEcho{ case (port, settings) => - manager ! Serial.Open(port, settings) - expectMsgType[Serial.Opened] - } - } - - "fail opening a non-existing port" in { - val cmd = Serial.Open("nonexistent", SerialSettings(115200)) - manager ! cmd - assert(expectMsgType[Serial.CommandFailed].command == cmd) - } - - } - -} diff --git a/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala b/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala deleted file mode 100644 index 4c1dd94..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -package ch.jodersky.flow - -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, ActorSystem} -import akka.testkit.{ImplicitSender, TestKit} -import akka.util.ByteString -import org.scalatest._ - -case class Ack(n: Int) extends Serial.Event - -class SerialOperatorSpec - extends TestKit(ActorSystem("serial-operator")) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with SequentialNestedSuiteExecution - with PseudoTerminal { - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - def withEchoOp[A](action: ActorRef => A): A = { - withEcho { case (port, settings) => - val connection = SerialConnection.open(port, settings) - val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor)) - action(operator) - } - } - - "Serial operator" should { - - "follow the correct protocol" in withEchoOp { op => - expectMsgType[Serial.Opened] - - val data = ByteString("hello world".getBytes("utf-8")) - op ! Serial.Write(data) - expectMsg(Serial.Received(data)) - - op ! Serial.Close - expectMsg(Serial.Closed) - - } - - } - -} diff --git a/flow-native/build.sbt b/flow-native/build.sbt deleted file mode 100644 index 2c7ffea..0000000 --- a/flow-native/build.sbt +++ /dev/null @@ -1,8 +0,0 @@ -enablePlugins(JniNative) - -sourceDirectory in nativeCompile := sourceDirectory.value - -// package native libraries from lib_native during releases -val isRelease = sys.props("release") == "true" -enableNativeCompilation in Compile := !isRelease -enableNativeCompilation in Test := !isRelease diff --git a/flow-native/lib_native/armv7l-linux/libflow4.so b/flow-native/lib_native/armv7l-linux/libflow4.so deleted file mode 100755 index f5d9ae0..0000000 Binary files a/flow-native/lib_native/armv7l-linux/libflow4.so and /dev/null differ diff --git a/flow-native/lib_native/i686-linux/libflow4.so b/flow-native/lib_native/i686-linux/libflow4.so deleted file mode 100644 index fb438e2..0000000 Binary files a/flow-native/lib_native/i686-linux/libflow4.so and /dev/null differ diff --git a/flow-native/lib_native/x86_64-darwin/libflow4.dylib b/flow-native/lib_native/x86_64-darwin/libflow4.dylib deleted file mode 100644 index 213415d..0000000 Binary files a/flow-native/lib_native/x86_64-darwin/libflow4.dylib and /dev/null differ diff --git a/flow-native/lib_native/x86_64-linux/libflow4.so b/flow-native/lib_native/x86_64-linux/libflow4.so deleted file mode 100755 index 18fb300..0000000 Binary files a/flow-native/lib_native/x86_64-linux/libflow4.so and /dev/null differ diff --git a/flow-native/src/.gitignore b/flow-native/src/.gitignore deleted file mode 100644 index 1785c46..0000000 --- a/flow-native/src/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -# CMake -/CMakeFiles/ -/CMakeCache.txt -/cmake_install.cmake -/Makefile - -# Binary files -*.o -*.so* -*.dylib -*.a -*~ \ No newline at end of file diff --git a/flow-native/src/CMakeLists.txt b/flow-native/src/CMakeLists.txt deleted file mode 100644 index a57451c..0000000 --- a/flow-native/src/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -################################################################ -# A minimal CMake file that is compatible with sbt-jni # -# # -# All settings required by sbt-jni have been marked so, please # -# add/modify/remove settings to build your specific library. # -################################################################ - -cmake_minimum_required(VERSION 2.8.0) -set(ignoreMe "${SBT}") # sbt-jni defines -DSBT - -# Define project and related variables -# (required by sbt-jni) please use semantic versioning -# -project (flow) -set(PROJECT_VERSION_MAJOR 4) -set(PROJECT_VERSION_MINOR 0) -set(PROJECT_VERSION_PATCH 0) - -set(CMAKE_C_FLAGS "-std=c99") -add_definitions(-Wall) -add_definitions(-Wextra) -add_definitions(-pedantic) - -# Setup JNI -find_package(JNI REQUIRED) -if (JNI_FOUND) - message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}") -endif() - -# Include directories -include_directories(.) -include_directories(include) -include_directories(${JNI_INCLUDE_DIRS}) - -# Sources -file(GLOB LIB_SRC - "*.c" - "platform/posix/*.c" -) - -# Setup installation targets -# (required by sbt-jni) major version should always be appended to library name -# -set (LIB_NAME ${PROJECT_NAME}${PROJECT_VERSION_MAJOR}) -add_library(${LIB_NAME} SHARED ${LIB_SRC}) -install(TARGETS ${LIB_NAME} LIBRARY DESTINATION .) diff --git a/flow-native/src/flow_jni.c b/flow-native/src/flow_jni.c deleted file mode 100644 index 75bffff..0000000 --- a/flow-native/src/flow_jni.c +++ /dev/null @@ -1,150 +0,0 @@ -#include - -#include "flow.h" - -#include "ch_jodersky_flow_UnsafeSerial.h" -#include "ch_jodersky_flow_UnsafeSerial__.h" - -// suppress unused parameter warnings -#define UNUSED_ARG(x) (void)(x) - -static inline void throwException(JNIEnv* env, const char* const exception, const char * const message) -{ - (*env)->ThrowNew(env, (*env)->FindClass(env, exception), message); -} - -/** Check return code and throw exception in case it is non-zero. */ -static void check(JNIEnv* env, int ret) -{ - switch (ret) { - case -E_IO: throwException(env, "java/io/IOException", ""); break; - case -E_BUSY: throwException(env, "ch/jodersky/flow/PortInUseException", ""); break; - case -E_ACCESS_DENIED: throwException(env, "ch/jodersky/flow/AccessDeniedException", ""); break; - case -E_INVALID_SETTINGS: throwException(env, "ch/jodersky/flow/InvalidSettingsException", ""); break; - case -E_INTERRUPT: throwException(env, "ch/jodersky/flow/PortInterruptedException", ""); break; - case -E_NO_PORT: throwException(env, "ch/jodersky/flow/NoSuchPortException", ""); break; - default: return; - } -} - -/** Get pointer to serial config associated to an UnsafeSerial instance. */ -static struct serial_config* get_config(JNIEnv* env, jobject unsafe_serial) -{ - jclass clazz = (*env)->FindClass(env, "ch/jodersky/flow/UnsafeSerial"); - jfieldID field = (*env)->GetFieldID(env, clazz, "serialAddr", "J"); - jlong addr = (*env)->GetLongField(env, unsafe_serial, field); - return (struct serial_config*) (intptr_t) addr; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: open - * Signature: (Ljava/lang/String;IIZI)J - */ -JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open -(JNIEnv *env, jobject instance, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity) -{ - UNUSED_ARG(instance); - - const char *dev = (*env)->GetStringUTFChars(env, port_name, 0); - struct serial_config* config; - int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &config); - (*env)->ReleaseStringUTFChars(env, port_name, dev); - - if (r < 0) { - check(env, r); - return -E_IO; - } - - long jpointer = (long) config; - return (jlong) jpointer; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: read - * Signature: (Ljava/nio/ByteBuffer;)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read -(JNIEnv *env, jobject instance, jobject buffer) -{ - char* local_buffer = (char*) (*env)->GetDirectBufferAddress(env, buffer); - if (local_buffer == NULL) { - throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); - return -E_IO; - } - size_t size = (size_t) (*env)->GetDirectBufferCapacity(env, buffer); - struct serial_config* config = get_config(env, instance); - - int r = serial_read(config, local_buffer, size); - if (r < 0) { - check(env, r); - } - return r; - -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: cancelRead - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead -(JNIEnv *env, jobject instance) -{ - int r = serial_cancel_read(get_config(env, instance)); - if (r < 0) { - check(env, r); - } -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: write - * Signature: (Ljava/nio/ByteBuffer;I)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write -(JNIEnv *env, jobject instance, jobject buffer, jint size) -{ - - char* local_buffer = (char *) (*env)->GetDirectBufferAddress(env, buffer); - if (local_buffer == NULL) { - throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); - return -E_IO; - } - - int r = serial_write(get_config(env, instance), local_buffer, (size_t) size); - if (r < 0) { - check(env, r); - return -E_IO; - } - return r; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: close - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close -(JNIEnv *env, jobject instance) -{ - int r = serial_close(get_config(env, instance)); - if (r < 0) { - check(env, r); - } -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: debug - * Signature: (Z)V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug -(JNIEnv *env, jobject instance, jboolean value) -{ - UNUSED_ARG(env); - UNUSED_ARG(instance); - - serial_debug((bool) value); -} diff --git a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h b/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h deleted file mode 100644 index f80ada0..0000000 --- a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h +++ /dev/null @@ -1,45 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include -/* Header for class ch_jodersky_flow_UnsafeSerial */ - -#ifndef _Included_ch_jodersky_flow_UnsafeSerial -#define _Included_ch_jodersky_flow_UnsafeSerial -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: read - * Signature: (Ljava/nio/ByteBuffer;)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read - (JNIEnv *, jobject, jobject); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: cancelRead - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead - (JNIEnv *, jobject); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: write - * Signature: (Ljava/nio/ByteBuffer;I)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write - (JNIEnv *, jobject, jobject, jint); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: close - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close - (JNIEnv *, jobject); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h b/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h deleted file mode 100644 index 617875f..0000000 --- a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h +++ /dev/null @@ -1,29 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include -/* Header for class ch_jodersky_flow_UnsafeSerial__ */ - -#ifndef _Included_ch_jodersky_flow_UnsafeSerial__ -#define _Included_ch_jodersky_flow_UnsafeSerial__ -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: open - * Signature: (Ljava/lang/String;IIZI)J - */ -JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open - (JNIEnv *, jobject, jstring, jint, jint, jboolean, jint); - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: debug - * Signature: (Z)V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug - (JNIEnv *, jobject, jboolean); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/flow-native/src/include/flow.h b/flow-native/src/include/flow.h deleted file mode 100644 index e3f33b9..0000000 --- a/flow-native/src/include/flow.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef FLOW_H -#define FLOW_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include - -// general error codes, whose that are returned by functions -#define E_IO 1 // IO error -#define E_ACCESS_DENIED 2 // access denied -#define E_BUSY 3 // port is busy -#define E_INVALID_SETTINGS 4 // some port settings are invalid -#define E_INTERRUPT 5 // not really an error, function call aborted because port is closed -#define E_NO_PORT 6 // requested port does not exist - -#define PARITY_NONE 0 -#define PARITY_ODD 1 -#define PARITY_EVEN 2 - -/** - * Contains internal configuration of an open serial port. - */ -struct serial_config; - -/** - * Opens a serial port and allocates memory for storing configuration. Note: if this function fails, - * any internally allocated resources will be freed. - * @param port_name name of port - * @param baud baud rate - * @param char_size character size of data transmitted through serial device - * @param two_stop_bits set to use two stop bits instead of one - * @param parity kind of parity checking to use - * @param serial pointer to memory that will be allocated with a serial structure - * @return 0 on success - * @return -E_NO_PORT if the given port does not exist - * @return -E_ACCESS_DENIED if permissions are not sufficient to open port - * @return -E_BUSY if port is already in use - * @return -E_INVALID_SETTINGS if any of the specified settings are invalid - * @return -E_IO on other error - */ -int serial_open( - const char* port_name, - int baud, - int char_size, - bool two_stop_bits, - int parity, - struct serial_config** const serial); - -/** - * Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to - * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT - * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per - * close manual page, close should not be called on a file descriptor that is in use by another thread). - * @param serial pointer to serial configuration that is to be closed (and freed) - * @return 0 on success - * @return -E_IO on error - */ -int serial_close(struct serial_config* const serial); - -/** - * Starts a read from a previously opened serial port. The read is blocking, however it may be - * interrupted by calling 'serial_cancel_read' on the given serial port. - * @param serial pointer to serial configuration from which to read - * @param buffer buffer into which data is read - * @param size maximum buffer size - * @return n>0 the number of bytes read into buffer - * @return -E_INTERRUPT if the call to this function was interrupted - * @return -E_IO on IO error - */ -int serial_read(struct serial_config* const serial, char* const buffer, size_t size); - -/** - * Cancels a blocked read call. This function is thread safe, i.e. it may be called from a thread even - * while another thread is blocked in a read call. - * @param serial_config the serial port to interrupt - * @return 0 on success - * @return -E_IO on error - */ -int serial_cancel_read(struct serial_config* const serial); - -/** - * Writes data to a previously opened serial port. Non bocking. - * @param serial pointer to serial configuration to which to write - * @param data data to write - * @param size number of bytes to write from data - * @return n>0 the number of bytes written - * @return -E_IO on IO error - */ -int serial_write(struct serial_config* const serial, char* const data, size_t size); - -/** - * Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. - */ -void serial_debug(bool value); - -#ifdef __cplusplus -} -#endif - -#endif /* FLOW_H */ diff --git a/flow-native/src/platform/posix/flow.c b/flow-native/src/platform/posix/flow.c deleted file mode 100644 index 969949b..0000000 --- a/flow-native/src/platform/posix/flow.c +++ /dev/null @@ -1,263 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include "flow.h" - -#define DATA_CANCEL 0xffffffff - -static bool debug = false; - -static void print_debug(const char* const msg, int en) -{ - if (debug) { - if (errno == 0) { - fprintf(stderr, "%s", msg); - } else { - fprintf(stderr, "%s: %d\n", msg, en); - } - fflush(stderr); - } -} - -void serial_debug(bool value) -{ - debug = value; -} - -//contains file descriptors used in managing a serial port -struct serial_config { - int port_fd; // file descriptor of serial port - - /* a pipe is used to abort a serial read by writing something into the - * write end of the pipe */ - int pipe_read_fd; // file descriptor, read end of pipe - int pipe_write_fd; // file descriptor, write end of pipe -}; - -int serial_open( - const char* const port_name, - int baud, - int char_size, - bool two_stop_bits, - int parity, - struct serial_config** serial) -{ - - int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK); - - if (fd < 0) { - int en = errno; - print_debug("Error obtaining file descriptor for port", en); - if (en == EACCES) return -E_ACCESS_DENIED; - if (en == ENOENT) return -E_NO_PORT; - return -E_IO; - } - - if (flock(fd, LOCK_EX | LOCK_NB) < 0) { - print_debug("Error acquiring lock on port", errno); - close(fd); - return -E_BUSY; - } - - /* configure new port settings */ - struct termios newtio; - - /* initialize serial interface */ - newtio.c_iflag = 0; - newtio.c_oflag = 0; - newtio.c_lflag = 0; - newtio.c_cflag = CREAD; - - /* set speed */ - speed_t bd; - switch (baud) { - case 50: bd = B50; break; - case 75: bd = B75; break; - case 110: bd = B110; break; - case 134: bd = B134; break; - case 150: bd = B150; break; - case 200: bd = B200; break; - case 300: bd = B300; break; - case 600: bd = B600; break; - case 1200: bd = B1200; break; - case 1800: bd = B1800; break; - case 2400: bd = B2400; break; - case 4800: bd = B4800; break; - case 9600: bd = B9600; break; - case 19200: bd = B19200; break; - case 38400: bd = B38400; break; - case 57600: bd = B57600; break; - case 115200: bd = B115200; break; - case 230400: bd = B230400; break; - default: - close(fd); - print_debug("Invalid baud rate", 0); - return -E_INVALID_SETTINGS; - } - - if (cfsetspeed(&newtio, bd) < 0) { - print_debug("Error setting baud rate", errno); - close(fd); - return -E_IO; - } - - /* set char size*/ - switch (char_size) { - case 5: newtio.c_cflag |= CS5; break; - case 6: newtio.c_cflag |= CS6; break; - case 7: newtio.c_cflag |= CS7; break; - case 8: newtio.c_cflag |= CS8; break; - default: - close(fd); - print_debug("Invalid character size", 0); - return -E_INVALID_SETTINGS; - } - - /* use two stop bits */ - if (two_stop_bits){ - newtio.c_cflag |= CSTOPB; - } - - /* set parity */ - switch (parity) { - case PARITY_NONE: break; - case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break; - case PARITY_EVEN: newtio.c_cflag |= PARENB; break; - default: - close(fd); - print_debug("Invalid parity", 0); - return -E_INVALID_SETTINGS; - } - - if (tcflush(fd, TCIOFLUSH) < 0) { - print_debug("Error flushing serial settings", errno); - close(fd); - return -E_IO; - } - - if (tcsetattr(fd, TCSANOW, &newtio) < 0) { - print_debug("Error applying serial settings", errno); - close(fd); - return -E_IO; - } - - int pipe_fd[2]; - if (pipe(pipe_fd) < 0) { - print_debug("Error opening pipe", errno); - close(fd); - return -E_IO; - } - - if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) { - print_debug("Error setting pipe to non-blocking", errno); - close(fd); - close(pipe_fd[0]); - close(pipe_fd[1]); - return -E_IO; - } - - struct serial_config* s = malloc(sizeof(s)); - if (s == NULL) { - print_debug("Error allocating memory for serial configuration", errno); - close(fd); - close(pipe_fd[0]); - close(pipe_fd[1]); - return -E_IO; - } - - s->port_fd = fd; - s->pipe_read_fd = pipe_fd[0]; - s->pipe_write_fd = pipe_fd[1]; - (*serial) = s; - - return 0; -} - -int serial_close(struct serial_config* const serial) -{ - if (close(serial->pipe_write_fd) < 0) { - print_debug("Error closing write end of pipe", errno); - return -E_IO; - } - if (close(serial->pipe_read_fd) < 0) { - print_debug("Error closing read end of pipe", errno); - return -E_IO; - } - - if (flock(serial->port_fd, LOCK_UN) < 0){ - print_debug("Error releasing lock on port", errno); - return -E_IO; - } - if (close(serial->port_fd) < 0) { - print_debug("Error closing port", errno); - return -E_IO; - } - - free(serial); - return 0; -} - -int serial_read(struct serial_config* const serial, char* const buffer, size_t size) -{ - int port = serial->port_fd; - int pipe = serial->pipe_read_fd; - - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(port, &rfds); - FD_SET(pipe, &rfds); - - int nfds = pipe + 1; - if (pipe < port) nfds = port + 1; - - int n = select(nfds, &rfds, NULL, NULL, NULL); - if (n < 0) { - print_debug("Error trying to call select on port and pipe", errno); - return -E_IO; - } - - if (FD_ISSET(pipe, &rfds)) { - return -E_INTERRUPT; - } else if (FD_ISSET(port, &rfds)) { - int r = read(port, buffer, size); - - // treat 0 bytes read as an error to avoid problems on disconnect - // anyway, after a poll there should be more than 0 bytes available to read - if (r <= 0) { - print_debug("Error data not available after select", errno); - return -E_IO; - } - return r; - } else { - print_debug("Select returned unknown read sets", 0); - return -E_IO; - } -} - -int serial_cancel_read(struct serial_config* const serial) -{ - int data = DATA_CANCEL; - - //write to pipe to wake up any blocked read thread (self-pipe trick) - if (write(serial->pipe_write_fd, &data, 1) < 0) { - print_debug("Error writing to pipe during read cancel", errno); - return -E_IO; - } - - return 0; -} - -int serial_write(struct serial_config* const serial, char* const data, size_t size) -{ - int r = write(serial->port_fd, data, size); - if (r < 0) { - print_debug("Error writing to port", errno); - return -E_IO; - } - return r; -} diff --git a/flow-native/src/platform/windows/README b/flow-native/src/platform/windows/README deleted file mode 100644 index 3d24410..0000000 --- a/flow-native/src/platform/windows/README +++ /dev/null @@ -1 +0,0 @@ -The contents of the file flow.c were found in the avrdude project. They look like they may be a good starting point for serial communication on windows. diff --git a/flow-native/src/platform/windows/flow.c.disabled b/flow-native/src/platform/windows/flow.c.disabled deleted file mode 100644 index 86a267c..0000000 --- a/flow-native/src/platform/windows/flow.c.disabled +++ /dev/null @@ -1,416 +0,0 @@ -/* - * avrdude - A Downloader/Uploader for AVR device programmers - * Copyright (C) 2003, 2004 Martin J. Thomas - * Copyright (C) 2006 Joerg Wunsch - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -/* $Id$ */ - -/* - * Native Win32 serial interface for avrdude. - */ - -#include "avrdude.h" - -#if defined(WIN32NATIVE) - -#include -#include -#include /* for isprint */ - -#include "serial.h" - -long serial_recv_timeout = 5000; /* ms */ - -#define W32SERBUFSIZE 1024 - -struct baud_mapping { - long baud; - DWORD speed; -}; - -/* HANDLE hComPort=INVALID_HANDLE_VALUE; */ - -static struct baud_mapping baud_lookup_table [] = { - { 1200, CBR_1200 }, - { 2400, CBR_2400 }, - { 4800, CBR_4800 }, - { 9600, CBR_9600 }, - { 19200, CBR_19200 }, - { 38400, CBR_38400 }, - { 57600, CBR_57600 }, - { 115200, CBR_115200 }, - { 0, 0 } /* Terminator. */ -}; - -static DWORD serial_baud_lookup(long baud) -{ - struct baud_mapping *map = baud_lookup_table; - - while (map->baud) { - if (map->baud == baud) - return map->speed; - map++; - } - - /* - * If a non-standard BAUD rate is used, issue - * a warning (if we are verbose) and return the raw rate - */ - if (verbose > 0) - fprintf(stderr, "%s: serial_baud_lookup(): Using non-standard baud rate: %ld", - progname, baud); - - return baud; -} - - -static BOOL serial_w32SetTimeOut(HANDLE hComPort, DWORD timeout) // in ms -{ - COMMTIMEOUTS ctmo; - ZeroMemory (&ctmo, sizeof(COMMTIMEOUTS)); - ctmo.ReadIntervalTimeout = timeout; - ctmo.ReadTotalTimeoutMultiplier = timeout; - ctmo.ReadTotalTimeoutConstant = timeout; - - return SetCommTimeouts(hComPort, &ctmo); -} - -static int ser_setspeed(union filedescriptor *fd, long baud) -{ - DCB dcb; - HANDLE hComPort = (HANDLE)fd->pfd; - - ZeroMemory (&dcb, sizeof(DCB)); - dcb.DCBlength = sizeof(DCB); - dcb.BaudRate = serial_baud_lookup (baud); - dcb.fBinary = 1; - dcb.fDtrControl = DTR_CONTROL_DISABLE; - dcb.fRtsControl = RTS_CONTROL_DISABLE; - dcb.ByteSize = 8; - dcb.Parity = NOPARITY; - dcb.StopBits = ONESTOPBIT; - - if (!SetCommState(hComPort, &dcb)) - return -1; - - return 0; -} - - -static int ser_open(char * port, long baud, union filedescriptor *fdp) -{ - LPVOID lpMsgBuf; - HANDLE hComPort=INVALID_HANDLE_VALUE; - char *newname = 0; - - /* - * If the port is of the form "net::", then - * handle it as a TCP connection to a terminal server. - * - * This is curently not implemented for Win32. - */ - if (strncmp(port, "net:", strlen("net:")) == 0) { - fprintf(stderr, - "%s: ser_open(): network connects are currently not" - "implemented for Win32 environments\n", - progname); - return -1; - } - - if (strncasecmp(port, "com", strlen("com")) == 0) { - - // prepend "\\\\.\\" to name, required for port # >= 10 - newname = malloc(strlen("\\\\.\\") + strlen(port) + 1); - - if (newname == 0) { - fprintf(stderr, - "%s: ser_open(): out of memory\n", - progname); - exit(1); - } - strcpy(newname, "\\\\.\\"); - strcat(newname, port); - - port = newname; - } - - hComPort = CreateFile(port, GENERIC_READ | GENERIC_WRITE, 0, NULL, - OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); - - if (hComPort == INVALID_HANDLE_VALUE) { - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL); - fprintf(stderr, "%s: ser_open(): can't open device \"%s\": %s\n", - progname, port, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - return -1; - } - - if (!SetupComm(hComPort, W32SERBUFSIZE, W32SERBUFSIZE)) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set buffers for \"%s\"\n", - progname, port); - return -1; - } - - fdp->pfd = (void *)hComPort; - if (ser_setspeed(fdp, baud) != 0) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set com-state for \"%s\"\n", - progname, port); - return -1; - } - - if (!serial_w32SetTimeOut(hComPort,0)) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set initial timeout for \"%s\"\n", - progname, port); - return -1; - } - - if (newname != 0) { - free(newname); - } - return 0; -} - - -static void ser_close(union filedescriptor *fd) -{ - HANDLE hComPort=(HANDLE)fd->pfd; - if (hComPort != INVALID_HANDLE_VALUE) - CloseHandle (hComPort); - - hComPort = INVALID_HANDLE_VALUE; -} - -static int ser_set_dtr_rts(union filedescriptor *fd, int is_on) -{ - HANDLE hComPort=(HANDLE)fd->pfd; - - if (is_on) { - EscapeCommFunction(hComPort, SETDTR); - EscapeCommFunction(hComPort, SETRTS); - } else { - EscapeCommFunction(hComPort, CLRDTR); - EscapeCommFunction(hComPort, CLRRTS); - } - return 0; -} - - -static int ser_send(union filedescriptor *fd, unsigned char * buf, size_t buflen) -{ - size_t len = buflen; - unsigned char c='\0'; - DWORD written; - unsigned char * b = buf; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_send(): port not open\n", - progname); - exit(1); - } - - if (!len) - return 0; - - if (verbose > 3) - { - fprintf(stderr, "%s: Send: ", progname); - - while (len) { - c = *b; - if (isprint(c)) { - fprintf(stderr, "%c ", c); - } - else { - fprintf(stderr, ". "); - } - fprintf(stderr, "[%02x] ", c); - b++; - len--; - } - fprintf(stderr, "\n"); - } - - serial_w32SetTimeOut(hComPort,500); - - if (!WriteFile (hComPort, buf, buflen, &written, NULL)) { - fprintf(stderr, "%s: ser_send(): write error: %s\n", - progname, "sorry no info avail"); // TODO - exit(1); - } - - if (written != buflen) { - fprintf(stderr, "%s: ser_send(): size/send mismatch\n", - progname); - exit(1); - } - - return 0; -} - - -static int ser_recv(union filedescriptor *fd, unsigned char * buf, size_t buflen) -{ - unsigned char c; - unsigned char * p = buf; - DWORD read; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_read(): port not open\n", - progname); - exit(1); - } - - serial_w32SetTimeOut(hComPort, serial_recv_timeout); - - if (!ReadFile(hComPort, buf, buflen, &read, NULL)) { - LPVOID lpMsgBuf; - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL ); - fprintf(stderr, "%s: ser_recv(): read error: %s\n", - progname, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - exit(1); - } - - /* time out detected */ - if (read == 0) { - if (verbose > 1) - fprintf(stderr, - "%s: ser_recv(): programmer is not responding\n", - progname); - return -1; - } - - p = buf; - - if (verbose > 3) - { - fprintf(stderr, "%s: Recv: ", progname); - - while (read) { - c = *p; - if (isprint(c)) { - fprintf(stderr, "%c ", c); - } - else { - fprintf(stderr, ". "); - } - fprintf(stderr, "[%02x] ", c); - - p++; - read--; - } - fprintf(stderr, "\n"); - } - return 0; -} - - -static int ser_drain(union filedescriptor *fd, int display) -{ - // int rc; - unsigned char buf[10]; - BOOL readres; - DWORD read; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_drain(): port not open\n", - progname); - exit(1); - } - - serial_w32SetTimeOut(hComPort,250); - - if (display) { - fprintf(stderr, "drain>"); - } - - while (1) { - readres=ReadFile(hComPort, buf, 1, &read, NULL); - if (!readres) { - LPVOID lpMsgBuf; - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL ); - fprintf(stderr, "%s: ser_drain(): read error: %s\n", - progname, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - exit(1); - } - - if (read) { // data avail - if (display) fprintf(stderr, "%02x ", buf[0]); - } - else { // no more data - if (display) fprintf(stderr, "/run`. - -All projects, including samples, can be listed by running `sbt projects`. diff --git a/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala b/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala deleted file mode 100644 index 74433db..0000000 --- a/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala +++ /dev/null @@ -1,66 +0,0 @@ -package ch.jodersky.flow -package samples.terminalstream - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.io.StdIn - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.util.ByteString - -import stream.Serial - -object Main { - - final val Delay = FiniteDuration(500, MILLISECONDS) - - implicit val system = ActorSystem("terminal-stream") - implicit val materializer = ActorMaterializer() - - def ask(label: String, default: String) = { - print(label + " [" + default.toString + "]: ") - val in = StdIn.readLine() - println("") - if (in.isEmpty) default else in - } - - def main(args: Array[String]): Unit = { - import system.dispatcher - - val port = ask("Device", "/dev/ttyACM0") - val baud = ask("Baud rate", "115200").toInt - val cs = ask("Char size", "8").toInt - val tsb = ask("Use two stop bits", "false").toBoolean - val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) - val settings = SerialSettings(baud, cs, tsb, parity) - - val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] = - Serial().open(port, settings) - - val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data => - println("server says: " + data.decodeString("UTF-8")) - } - - val ticker: Source[ByteString, _] = Source.tick(Delay, Delay, ()).scan(0){case (x, _) => - x + 1 - }.map{ x => - println(x) - ByteString(x.toString) - } - - val connection: Future[Serial.Connection] = ticker.viaMat(serial)(Keep.right).to(printer).run() - - connection map { conn => - println("Connected to " + conn.port) - StdIn.readLine("Press enter to exit") - } recover { case err => - println("Cannot connect: " + err) - } andThen { case _ => - system.terminate() - } - - } - -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala deleted file mode 100644 index 2b30663..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala +++ /dev/null @@ -1,29 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.Actor -import scala.io.StdIn - -class ConsoleReader extends Actor { - import context._ - import ConsoleReader._ - - def receive = { - case Read => - StdIn.readLine() match { - case ":q" | null => parent ! EOT - case s => { - parent ! ConsoleInput(s) - } - } - } - -} - -object ConsoleReader { - - case object Read - case object EOT - case class ConsoleInput(in: String) - -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala deleted file mode 100644 index 80c0f80..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala +++ /dev/null @@ -1,30 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.ActorSystem -import scala.io.StdIn - -object Main { - - def ask(label: String, default: String) = { - print(label + " [" + default.toString + "]: ") - val in = StdIn.readLine() - println("") - if (in.isEmpty) default else in - } - - def main(args: Array[String]): Unit = { - val port = ask("Device", "/dev/ttyACM0") - val baud = ask("Baud rate", "115200").toInt - val cs = ask("Char size", "8").toInt - val tsb = ask("Use two stop bits", "false").toBoolean - val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) - val settings = SerialSettings(baud, cs, tsb, parity) - - println("Starting terminal system, enter :q to exit.") - Serial.debug(true) - val system = ActorSystem("flow") - val terminal = system.actorOf(Terminal(port, settings), name = "terminal") - system.registerOnTermination(println("Stopped terminal system.")) - } -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala deleted file mode 100644 index de60620..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala +++ /dev/null @@ -1,75 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } -import akka.io.IO -import akka.util.ByteString - -class Terminal(port: String, settings: SerialSettings) extends Actor with ActorLogging { - import Terminal._ - import context._ - - val reader = actorOf(Props[ConsoleReader]) - - log.info(s"Requesting manager to open port: ${port}, baud: ${settings.baud}") - IO(Serial) ! Serial.Open(port, settings) - - override def postStop() = { - system.terminate() - } - - def receive = { - case Serial.CommandFailed(cmd, reason) => { - log.error(s"Connection failed, stopping terminal. Reason: ${reason}") - context stop self - } - case Serial.Opened(port) => { - log.info(s"Port ${port} is now open.") - val operator = sender - context become opened(operator) - context watch operator - reader ! ConsoleReader.Read - } - } - - def opened(operator: ActorRef): Receive = { - - case Serial.Received(data) => { - log.info(s"Received data: ${formatData(data)}") - } - - case Terminal.Wrote(data) => log.info(s"Wrote data: ${formatData(data)}") - - case Serial.Closed => { - log.info("Operator closed normally, exiting terminal.") - context unwatch operator - context stop self - } - - case Terminated(`operator`) => { - log.error("Operator crashed, exiting terminal.") - context stop self - } - - case ConsoleReader.EOT => { - log.info("Initiating close.") - operator ! Serial.Close - } - - case ConsoleReader.ConsoleInput(input) => { - val data = ByteString(input.getBytes) - operator ! Serial.Write(data, length => Wrote(data.take(length))) - reader ! ConsoleReader.Read - } - } - -} - -object Terminal { - case class Wrote(data: ByteString) extends Serial.Event - - def apply(port: String, settings: SerialSettings) = Props(classOf[Terminal], port, settings) - - private def formatData(data: ByteString) = data.mkString("[", ",", "]") + " " + (new String(data.toArray, "UTF-8")) - -} diff --git a/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala b/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala deleted file mode 100644 index 650d08e..0000000 --- a/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala +++ /dev/null @@ -1,50 +0,0 @@ -package ch.jodersky.flow -package samples.watcher - -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } -import akka.io.IO -import scala.io.StdIn - -class Watcher extends Actor with ActorLogging { - import context._ - - val ports = List( - "/dev/ttyUSB\\d+", - "/dev/ttyACM\\d+", - "/dev/cu\\d+", - "/dev/ttyS\\d+" - ) - - override def preStart() = { - val cmd = Serial.Watch() - IO(Serial) ! cmd //watch for new devices - log.info(s"Watching ${cmd.directory} for new devices.") - } - - def receive = { - - case Serial.CommandFailed(w: Serial.Watch, err) => - log.error(err, s"Could not get a watch on ${w.directory}.") - context stop self - - case Serial.Connected(path) => - log.info(s"New device: ${path}") - ports.find(path matches _) match { - case Some(port) => log.info(s"Device is a serial device.") - case None => log.warning(s"Device is NOT serial device.") - } - - } - -} - -object Main { - - def main(args: Array[String]): Unit = { - val system = ActorSystem("flow") - val watcher = system.actorOf(Props(classOf[Watcher]), name = "watcher") - StdIn.readLine() - system.terminate() - } - -} diff --git a/flow-stream/build.sbt b/flow-stream/build.sbt deleted file mode 100644 index c9aa7eb..0000000 --- a/flow-stream/build.sbt +++ /dev/null @@ -1,5 +0,0 @@ -import flow.Dependencies - -libraryDependencies += Dependencies.akkaActor -libraryDependencies += Dependencies.akkaStream -libraryDependencies += Dependencies.scalatest % "test" diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala deleted file mode 100644 index d478de8..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala +++ /dev/null @@ -1,67 +0,0 @@ -package ch.jodersky.flow -package stream - -import akka.stream.scaladsl.Source -import scala.concurrent.Future - -import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} -import akka.io.IO -import akka.stream.scaladsl.Flow -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial} -import impl._ - -object Serial extends ExtensionId[Serial] with ExtensionIdProvider { - - /** - * Represents a prospective serial connection. - */ - case class Connection(port: String, settings: SerialSettings) - - case class Watch(ports: Set[String]) - - def apply()(implicit system: ActorSystem): Serial = super.apply(system) - - override def lookup() = Serial - - override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) - -} - -/** - * Entry point to streaming over serial ports. - * The design of this API is inspired by Akka's Tcp streams. - */ -class Serial(system: ExtendedActorSystem) extends Extension { - - /** - * Creates a Flow that will open a serial port when materialized. - * This Flow then represents an open serial connection: data pushed to its - * inlet will be written to the underlying serial port, and data received - * on the port will be emitted by its outlet. - * @param port name of serial port to open - * @param settings settings to use with serial port - * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped - * @param bufferSize maximum read and write buffer sizes - * @return a Flow associated to the given serial port - */ - def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): - Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( - new SerialConnectionStage( - IO(CoreSerial)(system), - port, - settings, - failOnOverflow, - bufferSize - ) - ) - - def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( - new WatcherStage( - IO(CoreSerial)(system), - ports - ) - ) - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala deleted file mode 100644 index 78438f9..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala +++ /dev/null @@ -1,5 +0,0 @@ -package ch.jodersky.flow -package stream - -/** Represents a generic exception occured during streaming of serial data. */ -class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala deleted file mode 100644 index 8eee61c..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala +++ /dev/null @@ -1,4 +0,0 @@ -package ch.jodersky.flow -package stream - -class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala deleted file mode 100644 index 764b054..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala +++ /dev/null @@ -1,172 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.{FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings} - -/** - * Graph logic that handles establishing and forwarding serial communication. - * The underlying stream is closed when downstream (output) finishes, - * upstream (input) closes are ignored. - */ -private[stream] class SerialConnectionLogic( - shape: FlowShape[ByteString, ByteString], - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int, - connectionPromise: Promise[Serial.Connection]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - import SerialConnectionLogic._ - - /** Receives data and writes it to the serial backend. */ - private def in: Inlet[ByteString] = shape.in - - /** Receives data from the serial backend and pushes it downstream. */ - private def out: Outlet[ByteString] = shape.out - - /** Implicit alias to stageActor so it will be used in "!" calls, without - * explicitly specifying a sender. */ - implicit private def self = stageActor.ref - - /** - * Input handler for an established connection. - * @param operator the operator actor of the established connection - */ - class ConnectedInHandler(operator: ActorRef) extends InHandler { - - override def onPush(): Unit = { - val elem = grab(in) - require(elem != null) // reactive streams requirement - operator ! CoreSerial.Write(elem, _ => WriteAck) - } - - override def onUpstreamFinish(): Unit = { - if (isClosed(out)) { // close serial connection if output is also closed - operator ! CoreSerial.Close - } - } - - } - - class ConnectedOutHandler(operator: ActorRef) extends OutHandler { - // implicit alias to stage actor, so it will be used in "!" calls - implicit val self = stageActor.ref - - override def onPull(): Unit = { - // serial connections are at the end of the "backpressure chain", - // they do not natively support backpressure (as does TCP for example) - // therefore nothing is done here - } - - override def onDownstreamFinish(): Unit = { - // closing downstream also closes the underlying connection - operator ! CoreSerial.Close - } - - } - - override def preStart(): Unit = { - setKeepGoing(true) // serial connection operator will manage completing stage - getStageActor(connecting) - stageActor watch manager - manager ! CoreSerial.Open(port, settings, bufferSize) - } - - setHandler(in, IgnoreTerminateInput) - setHandler(out, IgnoreTerminateOutput) - - /** Initial behavior, before a serial connection is established. */ - private def connecting(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`manager`) => - val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.Opened(port) => - val operator = sender - setHandler(in, new ConnectedInHandler(operator)) - setHandler(out, new ConnectedOutHandler(operator)) - stageActor become connected(operator) - connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value - stageActor unwatch manager - stageActor watch operator - if (!isClosed(in)) { - pull(in) // start pulling input - } - - case other => - val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") - failStage(ex) - connectionPromise.failure(ex) - - } - - } - - /** Behaviour once a connection has been established. It is assumed that operator is not null. */ - private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`operator`) => - failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) - - case CoreSerial.CommandFailed(cmd, reason) => - failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) - - case CoreSerial.Closed => - completeStage() - - case CoreSerial.Received(data) => - if (isAvailable(out)) { - push(out, data) - } else if (failOnOverflow) { - /* Note that the native backend does not provide any way of informing about - * dropped serial data. However, in most cases, a computer capable of running flow - * is also capable of processing incoming serial data at typical baud rates. - * Hence packets will usually only be dropped if an application that uses flow - * backpressures, which can however be detected here. */ - failStage(new StreamSerialException("Incoming serial data was dropped.")) - } - - case WriteAck => - if (!isClosed(in)) { - pull(in) - } - - case other => - failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) - - } - - } - -} - -private[stream] object SerialConnectionLogic { - - case object WriteAck extends CoreSerial.Event - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala deleted file mode 100644 index ceeac01..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala +++ /dev/null @@ -1,49 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} -import akka.util.ByteString - -/** - * Graph stage that establishes and thereby materializes a serial connection. - * The actual connection logic is deferred to [[SerialConnectionLogic]]. - */ -private[stream] class SerialConnectionStage( - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int -) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { - - val in: Inlet[ByteString] = Inlet("Serial.in") - val out: Outlet[ByteString] = Outlet("Serial.out") - - val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): - (GraphStageLogic, Future[Serial.Connection]) = { - - val connectionPromise = Promise[Serial.Connection] - - val logic = new SerialConnectionLogic( - shape, - manager, - port, - settings, - failOnOverflow, - bufferSize, - connectionPromise - ) - - (logic, connectionPromise.future) - } - - override def toString = s"Serial($port)" - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala deleted file mode 100644 index 60b7c90..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala +++ /dev/null @@ -1,65 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.SourceShape -import akka.stream.stage.GraphStageLogic -import ch.jodersky.flow.{Serial => CoreSerial} - -private[stream] class WatcherLogic( - shape: SourceShape[String], - ioManager: ActorRef, - ports: Set[String], - watchPromise: Promise[Serial.Watch]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - - implicit private def self = stageActor.ref - - override def preStart(): Unit = { - getStageActor(receive) - stageActor watch ioManager - for (dir <- WatcherLogic.getDirs(ports)) { - ioManager ! CoreSerial.Watch(dir, skipInitial = false) - } - } - - setHandler(shape.out, IgnoreTerminateOutput) - - private def receive(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`ioManager`) => - val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.Connected(port) => - if (ports contains port) { - if (isAvailable(shape.out)) { - push(shape.out, port) - } - } - - case other => - failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) - - } - } - -} - -private[stream] object WatcherLogic { - def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala deleted file mode 100644 index 82fad69..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala +++ /dev/null @@ -1,38 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, Outlet, SourceShape} -import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} - - -private[stream] class WatcherStage( - ioManager: ActorRef, - ports: Set[String] -) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { - - val out = Outlet[String]("Watcher.out") - - val shape = new SourceShape(out) - - override def createLogicAndMaterializedValue(attributes: Attributes): - (GraphStageLogic, Future[Serial.Watch]) = { - - val promise = Promise[Serial.Watch] - - val logic = new WatcherLogic( - shape, - ioManager, - ports, - promise - ) - - (logic, promise.future) - } - - override def toString = s"Watcher($ports)" - -} diff --git a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala b/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala deleted file mode 100644 index 1a1ebdc..0000000 --- a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -package ch.jodersky.flow -package stream - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.ByteString -import org.scalatest._ - -class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { - - implicit val system = ActorSystem("flow-test") - implicit val materializer = ActorMaterializer() - - override def afterAll { - system.terminate() - } - - "Serial stream" should { - val data = ByteString(("hello world").getBytes("utf-8")) - - "receive the same data it sends in an echo test" in { - withEcho { case (port, settings) => - val graph = Source.single(data) - .via(Serial().open(port, settings)) // send to echo pty - .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS - .dropWhile(_ != data) - .toMat(Sink.head)(Keep.right) - - Await.result(graph.run(), 2.seconds) - } - } - - "fail if the underlying pty fails" in { - val result = withEcho { case (port, settings) => - Source.single(data) - .via(Serial().open(port, settings)) - .toMat(Sink.last)(Keep.right) - .run()} - - intercept[StreamSerialException] { - Await.result(result, 10.seconds) - } - } - - } - -} diff --git a/native/build.sbt b/native/build.sbt new file mode 100644 index 0000000..2c7ffea --- /dev/null +++ b/native/build.sbt @@ -0,0 +1,8 @@ +enablePlugins(JniNative) + +sourceDirectory in nativeCompile := sourceDirectory.value + +// package native libraries from lib_native during releases +val isRelease = sys.props("release") == "true" +enableNativeCompilation in Compile := !isRelease +enableNativeCompilation in Test := !isRelease diff --git a/native/lib_native/armv7l-linux/libflow4.so b/native/lib_native/armv7l-linux/libflow4.so new file mode 100755 index 0000000..f5d9ae0 Binary files /dev/null and b/native/lib_native/armv7l-linux/libflow4.so differ diff --git a/native/lib_native/i686-linux/libflow4.so b/native/lib_native/i686-linux/libflow4.so new file mode 100644 index 0000000..fb438e2 Binary files /dev/null and b/native/lib_native/i686-linux/libflow4.so differ diff --git a/native/lib_native/x86_64-darwin/libflow4.dylib b/native/lib_native/x86_64-darwin/libflow4.dylib new file mode 100644 index 0000000..213415d Binary files /dev/null and b/native/lib_native/x86_64-darwin/libflow4.dylib differ diff --git a/native/lib_native/x86_64-linux/libflow4.so b/native/lib_native/x86_64-linux/libflow4.so new file mode 100755 index 0000000..18fb300 Binary files /dev/null and b/native/lib_native/x86_64-linux/libflow4.so differ diff --git a/native/src/.gitignore b/native/src/.gitignore new file mode 100644 index 0000000..1785c46 --- /dev/null +++ b/native/src/.gitignore @@ -0,0 +1,12 @@ +# CMake +/CMakeFiles/ +/CMakeCache.txt +/cmake_install.cmake +/Makefile + +# Binary files +*.o +*.so* +*.dylib +*.a +*~ \ No newline at end of file diff --git a/native/src/CMakeLists.txt b/native/src/CMakeLists.txt new file mode 100644 index 0000000..a57451c --- /dev/null +++ b/native/src/CMakeLists.txt @@ -0,0 +1,46 @@ +################################################################ +# A minimal CMake file that is compatible with sbt-jni # +# # +# All settings required by sbt-jni have been marked so, please # +# add/modify/remove settings to build your specific library. # +################################################################ + +cmake_minimum_required(VERSION 2.8.0) +set(ignoreMe "${SBT}") # sbt-jni defines -DSBT + +# Define project and related variables +# (required by sbt-jni) please use semantic versioning +# +project (flow) +set(PROJECT_VERSION_MAJOR 4) +set(PROJECT_VERSION_MINOR 0) +set(PROJECT_VERSION_PATCH 0) + +set(CMAKE_C_FLAGS "-std=c99") +add_definitions(-Wall) +add_definitions(-Wextra) +add_definitions(-pedantic) + +# Setup JNI +find_package(JNI REQUIRED) +if (JNI_FOUND) + message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}") +endif() + +# Include directories +include_directories(.) +include_directories(include) +include_directories(${JNI_INCLUDE_DIRS}) + +# Sources +file(GLOB LIB_SRC + "*.c" + "platform/posix/*.c" +) + +# Setup installation targets +# (required by sbt-jni) major version should always be appended to library name +# +set (LIB_NAME ${PROJECT_NAME}${PROJECT_VERSION_MAJOR}) +add_library(${LIB_NAME} SHARED ${LIB_SRC}) +install(TARGETS ${LIB_NAME} LIBRARY DESTINATION .) diff --git a/native/src/flow_jni.c b/native/src/flow_jni.c new file mode 100644 index 0000000..75bffff --- /dev/null +++ b/native/src/flow_jni.c @@ -0,0 +1,150 @@ +#include + +#include "flow.h" + +#include "ch_jodersky_flow_UnsafeSerial.h" +#include "ch_jodersky_flow_UnsafeSerial__.h" + +// suppress unused parameter warnings +#define UNUSED_ARG(x) (void)(x) + +static inline void throwException(JNIEnv* env, const char* const exception, const char * const message) +{ + (*env)->ThrowNew(env, (*env)->FindClass(env, exception), message); +} + +/** Check return code and throw exception in case it is non-zero. */ +static void check(JNIEnv* env, int ret) +{ + switch (ret) { + case -E_IO: throwException(env, "java/io/IOException", ""); break; + case -E_BUSY: throwException(env, "ch/jodersky/flow/PortInUseException", ""); break; + case -E_ACCESS_DENIED: throwException(env, "ch/jodersky/flow/AccessDeniedException", ""); break; + case -E_INVALID_SETTINGS: throwException(env, "ch/jodersky/flow/InvalidSettingsException", ""); break; + case -E_INTERRUPT: throwException(env, "ch/jodersky/flow/PortInterruptedException", ""); break; + case -E_NO_PORT: throwException(env, "ch/jodersky/flow/NoSuchPortException", ""); break; + default: return; + } +} + +/** Get pointer to serial config associated to an UnsafeSerial instance. */ +static struct serial_config* get_config(JNIEnv* env, jobject unsafe_serial) +{ + jclass clazz = (*env)->FindClass(env, "ch/jodersky/flow/UnsafeSerial"); + jfieldID field = (*env)->GetFieldID(env, clazz, "serialAddr", "J"); + jlong addr = (*env)->GetLongField(env, unsafe_serial, field); + return (struct serial_config*) (intptr_t) addr; +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial__ + * Method: open + * Signature: (Ljava/lang/String;IIZI)J + */ +JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open +(JNIEnv *env, jobject instance, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity) +{ + UNUSED_ARG(instance); + + const char *dev = (*env)->GetStringUTFChars(env, port_name, 0); + struct serial_config* config; + int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &config); + (*env)->ReleaseStringUTFChars(env, port_name, dev); + + if (r < 0) { + check(env, r); + return -E_IO; + } + + long jpointer = (long) config; + return (jlong) jpointer; +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: read + * Signature: (Ljava/nio/ByteBuffer;)I + */ +JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read +(JNIEnv *env, jobject instance, jobject buffer) +{ + char* local_buffer = (char*) (*env)->GetDirectBufferAddress(env, buffer); + if (local_buffer == NULL) { + throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); + return -E_IO; + } + size_t size = (size_t) (*env)->GetDirectBufferCapacity(env, buffer); + struct serial_config* config = get_config(env, instance); + + int r = serial_read(config, local_buffer, size); + if (r < 0) { + check(env, r); + } + return r; + +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: cancelRead + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead +(JNIEnv *env, jobject instance) +{ + int r = serial_cancel_read(get_config(env, instance)); + if (r < 0) { + check(env, r); + } +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: write + * Signature: (Ljava/nio/ByteBuffer;I)I + */ +JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write +(JNIEnv *env, jobject instance, jobject buffer, jint size) +{ + + char* local_buffer = (char *) (*env)->GetDirectBufferAddress(env, buffer); + if (local_buffer == NULL) { + throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); + return -E_IO; + } + + int r = serial_write(get_config(env, instance), local_buffer, (size_t) size); + if (r < 0) { + check(env, r); + return -E_IO; + } + return r; +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: close + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close +(JNIEnv *env, jobject instance) +{ + int r = serial_close(get_config(env, instance)); + if (r < 0) { + check(env, r); + } +} + +/* + * Class: ch_jodersky_flow_UnsafeSerial__ + * Method: debug + * Signature: (Z)V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug +(JNIEnv *env, jobject instance, jboolean value) +{ + UNUSED_ARG(env); + UNUSED_ARG(instance); + + serial_debug((bool) value); +} diff --git a/native/src/include/ch_jodersky_flow_UnsafeSerial.h b/native/src/include/ch_jodersky_flow_UnsafeSerial.h new file mode 100644 index 0000000..f80ada0 --- /dev/null +++ b/native/src/include/ch_jodersky_flow_UnsafeSerial.h @@ -0,0 +1,45 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class ch_jodersky_flow_UnsafeSerial */ + +#ifndef _Included_ch_jodersky_flow_UnsafeSerial +#define _Included_ch_jodersky_flow_UnsafeSerial +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: read + * Signature: (Ljava/nio/ByteBuffer;)I + */ +JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read + (JNIEnv *, jobject, jobject); + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: cancelRead + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead + (JNIEnv *, jobject); + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: write + * Signature: (Ljava/nio/ByteBuffer;I)I + */ +JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write + (JNIEnv *, jobject, jobject, jint); + +/* + * Class: ch_jodersky_flow_UnsafeSerial + * Method: close + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close + (JNIEnv *, jobject); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/native/src/include/ch_jodersky_flow_UnsafeSerial__.h b/native/src/include/ch_jodersky_flow_UnsafeSerial__.h new file mode 100644 index 0000000..617875f --- /dev/null +++ b/native/src/include/ch_jodersky_flow_UnsafeSerial__.h @@ -0,0 +1,29 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class ch_jodersky_flow_UnsafeSerial__ */ + +#ifndef _Included_ch_jodersky_flow_UnsafeSerial__ +#define _Included_ch_jodersky_flow_UnsafeSerial__ +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: ch_jodersky_flow_UnsafeSerial__ + * Method: open + * Signature: (Ljava/lang/String;IIZI)J + */ +JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open + (JNIEnv *, jobject, jstring, jint, jint, jboolean, jint); + +/* + * Class: ch_jodersky_flow_UnsafeSerial__ + * Method: debug + * Signature: (Z)V + */ +JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug + (JNIEnv *, jobject, jboolean); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/native/src/include/flow.h b/native/src/include/flow.h new file mode 100644 index 0000000..e3f33b9 --- /dev/null +++ b/native/src/include/flow.h @@ -0,0 +1,103 @@ +#ifndef FLOW_H +#define FLOW_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +// general error codes, whose that are returned by functions +#define E_IO 1 // IO error +#define E_ACCESS_DENIED 2 // access denied +#define E_BUSY 3 // port is busy +#define E_INVALID_SETTINGS 4 // some port settings are invalid +#define E_INTERRUPT 5 // not really an error, function call aborted because port is closed +#define E_NO_PORT 6 // requested port does not exist + +#define PARITY_NONE 0 +#define PARITY_ODD 1 +#define PARITY_EVEN 2 + +/** + * Contains internal configuration of an open serial port. + */ +struct serial_config; + +/** + * Opens a serial port and allocates memory for storing configuration. Note: if this function fails, + * any internally allocated resources will be freed. + * @param port_name name of port + * @param baud baud rate + * @param char_size character size of data transmitted through serial device + * @param two_stop_bits set to use two stop bits instead of one + * @param parity kind of parity checking to use + * @param serial pointer to memory that will be allocated with a serial structure + * @return 0 on success + * @return -E_NO_PORT if the given port does not exist + * @return -E_ACCESS_DENIED if permissions are not sufficient to open port + * @return -E_BUSY if port is already in use + * @return -E_INVALID_SETTINGS if any of the specified settings are invalid + * @return -E_IO on other error + */ +int serial_open( + const char* port_name, + int baud, + int char_size, + bool two_stop_bits, + int parity, + struct serial_config** const serial); + +/** + * Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to + * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT + * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per + * close manual page, close should not be called on a file descriptor that is in use by another thread). + * @param serial pointer to serial configuration that is to be closed (and freed) + * @return 0 on success + * @return -E_IO on error + */ +int serial_close(struct serial_config* const serial); + +/** + * Starts a read from a previously opened serial port. The read is blocking, however it may be + * interrupted by calling 'serial_cancel_read' on the given serial port. + * @param serial pointer to serial configuration from which to read + * @param buffer buffer into which data is read + * @param size maximum buffer size + * @return n>0 the number of bytes read into buffer + * @return -E_INTERRUPT if the call to this function was interrupted + * @return -E_IO on IO error + */ +int serial_read(struct serial_config* const serial, char* const buffer, size_t size); + +/** + * Cancels a blocked read call. This function is thread safe, i.e. it may be called from a thread even + * while another thread is blocked in a read call. + * @param serial_config the serial port to interrupt + * @return 0 on success + * @return -E_IO on error + */ +int serial_cancel_read(struct serial_config* const serial); + +/** + * Writes data to a previously opened serial port. Non bocking. + * @param serial pointer to serial configuration to which to write + * @param data data to write + * @param size number of bytes to write from data + * @return n>0 the number of bytes written + * @return -E_IO on IO error + */ +int serial_write(struct serial_config* const serial, char* const data, size_t size); + +/** + * Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. + */ +void serial_debug(bool value); + +#ifdef __cplusplus +} +#endif + +#endif /* FLOW_H */ diff --git a/native/src/platform/posix/flow.c b/native/src/platform/posix/flow.c new file mode 100644 index 0000000..969949b --- /dev/null +++ b/native/src/platform/posix/flow.c @@ -0,0 +1,263 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "flow.h" + +#define DATA_CANCEL 0xffffffff + +static bool debug = false; + +static void print_debug(const char* const msg, int en) +{ + if (debug) { + if (errno == 0) { + fprintf(stderr, "%s", msg); + } else { + fprintf(stderr, "%s: %d\n", msg, en); + } + fflush(stderr); + } +} + +void serial_debug(bool value) +{ + debug = value; +} + +//contains file descriptors used in managing a serial port +struct serial_config { + int port_fd; // file descriptor of serial port + + /* a pipe is used to abort a serial read by writing something into the + * write end of the pipe */ + int pipe_read_fd; // file descriptor, read end of pipe + int pipe_write_fd; // file descriptor, write end of pipe +}; + +int serial_open( + const char* const port_name, + int baud, + int char_size, + bool two_stop_bits, + int parity, + struct serial_config** serial) +{ + + int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK); + + if (fd < 0) { + int en = errno; + print_debug("Error obtaining file descriptor for port", en); + if (en == EACCES) return -E_ACCESS_DENIED; + if (en == ENOENT) return -E_NO_PORT; + return -E_IO; + } + + if (flock(fd, LOCK_EX | LOCK_NB) < 0) { + print_debug("Error acquiring lock on port", errno); + close(fd); + return -E_BUSY; + } + + /* configure new port settings */ + struct termios newtio; + + /* initialize serial interface */ + newtio.c_iflag = 0; + newtio.c_oflag = 0; + newtio.c_lflag = 0; + newtio.c_cflag = CREAD; + + /* set speed */ + speed_t bd; + switch (baud) { + case 50: bd = B50; break; + case 75: bd = B75; break; + case 110: bd = B110; break; + case 134: bd = B134; break; + case 150: bd = B150; break; + case 200: bd = B200; break; + case 300: bd = B300; break; + case 600: bd = B600; break; + case 1200: bd = B1200; break; + case 1800: bd = B1800; break; + case 2400: bd = B2400; break; + case 4800: bd = B4800; break; + case 9600: bd = B9600; break; + case 19200: bd = B19200; break; + case 38400: bd = B38400; break; + case 57600: bd = B57600; break; + case 115200: bd = B115200; break; + case 230400: bd = B230400; break; + default: + close(fd); + print_debug("Invalid baud rate", 0); + return -E_INVALID_SETTINGS; + } + + if (cfsetspeed(&newtio, bd) < 0) { + print_debug("Error setting baud rate", errno); + close(fd); + return -E_IO; + } + + /* set char size*/ + switch (char_size) { + case 5: newtio.c_cflag |= CS5; break; + case 6: newtio.c_cflag |= CS6; break; + case 7: newtio.c_cflag |= CS7; break; + case 8: newtio.c_cflag |= CS8; break; + default: + close(fd); + print_debug("Invalid character size", 0); + return -E_INVALID_SETTINGS; + } + + /* use two stop bits */ + if (two_stop_bits){ + newtio.c_cflag |= CSTOPB; + } + + /* set parity */ + switch (parity) { + case PARITY_NONE: break; + case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break; + case PARITY_EVEN: newtio.c_cflag |= PARENB; break; + default: + close(fd); + print_debug("Invalid parity", 0); + return -E_INVALID_SETTINGS; + } + + if (tcflush(fd, TCIOFLUSH) < 0) { + print_debug("Error flushing serial settings", errno); + close(fd); + return -E_IO; + } + + if (tcsetattr(fd, TCSANOW, &newtio) < 0) { + print_debug("Error applying serial settings", errno); + close(fd); + return -E_IO; + } + + int pipe_fd[2]; + if (pipe(pipe_fd) < 0) { + print_debug("Error opening pipe", errno); + close(fd); + return -E_IO; + } + + if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) { + print_debug("Error setting pipe to non-blocking", errno); + close(fd); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -E_IO; + } + + struct serial_config* s = malloc(sizeof(s)); + if (s == NULL) { + print_debug("Error allocating memory for serial configuration", errno); + close(fd); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -E_IO; + } + + s->port_fd = fd; + s->pipe_read_fd = pipe_fd[0]; + s->pipe_write_fd = pipe_fd[1]; + (*serial) = s; + + return 0; +} + +int serial_close(struct serial_config* const serial) +{ + if (close(serial->pipe_write_fd) < 0) { + print_debug("Error closing write end of pipe", errno); + return -E_IO; + } + if (close(serial->pipe_read_fd) < 0) { + print_debug("Error closing read end of pipe", errno); + return -E_IO; + } + + if (flock(serial->port_fd, LOCK_UN) < 0){ + print_debug("Error releasing lock on port", errno); + return -E_IO; + } + if (close(serial->port_fd) < 0) { + print_debug("Error closing port", errno); + return -E_IO; + } + + free(serial); + return 0; +} + +int serial_read(struct serial_config* const serial, char* const buffer, size_t size) +{ + int port = serial->port_fd; + int pipe = serial->pipe_read_fd; + + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(port, &rfds); + FD_SET(pipe, &rfds); + + int nfds = pipe + 1; + if (pipe < port) nfds = port + 1; + + int n = select(nfds, &rfds, NULL, NULL, NULL); + if (n < 0) { + print_debug("Error trying to call select on port and pipe", errno); + return -E_IO; + } + + if (FD_ISSET(pipe, &rfds)) { + return -E_INTERRUPT; + } else if (FD_ISSET(port, &rfds)) { + int r = read(port, buffer, size); + + // treat 0 bytes read as an error to avoid problems on disconnect + // anyway, after a poll there should be more than 0 bytes available to read + if (r <= 0) { + print_debug("Error data not available after select", errno); + return -E_IO; + } + return r; + } else { + print_debug("Select returned unknown read sets", 0); + return -E_IO; + } +} + +int serial_cancel_read(struct serial_config* const serial) +{ + int data = DATA_CANCEL; + + //write to pipe to wake up any blocked read thread (self-pipe trick) + if (write(serial->pipe_write_fd, &data, 1) < 0) { + print_debug("Error writing to pipe during read cancel", errno); + return -E_IO; + } + + return 0; +} + +int serial_write(struct serial_config* const serial, char* const data, size_t size) +{ + int r = write(serial->port_fd, data, size); + if (r < 0) { + print_debug("Error writing to port", errno); + return -E_IO; + } + return r; +} diff --git a/native/src/platform/windows/README b/native/src/platform/windows/README new file mode 100644 index 0000000..3d24410 --- /dev/null +++ b/native/src/platform/windows/README @@ -0,0 +1 @@ +The contents of the file flow.c were found in the avrdude project. They look like they may be a good starting point for serial communication on windows. diff --git a/native/src/platform/windows/flow.c.disabled b/native/src/platform/windows/flow.c.disabled new file mode 100644 index 0000000..86a267c --- /dev/null +++ b/native/src/platform/windows/flow.c.disabled @@ -0,0 +1,416 @@ +/* + * avrdude - A Downloader/Uploader for AVR device programmers + * Copyright (C) 2003, 2004 Martin J. Thomas + * Copyright (C) 2006 Joerg Wunsch + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +/* $Id$ */ + +/* + * Native Win32 serial interface for avrdude. + */ + +#include "avrdude.h" + +#if defined(WIN32NATIVE) + +#include +#include +#include /* for isprint */ + +#include "serial.h" + +long serial_recv_timeout = 5000; /* ms */ + +#define W32SERBUFSIZE 1024 + +struct baud_mapping { + long baud; + DWORD speed; +}; + +/* HANDLE hComPort=INVALID_HANDLE_VALUE; */ + +static struct baud_mapping baud_lookup_table [] = { + { 1200, CBR_1200 }, + { 2400, CBR_2400 }, + { 4800, CBR_4800 }, + { 9600, CBR_9600 }, + { 19200, CBR_19200 }, + { 38400, CBR_38400 }, + { 57600, CBR_57600 }, + { 115200, CBR_115200 }, + { 0, 0 } /* Terminator. */ +}; + +static DWORD serial_baud_lookup(long baud) +{ + struct baud_mapping *map = baud_lookup_table; + + while (map->baud) { + if (map->baud == baud) + return map->speed; + map++; + } + + /* + * If a non-standard BAUD rate is used, issue + * a warning (if we are verbose) and return the raw rate + */ + if (verbose > 0) + fprintf(stderr, "%s: serial_baud_lookup(): Using non-standard baud rate: %ld", + progname, baud); + + return baud; +} + + +static BOOL serial_w32SetTimeOut(HANDLE hComPort, DWORD timeout) // in ms +{ + COMMTIMEOUTS ctmo; + ZeroMemory (&ctmo, sizeof(COMMTIMEOUTS)); + ctmo.ReadIntervalTimeout = timeout; + ctmo.ReadTotalTimeoutMultiplier = timeout; + ctmo.ReadTotalTimeoutConstant = timeout; + + return SetCommTimeouts(hComPort, &ctmo); +} + +static int ser_setspeed(union filedescriptor *fd, long baud) +{ + DCB dcb; + HANDLE hComPort = (HANDLE)fd->pfd; + + ZeroMemory (&dcb, sizeof(DCB)); + dcb.DCBlength = sizeof(DCB); + dcb.BaudRate = serial_baud_lookup (baud); + dcb.fBinary = 1; + dcb.fDtrControl = DTR_CONTROL_DISABLE; + dcb.fRtsControl = RTS_CONTROL_DISABLE; + dcb.ByteSize = 8; + dcb.Parity = NOPARITY; + dcb.StopBits = ONESTOPBIT; + + if (!SetCommState(hComPort, &dcb)) + return -1; + + return 0; +} + + +static int ser_open(char * port, long baud, union filedescriptor *fdp) +{ + LPVOID lpMsgBuf; + HANDLE hComPort=INVALID_HANDLE_VALUE; + char *newname = 0; + + /* + * If the port is of the form "net::", then + * handle it as a TCP connection to a terminal server. + * + * This is curently not implemented for Win32. + */ + if (strncmp(port, "net:", strlen("net:")) == 0) { + fprintf(stderr, + "%s: ser_open(): network connects are currently not" + "implemented for Win32 environments\n", + progname); + return -1; + } + + if (strncasecmp(port, "com", strlen("com")) == 0) { + + // prepend "\\\\.\\" to name, required for port # >= 10 + newname = malloc(strlen("\\\\.\\") + strlen(port) + 1); + + if (newname == 0) { + fprintf(stderr, + "%s: ser_open(): out of memory\n", + progname); + exit(1); + } + strcpy(newname, "\\\\.\\"); + strcat(newname, port); + + port = newname; + } + + hComPort = CreateFile(port, GENERIC_READ | GENERIC_WRITE, 0, NULL, + OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); + + if (hComPort == INVALID_HANDLE_VALUE) { + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL); + fprintf(stderr, "%s: ser_open(): can't open device \"%s\": %s\n", + progname, port, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + return -1; + } + + if (!SetupComm(hComPort, W32SERBUFSIZE, W32SERBUFSIZE)) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set buffers for \"%s\"\n", + progname, port); + return -1; + } + + fdp->pfd = (void *)hComPort; + if (ser_setspeed(fdp, baud) != 0) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set com-state for \"%s\"\n", + progname, port); + return -1; + } + + if (!serial_w32SetTimeOut(hComPort,0)) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set initial timeout for \"%s\"\n", + progname, port); + return -1; + } + + if (newname != 0) { + free(newname); + } + return 0; +} + + +static void ser_close(union filedescriptor *fd) +{ + HANDLE hComPort=(HANDLE)fd->pfd; + if (hComPort != INVALID_HANDLE_VALUE) + CloseHandle (hComPort); + + hComPort = INVALID_HANDLE_VALUE; +} + +static int ser_set_dtr_rts(union filedescriptor *fd, int is_on) +{ + HANDLE hComPort=(HANDLE)fd->pfd; + + if (is_on) { + EscapeCommFunction(hComPort, SETDTR); + EscapeCommFunction(hComPort, SETRTS); + } else { + EscapeCommFunction(hComPort, CLRDTR); + EscapeCommFunction(hComPort, CLRRTS); + } + return 0; +} + + +static int ser_send(union filedescriptor *fd, unsigned char * buf, size_t buflen) +{ + size_t len = buflen; + unsigned char c='\0'; + DWORD written; + unsigned char * b = buf; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_send(): port not open\n", + progname); + exit(1); + } + + if (!len) + return 0; + + if (verbose > 3) + { + fprintf(stderr, "%s: Send: ", progname); + + while (len) { + c = *b; + if (isprint(c)) { + fprintf(stderr, "%c ", c); + } + else { + fprintf(stderr, ". "); + } + fprintf(stderr, "[%02x] ", c); + b++; + len--; + } + fprintf(stderr, "\n"); + } + + serial_w32SetTimeOut(hComPort,500); + + if (!WriteFile (hComPort, buf, buflen, &written, NULL)) { + fprintf(stderr, "%s: ser_send(): write error: %s\n", + progname, "sorry no info avail"); // TODO + exit(1); + } + + if (written != buflen) { + fprintf(stderr, "%s: ser_send(): size/send mismatch\n", + progname); + exit(1); + } + + return 0; +} + + +static int ser_recv(union filedescriptor *fd, unsigned char * buf, size_t buflen) +{ + unsigned char c; + unsigned char * p = buf; + DWORD read; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_read(): port not open\n", + progname); + exit(1); + } + + serial_w32SetTimeOut(hComPort, serial_recv_timeout); + + if (!ReadFile(hComPort, buf, buflen, &read, NULL)) { + LPVOID lpMsgBuf; + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL ); + fprintf(stderr, "%s: ser_recv(): read error: %s\n", + progname, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + exit(1); + } + + /* time out detected */ + if (read == 0) { + if (verbose > 1) + fprintf(stderr, + "%s: ser_recv(): programmer is not responding\n", + progname); + return -1; + } + + p = buf; + + if (verbose > 3) + { + fprintf(stderr, "%s: Recv: ", progname); + + while (read) { + c = *p; + if (isprint(c)) { + fprintf(stderr, "%c ", c); + } + else { + fprintf(stderr, ". "); + } + fprintf(stderr, "[%02x] ", c); + + p++; + read--; + } + fprintf(stderr, "\n"); + } + return 0; +} + + +static int ser_drain(union filedescriptor *fd, int display) +{ + // int rc; + unsigned char buf[10]; + BOOL readres; + DWORD read; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_drain(): port not open\n", + progname); + exit(1); + } + + serial_w32SetTimeOut(hComPort,250); + + if (display) { + fprintf(stderr, "drain>"); + } + + while (1) { + readres=ReadFile(hComPort, buf, 1, &read, NULL); + if (!readres) { + LPVOID lpMsgBuf; + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL ); + fprintf(stderr, "%s: ser_drain(): read error: %s\n", + progname, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + exit(1); + } + + if (read) { // data avail + if (display) fprintf(stderr, "%02x ", buf[0]); + } + else { // no more data + if (display) fprintf(stderr, "/run`. + +All projects, including samples, can be listed by running `sbt projects`. diff --git a/samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala b/samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala new file mode 100644 index 0000000..a669ce7 --- /dev/null +++ b/samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala @@ -0,0 +1,66 @@ +package ch.jodersky.akka.serial +package samples.terminalstream + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.io.StdIn + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.util.ByteString + +import stream.Serial + +object Main { + + final val Delay = FiniteDuration(500, MILLISECONDS) + + implicit val system = ActorSystem("terminal-stream") + implicit val materializer = ActorMaterializer() + + def ask(label: String, default: String) = { + print(label + " [" + default.toString + "]: ") + val in = StdIn.readLine() + println("") + if (in.isEmpty) default else in + } + + def main(args: Array[String]): Unit = { + import system.dispatcher + + val port = ask("Device", "/dev/ttyACM0") + val baud = ask("Baud rate", "115200").toInt + val cs = ask("Char size", "8").toInt + val tsb = ask("Use two stop bits", "false").toBoolean + val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) + val settings = SerialSettings(baud, cs, tsb, parity) + + val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] = + Serial().open(port, settings) + + val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data => + println("server says: " + data.decodeString("UTF-8")) + } + + val ticker: Source[ByteString, _] = Source.tick(Delay, Delay, ()).scan(0){case (x, _) => + x + 1 + }.map{ x => + println(x) + ByteString(x.toString) + } + + val connection: Future[Serial.Connection] = ticker.viaMat(serial)(Keep.right).to(printer).run() + + connection map { conn => + println("Connected to " + conn.port) + StdIn.readLine("Press enter to exit") + } recover { case err => + println("Cannot connect: " + err) + } andThen { case _ => + system.terminate() + } + + } + +} diff --git a/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala new file mode 100644 index 0000000..1bc547a --- /dev/null +++ b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala @@ -0,0 +1,29 @@ +package ch.jodersky.akka.serial +package samples.terminal + +import akka.actor.Actor +import scala.io.StdIn + +class ConsoleReader extends Actor { + import context._ + import ConsoleReader._ + + def receive = { + case Read => + StdIn.readLine() match { + case ":q" | null => parent ! EOT + case s => { + parent ! ConsoleInput(s) + } + } + } + +} + +object ConsoleReader { + + case object Read + case object EOT + case class ConsoleInput(in: String) + +} diff --git a/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala new file mode 100644 index 0000000..394c417 --- /dev/null +++ b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala @@ -0,0 +1,30 @@ +package ch.jodersky.akka.serial +package samples.terminal + +import akka.actor.ActorSystem +import scala.io.StdIn + +object Main { + + def ask(label: String, default: String) = { + print(label + " [" + default.toString + "]: ") + val in = StdIn.readLine() + println("") + if (in.isEmpty) default else in + } + + def main(args: Array[String]): Unit = { + val port = ask("Device", "/dev/ttyACM0") + val baud = ask("Baud rate", "115200").toInt + val cs = ask("Char size", "8").toInt + val tsb = ask("Use two stop bits", "false").toBoolean + val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) + val settings = SerialSettings(baud, cs, tsb, parity) + + println("Starting terminal system, enter :q to exit.") + Serial.debug(true) + val system = ActorSystem("flow") + val terminal = system.actorOf(Terminal(port, settings), name = "terminal") + system.registerOnTermination(println("Stopped terminal system.")) + } +} diff --git a/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala new file mode 100644 index 0000000..5e1fc2f --- /dev/null +++ b/samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala @@ -0,0 +1,75 @@ +package ch.jodersky.akka.serial +package samples.terminal + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } +import akka.io.IO +import akka.util.ByteString + +class Terminal(port: String, settings: SerialSettings) extends Actor with ActorLogging { + import Terminal._ + import context._ + + val reader = actorOf(Props[ConsoleReader]) + + log.info(s"Requesting manager to open port: ${port}, baud: ${settings.baud}") + IO(Serial) ! Serial.Open(port, settings) + + override def postStop() = { + system.terminate() + } + + def receive = { + case Serial.CommandFailed(cmd, reason) => { + log.error(s"Connection failed, stopping terminal. Reason: ${reason}") + context stop self + } + case Serial.Opened(port) => { + log.info(s"Port ${port} is now open.") + val operator = sender + context become opened(operator) + context watch operator + reader ! ConsoleReader.Read + } + } + + def opened(operator: ActorRef): Receive = { + + case Serial.Received(data) => { + log.info(s"Received data: ${formatData(data)}") + } + + case Terminal.Wrote(data) => log.info(s"Wrote data: ${formatData(data)}") + + case Serial.Closed => { + log.info("Operator closed normally, exiting terminal.") + context unwatch operator + context stop self + } + + case Terminated(`operator`) => { + log.error("Operator crashed, exiting terminal.") + context stop self + } + + case ConsoleReader.EOT => { + log.info("Initiating close.") + operator ! Serial.Close + } + + case ConsoleReader.ConsoleInput(input) => { + val data = ByteString(input.getBytes) + operator ! Serial.Write(data, length => Wrote(data.take(length))) + reader ! ConsoleReader.Read + } + } + +} + +object Terminal { + case class Wrote(data: ByteString) extends Serial.Event + + def apply(port: String, settings: SerialSettings) = Props(classOf[Terminal], port, settings) + + private def formatData(data: ByteString) = data.mkString("[", ",", "]") + " " + (new String(data.toArray, "UTF-8")) + +} diff --git a/samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala b/samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala new file mode 100644 index 0000000..6393953 --- /dev/null +++ b/samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala @@ -0,0 +1,50 @@ +package ch.jodersky.akka.serial +package samples.watcher + +import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.io.IO +import scala.io.StdIn + +class Watcher extends Actor with ActorLogging { + import context._ + + val ports = List( + "/dev/ttyUSB\\d+", + "/dev/ttyACM\\d+", + "/dev/cu\\d+", + "/dev/ttyS\\d+" + ) + + override def preStart() = { + val cmd = Serial.Watch() + IO(Serial) ! cmd //watch for new devices + log.info(s"Watching ${cmd.directory} for new devices.") + } + + def receive = { + + case Serial.CommandFailed(w: Serial.Watch, err) => + log.error(err, s"Could not get a watch on ${w.directory}.") + context stop self + + case Serial.Connected(path) => + log.info(s"New device: ${path}") + ports.find(path matches _) match { + case Some(port) => log.info(s"Device is a serial device.") + case None => log.warning(s"Device is NOT serial device.") + } + + } + +} + +object Main { + + def main(args: Array[String]): Unit = { + val system = ActorSystem("flow") + val watcher = system.actorOf(Props(classOf[Watcher]), name = "watcher") + StdIn.readLine() + system.terminate() + } + +} diff --git a/stream/build.sbt b/stream/build.sbt new file mode 100644 index 0000000..c9aa7eb --- /dev/null +++ b/stream/build.sbt @@ -0,0 +1,5 @@ +import flow.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaStream +libraryDependencies += Dependencies.scalatest % "test" diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala new file mode 100644 index 0000000..2195435 --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala @@ -0,0 +1,67 @@ +package ch.jodersky.akka.serial +package stream + +import akka.stream.scaladsl.Source +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +import ch.jodersky.akka.serial.{Serial => CoreSerial} +import impl._ + +object Serial extends ExtensionId[Serial] with ExtensionIdProvider { + + /** + * Represents a prospective serial connection. + */ + case class Connection(port: String, settings: SerialSettings) + + case class Watch(ports: Set[String]) + + def apply()(implicit system: ActorSystem): Serial = super.apply(system) + + override def lookup() = Serial + + override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) + +} + +/** + * Entry point to streaming over serial ports. + * The design of this API is inspired by Akka's Tcp streams. + */ +class Serial(system: ExtendedActorSystem) extends Extension { + + /** + * Creates a Flow that will open a serial port when materialized. + * This Flow then represents an open serial connection: data pushed to its + * inlet will be written to the underlying serial port, and data received + * on the port will be emitted by its outlet. + * @param port name of serial port to open + * @param settings settings to use with serial port + * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped + * @param bufferSize maximum read and write buffer sizes + * @return a Flow associated to the given serial port + */ + def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): + Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( + new SerialConnectionStage( + IO(CoreSerial)(system), + port, + settings, + failOnOverflow, + bufferSize + ) + ) + + def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( + new WatcherStage( + IO(CoreSerial)(system), + ports + ) + ) + +} diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala b/stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala new file mode 100644 index 0000000..ed94374 --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala @@ -0,0 +1,5 @@ +package ch.jodersky.akka.serial +package stream + +/** Represents a generic exception occured during streaming of serial data. */ +class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala b/stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala new file mode 100644 index 0000000..96fbb77 --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala @@ -0,0 +1,4 @@ +package ch.jodersky.akka.serial +package stream + +class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala new file mode 100644 index 0000000..13ee5d6 --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala @@ -0,0 +1,172 @@ +package ch.jodersky.akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.{FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString + +import ch.jodersky.akka.serial.{Serial => CoreSerial, SerialSettings} + +/** + * Graph logic that handles establishing and forwarding serial communication. + * The underlying stream is closed when downstream (output) finishes, + * upstream (input) closes are ignored. + */ +private[stream] class SerialConnectionLogic( + shape: FlowShape[ByteString, ByteString], + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int, + connectionPromise: Promise[Serial.Connection]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + import SerialConnectionLogic._ + + /** Receives data and writes it to the serial backend. */ + private def in: Inlet[ByteString] = shape.in + + /** Receives data from the serial backend and pushes it downstream. */ + private def out: Outlet[ByteString] = shape.out + + /** Implicit alias to stageActor so it will be used in "!" calls, without + * explicitly specifying a sender. */ + implicit private def self = stageActor.ref + + /** + * Input handler for an established connection. + * @param operator the operator actor of the established connection + */ + class ConnectedInHandler(operator: ActorRef) extends InHandler { + + override def onPush(): Unit = { + val elem = grab(in) + require(elem != null) // reactive streams requirement + operator ! CoreSerial.Write(elem, _ => WriteAck) + } + + override def onUpstreamFinish(): Unit = { + if (isClosed(out)) { // close serial connection if output is also closed + operator ! CoreSerial.Close + } + } + + } + + class ConnectedOutHandler(operator: ActorRef) extends OutHandler { + // implicit alias to stage actor, so it will be used in "!" calls + implicit val self = stageActor.ref + + override def onPull(): Unit = { + // serial connections are at the end of the "backpressure chain", + // they do not natively support backpressure (as does TCP for example) + // therefore nothing is done here + } + + override def onDownstreamFinish(): Unit = { + // closing downstream also closes the underlying connection + operator ! CoreSerial.Close + } + + } + + override def preStart(): Unit = { + setKeepGoing(true) // serial connection operator will manage completing stage + getStageActor(connecting) + stageActor watch manager + manager ! CoreSerial.Open(port, settings, bufferSize) + } + + setHandler(in, IgnoreTerminateInput) + setHandler(out, IgnoreTerminateOutput) + + /** Initial behavior, before a serial connection is established. */ + private def connecting(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`manager`) => + val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.Opened(port) => + val operator = sender + setHandler(in, new ConnectedInHandler(operator)) + setHandler(out, new ConnectedOutHandler(operator)) + stageActor become connected(operator) + connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value + stageActor unwatch manager + stageActor watch operator + if (!isClosed(in)) { + pull(in) // start pulling input + } + + case other => + val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") + failStage(ex) + connectionPromise.failure(ex) + + } + + } + + /** Behaviour once a connection has been established. It is assumed that operator is not null. */ + private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`operator`) => + failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) + + case CoreSerial.CommandFailed(cmd, reason) => + failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) + + case CoreSerial.Closed => + completeStage() + + case CoreSerial.Received(data) => + if (isAvailable(out)) { + push(out, data) + } else if (failOnOverflow) { + /* Note that the native backend does not provide any way of informing about + * dropped serial data. However, in most cases, a computer capable of running flow + * is also capable of processing incoming serial data at typical baud rates. + * Hence packets will usually only be dropped if an application that uses flow + * backpressures, which can however be detected here. */ + failStage(new StreamSerialException("Incoming serial data was dropped.")) + } + + case WriteAck => + if (!isClosed(in)) { + pull(in) + } + + case other => + failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) + + } + + } + +} + +private[stream] object SerialConnectionLogic { + + case object WriteAck extends CoreSerial.Event + +} diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala new file mode 100644 index 0000000..20f38eb --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala @@ -0,0 +1,49 @@ +package ch.jodersky.akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString + +/** + * Graph stage that establishes and thereby materializes a serial connection. + * The actual connection logic is deferred to [[SerialConnectionLogic]]. + */ +private[stream] class SerialConnectionStage( + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int +) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { + + val in: Inlet[ByteString] = Inlet("Serial.in") + val out: Outlet[ByteString] = Outlet("Serial.out") + + val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): + (GraphStageLogic, Future[Serial.Connection]) = { + + val connectionPromise = Promise[Serial.Connection] + + val logic = new SerialConnectionLogic( + shape, + manager, + port, + settings, + failOnOverflow, + bufferSize, + connectionPromise + ) + + (logic, connectionPromise.future) + } + + override def toString = s"Serial($port)" + +} diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala new file mode 100644 index 0000000..e9dc41d --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala @@ -0,0 +1,65 @@ +package ch.jodersky.akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import ch.jodersky.akka.serial.{Serial => CoreSerial} + +private[stream] class WatcherLogic( + shape: SourceShape[String], + ioManager: ActorRef, + ports: Set[String], + watchPromise: Promise[Serial.Watch]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + + implicit private def self = stageActor.ref + + override def preStart(): Unit = { + getStageActor(receive) + stageActor watch ioManager + for (dir <- WatcherLogic.getDirs(ports)) { + ioManager ! CoreSerial.Watch(dir, skipInitial = false) + } + } + + setHandler(shape.out, IgnoreTerminateOutput) + + private def receive(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`ioManager`) => + val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.Connected(port) => + if (ports contains port) { + if (isAvailable(shape.out)) { + push(shape.out, port) + } + } + + case other => + failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) + + } + } + +} + +private[stream] object WatcherLogic { + def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) +} diff --git a/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala b/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala new file mode 100644 index 0000000..be6af18 --- /dev/null +++ b/stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala @@ -0,0 +1,38 @@ +package ch.jodersky.akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} + + +private[stream] class WatcherStage( + ioManager: ActorRef, + ports: Set[String] +) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { + + val out = Outlet[String]("Watcher.out") + + val shape = new SourceShape(out) + + override def createLogicAndMaterializedValue(attributes: Attributes): + (GraphStageLogic, Future[Serial.Watch]) = { + + val promise = Promise[Serial.Watch] + + val logic = new WatcherLogic( + shape, + ioManager, + ports, + promise + ) + + (logic, promise.future) + } + + override def toString = s"Watcher($ports)" + +} diff --git a/stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala b/stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala new file mode 100644 index 0000000..29c4ef6 --- /dev/null +++ b/stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala @@ -0,0 +1,51 @@ +package ch.jodersky.akka.serial +package stream + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.ByteString +import org.scalatest._ + +class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { + + implicit val system = ActorSystem("flow-test") + implicit val materializer = ActorMaterializer() + + override def afterAll { + system.terminate() + } + + "Serial stream" should { + val data = ByteString(("hello world").getBytes("utf-8")) + + "receive the same data it sends in an echo test" in { + withEcho { case (port, settings) => + val graph = Source.single(data) + .via(Serial().open(port, settings)) // send to echo pty + .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS + .dropWhile(_ != data) + .toMat(Sink.head)(Keep.right) + + Await.result(graph.run(), 2.seconds) + } + } + + "fail if the underlying pty fails" in { + val result = withEcho { case (port, settings) => + Source.single(data) + .via(Serial().open(port, settings)) + .toMat(Sink.last)(Keep.right) + .run()} + + intercept[StreamSerialException] { + Await.result(result, 10.seconds) + } + } + + } + +} -- cgit v1.2.3