From 23959966760174477a6b0fcbf9dd1e8ef37c643b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 8 Jan 2017 21:16:25 +0100 Subject: Rename project to akka-serial --- .travis.yml | 2 +- CHANGELOG.md | 4 + Documentation/developer.md | 18 +- Documentation/manual.md | 34 +- README.md | 32 +- build.sbt | 45 ++- core/build.sbt | 5 + core/src/main/scala/akka/serial/Serial.scala | 132 +++++++ core/src/main/scala/akka/serial/SerialExt.scala | 9 + .../src/main/scala/akka/serial/SerialManager.scala | 48 +++ .../main/scala/akka/serial/SerialOperator.scala | 84 +++++ core/src/main/scala/akka/serial/Watcher.scala | 143 +++++++ core/src/test/resources/reference.conf | 3 + .../test/scala/akka/serial/SerialManagerSpec.scala | 40 ++ .../scala/akka/serial/SerialOperatorSpec.scala | 50 +++ flow-core/build.sbt | 7 - .../src/main/scala/ch/jodersky/flow/Parity.scala | 9 - .../src/main/scala/ch/jodersky/flow/Serial.scala | 132 ------- .../scala/ch/jodersky/flow/SerialConnection.scala | 140 ------- .../main/scala/ch/jodersky/flow/SerialExt.scala | 9 - .../scala/ch/jodersky/flow/SerialManager.scala | 47 --- .../scala/ch/jodersky/flow/SerialOperator.scala | 83 ---- .../scala/ch/jodersky/flow/SerialSettings.scala | 10 - .../main/scala/ch/jodersky/flow/UnsafeSerial.scala | 108 ------ .../src/main/scala/ch/jodersky/flow/Watcher.scala | 143 ------- .../main/scala/ch/jodersky/flow/exceptions.scala | 19 - .../scala/ch/jodersky/flow/PseudoTerminal.scala | 43 --- .../scala/ch/jodersky/flow/SerialManagerSpec.scala | 38 -- .../ch/jodersky/flow/SerialOperatorSpec.scala | 49 --- flow-native/build.sbt | 8 - flow-native/lib_native/armv7l-linux/libflow4.so | Bin 12852 -> 0 bytes flow-native/lib_native/i686-linux/libflow4.so | Bin 14735 -> 0 bytes .../lib_native/x86_64-darwin/libflow4.dylib | Bin 18336 -> 0 bytes flow-native/lib_native/x86_64-linux/libflow4.so | Bin 16791 -> 0 bytes flow-native/src/.gitignore | 12 - flow-native/src/CMakeLists.txt | 46 --- flow-native/src/flow_jni.c | 150 -------- .../src/include/ch_jodersky_flow_UnsafeSerial.h | 45 --- .../src/include/ch_jodersky_flow_UnsafeSerial__.h | 29 -- flow-native/src/include/flow.h | 103 ----- flow-native/src/platform/posix/flow.c | 263 ------------- flow-native/src/platform/windows/README | 1 - flow-native/src/platform/windows/flow.c.disabled | 416 --------------------- flow-native/src/readme.md | 3 - flow-samples/README.md | 3 - .../flow/samples/terminalstream/Main.scala | 66 ---- .../flow/samples/terminal/ConsoleReader.scala | 29 -- .../ch/jodersky/flow/samples/terminal/Main.scala | 30 -- .../jodersky/flow/samples/terminal/Terminal.scala | 75 ---- .../ch/jodersky/flow/samples/watcher/main.scala | 50 --- flow-stream/build.sbt | 5 - .../scala/ch/jodersky/flow/stream/Serial.scala | 67 ---- .../flow/stream/StreamSerialException.scala | 5 - .../flow/stream/StreamWatcherException.scala | 4 - .../flow/stream/impl/SerialConnectionLogic.scala | 172 --------- .../flow/stream/impl/SerialConnectionStage.scala | 49 --- .../jodersky/flow/stream/impl/WatcherLogic.scala | 65 ---- .../jodersky/flow/stream/impl/WatcherStage.scala | 38 -- .../scala/ch/jodersky/flow/stream/SerialSpec.scala | 51 --- native/build.sbt | 8 + native/src/.gitignore | 12 + native/src/CMakeLists.txt | 46 +++ native/src/akka_serial_jni.c | 150 ++++++++ native/src/include/akka_serial.h | 103 +++++ native/src/include/akka_serial_sync_UnsafeSerial.h | 45 +++ .../src/include/akka_serial_sync_UnsafeSerial__.h | 29 ++ native/src/platform/posix/akka_serial.c | 263 +++++++++++++ native/src/platform/windows/README | 1 + native/src/platform/windows/akka_serial.c.disabled | 416 +++++++++++++++++++++ native/src/readme.md | 3 + project/Dependencies.scala | 8 +- samples/README.md | 3 + .../akka/serial/samples/terminalstream/Main.scala | 66 ++++ .../serial/samples/terminal/ConsoleReader.scala | 29 ++ .../scala/akka/serial/samples/terminal/Main.scala | 30 ++ .../akka/serial/samples/terminal/Terminal.scala | 75 ++++ .../scala/akka/serial/samples/watcher/main.scala | 50 +++ stream/build.sbt | 5 + .../src/main/scala/akka/serial/stream/Serial.scala | 68 ++++ .../akka/serial/stream/StreamSerialException.scala | 5 + .../serial/stream/StreamWatcherException.scala | 4 + .../serial/stream/impl/SerialConnectionLogic.scala | 172 +++++++++ .../serial/stream/impl/SerialConnectionStage.scala | 49 +++ .../akka/serial/stream/impl/WatcherLogic.scala | 65 ++++ .../akka/serial/stream/impl/WatcherStage.scala | 38 ++ stream/src/test/resources/application.conf | 3 + .../test/scala/akka/serial/stream/SerialSpec.scala | 51 +++ sync/build.sbt | 5 + sync/src/main/scala/akka/serial/Parity.scala | 9 + .../main/scala/akka/serial/SerialSettings.scala | 10 + sync/src/main/scala/akka/serial/exceptions.scala | 19 + .../scala/akka/serial/sync/SerialConnection.scala | 142 +++++++ .../main/scala/akka/serial/sync/UnsafeSerial.scala | 109 ++++++ .../test/scala/akka/serial/PseudoTerminal.scala | 43 +++ .../akka/serial/sync/SerialConnectionSpec.scala | 101 +++++ 95 files changed, 2821 insertions(+), 2685 deletions(-) create mode 100644 core/build.sbt create mode 100644 core/src/main/scala/akka/serial/Serial.scala create mode 100644 core/src/main/scala/akka/serial/SerialExt.scala create mode 100644 core/src/main/scala/akka/serial/SerialManager.scala create mode 100644 core/src/main/scala/akka/serial/SerialOperator.scala create mode 100644 core/src/main/scala/akka/serial/Watcher.scala create mode 100644 core/src/test/resources/reference.conf create mode 100644 core/src/test/scala/akka/serial/SerialManagerSpec.scala create mode 100644 core/src/test/scala/akka/serial/SerialOperatorSpec.scala delete mode 100644 flow-core/build.sbt delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Parity.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Serial.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala delete mode 100644 flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala delete mode 100644 flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala delete mode 100644 flow-native/build.sbt delete mode 100755 flow-native/lib_native/armv7l-linux/libflow4.so delete mode 100644 flow-native/lib_native/i686-linux/libflow4.so delete mode 100644 flow-native/lib_native/x86_64-darwin/libflow4.dylib delete mode 100755 flow-native/lib_native/x86_64-linux/libflow4.so delete mode 100644 flow-native/src/.gitignore delete mode 100644 flow-native/src/CMakeLists.txt delete mode 100644 flow-native/src/flow_jni.c delete mode 100644 flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h delete mode 100644 flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h delete mode 100644 flow-native/src/include/flow.h delete mode 100644 flow-native/src/platform/posix/flow.c delete mode 100644 flow-native/src/platform/windows/README delete mode 100644 flow-native/src/platform/windows/flow.c.disabled delete mode 100644 flow-native/src/readme.md delete mode 100644 flow-samples/README.md delete mode 100644 flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala delete mode 100644 flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala delete mode 100644 flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala delete mode 100644 flow-stream/build.sbt delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala delete mode 100644 flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala delete mode 100644 flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala create mode 100644 native/build.sbt create mode 100644 native/src/.gitignore create mode 100644 native/src/CMakeLists.txt create mode 100644 native/src/akka_serial_jni.c create mode 100644 native/src/include/akka_serial.h create mode 100644 native/src/include/akka_serial_sync_UnsafeSerial.h create mode 100644 native/src/include/akka_serial_sync_UnsafeSerial__.h create mode 100644 native/src/platform/posix/akka_serial.c create mode 100644 native/src/platform/windows/README create mode 100644 native/src/platform/windows/akka_serial.c.disabled create mode 100644 native/src/readme.md create mode 100644 samples/README.md create mode 100644 samples/terminal-stream/src/main/scala/akka/serial/samples/terminalstream/Main.scala create mode 100644 samples/terminal/src/main/scala/akka/serial/samples/terminal/ConsoleReader.scala create mode 100644 samples/terminal/src/main/scala/akka/serial/samples/terminal/Main.scala create mode 100644 samples/terminal/src/main/scala/akka/serial/samples/terminal/Terminal.scala create mode 100644 samples/watcher/src/main/scala/akka/serial/samples/watcher/main.scala create mode 100644 stream/build.sbt create mode 100644 stream/src/main/scala/akka/serial/stream/Serial.scala create mode 100644 stream/src/main/scala/akka/serial/stream/StreamSerialException.scala create mode 100644 stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala create mode 100644 stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala create mode 100644 stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala create mode 100644 stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala create mode 100644 stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala create mode 100644 stream/src/test/resources/application.conf create mode 100644 stream/src/test/scala/akka/serial/stream/SerialSpec.scala create mode 100644 sync/build.sbt create mode 100644 sync/src/main/scala/akka/serial/Parity.scala create mode 100644 sync/src/main/scala/akka/serial/SerialSettings.scala create mode 100644 sync/src/main/scala/akka/serial/exceptions.scala create mode 100644 sync/src/main/scala/akka/serial/sync/SerialConnection.scala create mode 100644 sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala create mode 100644 sync/src/test/scala/akka/serial/PseudoTerminal.scala create mode 100644 sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala diff --git a/.travis.yml b/.travis.yml index a8d4609..4232fd1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,6 +20,6 @@ cache: - "$HOME/.sbt/boot/" before_cache: - - find $HOME/.ivy2/cache/ch.jodersky -depth -name "flow*" -exec rm -r {} \; + - find $HOME/.ivy2/cache/ch.jodersky -depth -name "akka-serial*" -exec rm -r {} \; - find $HOME/.ivy2 -name "ivydata-*.properties" -delete - find $HOME/.sbt -name "*.lock" -delete diff --git a/CHANGELOG.md b/CHANGELOG.md index 90d5e47..ee0f076 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +# (unreleased) +- Rename project to akka-serial +- Move non-akka-dependent serial code into separate project, "akka-serial-sync" + # Version 3.0.4 - Implement unit tests - Upgrade Akka to 2.4.14 diff --git a/Documentation/developer.md b/Documentation/developer.md index 62f64d4..e7c581f 100644 --- a/Documentation/developer.md +++ b/Documentation/developer.md @@ -7,7 +7,7 @@ title: Developer Guide {:toc} # Building from Source -A complete build of flow involves two parts +A complete build of akka-serial involves two parts 1. Building Scala sources (the front-end), resulting in a platform independent artifact (i.e. a jar file). @@ -19,7 +19,7 @@ Both steps are independent, their only interaction being a header file generated Run `sbt core/package` in the base directory. This simply compiles Scala sources as with any standard sbt project and packages the resulting class files into a jar. ## Building Native Sources -The back-end is managed by CMake and all relevant files are contained in `flow-native/src`. +The back-end is managed by CMake and all relevant files are contained in `native/src`. ### Build Process Several steps are involved in producing the native library: @@ -45,9 +45,9 @@ Several steps are involved in producing the native library: - put into a "fat" jar, useful for dependency management with sbt (see next section) ### Creating a Fat Jar -The native library produced in the previous step may be bundled into a "fat" jar so that it can be included in SBT projects through its regular dependency mechanisms. In this process, sbt basically acts as a wrapper script around CMake, calling the native build process and packaging generated libraries. Running `sbt native/package` produces the fat jar in `flow-native/target`. +The native library produced in the previous step may be bundled into a "fat" jar so that it can be included in sbt projects through its regular dependency mechanisms. In this process, sbt basically acts as a wrapper script around CMake, calling the native build process and packaging generated libraries. Running `sbt native/package` produces the fat jar in `native/target`. -Note: an important feature of fat jars is to include native libraries for several platforms. To copy binaries compiled on other platforms to the fat jar, place them in a subfolder of `flow-native/lib_native`. The subfolder should have the name `$(arch)-$(kernel)`, where `arch` and `kernel` are, respectively, the lower-case values returned by `uname -m` and `uname -s`. +Note: an important feature of fat jars is to include native libraries for several platforms. To copy binaries compiled on other platforms to the fat jar, place them in a subfolder of `native/lib_native`. The subfolder should have the name `$(arch)-$(kernel)`, where `arch` and `kernel` are, respectively, the lower-case values returned by `uname -m` and `uname -s`. ### Note About Versioning The project and package versions follow a [semantic](http://semver.org/) pattern: `M.m.p`, where @@ -58,11 +58,11 @@ The project and package versions follow a [semantic](http://semver.org/) pattern - `p` is the patch number, representing internal modifications such as bug-fixes -Usually (following most Linux distribution's conventions), shared libraries produced by a project `name` of version `M.m.p` are named `libname.so.M.m.p`. However, since when accessing shared libraries through the JVM, only the `name` can be specified and no particular version, the convention adopted by flow is to append `M` to the library name and always keep the major version at zero. E.g. `libflow.so.3.1.2` becomes `libflow3.so.0.1.2`. +Usually (following most Linux distribution's conventions), shared libraries produced by a project `name` of version `M.m.p` are named `libname.so.M.m.p`. However, since when accessing shared libraries through the JVM, only the `name` can be specified and no particular version, the convention adopted by akka-serial is to append `M` to the library name and always keep the major version at zero. E.g. `libakkaserial.so.3.1.2` becomes `libakakserial3.so.0.1.2`. # Testing -`flow-samples` directory contains fully functional application examples of flow. To run an example, change to the base directory of flow and run sbt samples/run. -All projects, including samples, can be listed by running sbt projects. +The `samples` directory contains fully functional application examples of akka-serial. To run an example, change to the base directory of akka-serial and run sbt samples/run. +All projects, including samples, can be listed by running `sbt projects`. To be able connect you can use real device (arduino) burned with sample-echo (`dev/arduino-terminal`) code, or create Virtual Serial Port pair @@ -101,11 +101,11 @@ socat -d -d pty,raw,echo=0 "exec:/bin/cat,pty,raw,echo=0" ``` # Publishing and Releasing -Releases are handled automatically by the continuous integration and deployment system (Travis CI). A release will be performed for every annotated Git tag that is pushed to the main repository. +Releases are handled automatically by the continuous integration and deployment system, Travis CI. A release will be performed for every annotated Git tag that is pushed to the main repository. Here are a couple of observations on the release process: -- During a release, only readily available libraries in `lib_native` are packaged into the fat jar, no local native compilation is performed. The rationale behind this is that while native libraries rarely change, they are still tied to the version of libc of the compiling system. Since the releases are mostly done on a development machine (cutting-edge OS), compiling native libraries locally could break compatibility with older systems. +- During a release, only readily available libraries in `native/lib_native` are packaged into the fat jar, no local native compilation is performed. The rationale behind this is that while native libraries rarely change, they are still tied to the version of libc of the compiling system. Since the releases are mostly done on a development machine with a cutting-edge OS, compiling native libraries locally could break compatibility with older systems. - The website is not automatically updated. After creating a new release: diff --git a/Documentation/manual.md b/Documentation/manual.md index d1bd335..f51689f 100644 --- a/Documentation/manual.md +++ b/Documentation/manual.md @@ -7,13 +7,13 @@ title: User Guide {:toc} # Getting Started -Flow uses sbt as build system. To get started, include a dependency to flow in your project: +akka-serial uses sbt as build system. To get started, include a dependency to akka-serial in your project: ~~~scala -libraryDependencies += "ch.jodersky" %% "flow-core" % "@version@" +libraryDependencies += "ch.jodersky" %% "akka-serial-core" % "@version@" ~~~ -Next, you need to include flow's native library that supports communication for serial devices. +Next, you need to include akka-serial's native library that supports communication for serial devices. ## Including Native Library There are two options to include the native library: @@ -28,19 +28,19 @@ It is recommended that you use the first option for testing purposes or end-user In case your kernel/architecture combination is present in the "supported platforms" table in the [downloads section]({{site.url}}/downloads/), add a second dependency to your project: ~~~scala -libraryDependencies += "ch.jodersky" % "flow-native" % "@version@" % "runtime" +libraryDependencies += "ch.jodersky" % "akka-serial-native" % "@version@" % "runtime" ~~~ This will add a jar to your classpath containing native libraries for various platforms. At run-time, the correct library for the current platform is selected, extracted and loaded. This solution enables running applications seamlessly, as if they were pure JVM applications. ### Maximum Portability -Start by obtaining a copy of the native library, either by [building flow](./developer) or by [downloading]({{site.url}}/downloads/) a native archive. In order to work with this version of flow, native libraries need to be of major version @native_major@ and minor version greater or equal to @native_minor@. +Start by obtaining a copy of the native library, either by [building akka-serial](./developer) or by [downloading]({{site.url}}/downloads/) a native archive. In order to work with this version of akka-serial, native libraries need to be of major version @native_major@ and minor version greater or equal to @native_minor@. -Then, for every end-user application that relies on flow, manually add the native library for the current platform to the JVM's library path. This can be achieved through various ways, notably: +Then, for every end-user application that relies on akka-serial, manually add the native library for the current platform to the JVM's library path. This can be achieved through various ways, notably: - Per application: - Run your program with the command-line option ```-Djava.library.path=".:"```. E.g. ```java -Djava.library.path=".:/home/" -jar your-app.jar``` + Run your program with the command-line option ```-Djava.library.path=".:"```. E.g. ```java -Djava.library.path=".:/home/" -jar your-app.jar``` - System- or user-wide: @@ -49,15 +49,15 @@ Then, for every end-user application that relies on flow, manually add the nativ --- # Communication Protocol -The following is a general guide on the usage of flow. If you prefer a complete example, check out the code contained in the [flow-samples](https://github.com/jodersky/flow/tree/v@version@/flow-samples) directory. +The following is a general guide on the usage of akak-serial. If you prefer a complete example, check out the code contained in the [samples](https://github.com/jodersky/akka-serial/tree/v@version@/samples) directory. -Flow's API follows that of an actor based system, where each actor is assigned specific functions involved in serial communication. The two main actor types are: +akka-serial's API follows that of an actor based system, where each actor is assigned specific functions involved in serial communication. The two main actor types are: 1. Serial "manager". The manager is a singleton actor that is instantiated once per actor system, a reference to it may be obtained with `IO(Serial)`. It is typically used to open serial ports (see following section). 2. Serial "operators". Operators are created once per open serial port and serve as an intermediate between client code and native code dealing with serial data transmission and reception. They isolate the user from threading issues and enable the reactive dispatch of incoming data. A serial operator is said to be "associated" to its underlying open serial port. -The messages understood by flow's actors are all contained in the `ch.jodersky.flow.Serial` object. They are well documented and should serve as the entry point when searching the API documentation. +The messages understood by akka-serial's actors are all contained in the `akka.serial.Serial` object. They are well documented and should serve as the entry point when searching the API documentation. ## Opening a Port A serial port is opened by sending an `Open` message to the serial manager. The response varies on the outcome of opening the underlying serial port. @@ -67,7 +67,7 @@ A serial port is opened by sending an `Open` message to the serial manager. The 2. In case of success, the sender is notified with an `Opened` message. This message is sent from an operator actor, spawned by the serial manager. It is useful to capture the sender (i.e. the operator) of this message as all further communication with the newly opened port must pass through the operator. ~~~scala -import ch.jodersky.flow.{ Serial, SerialSettings, AccessDeniedException } +import akka.serial.{ Serial, SerialSettings, AccessDeniedException } val port = "/dev/ttyXXX" val settings = SerialSettings( @@ -138,7 +138,7 @@ The opposite is not true by default, i.e. if the operator crashes (this can happ --- # Watching Ports -As of version 2.2.0, flow can watch directories for new files. On most unix systems this can be used for watching for new serial ports in `/dev/`. +akka-serial can watch directories for new files. On most unix systems this can be used for watching for new serial ports in `/dev/`. Watching happens through a message-based, publish-subscribe protocol as explained in the sections below. ## Subscribing @@ -183,13 +183,13 @@ Note that the manager has a deathwatch on every subscribed client. Hence, should --- # Stream Support -Flow provides support for Akka streams and thus can be interfaced with reactive-streams. Support is implemented in a separate module, which needs to be added as a library dependency: +akka-serial provides support for Akka streams and thus can be interfaced with reactive-streams. Support is implemented in a separate module, which needs to be added as a library dependency: ~~~scala -libraryDependencies += "ch.jodersky" %% "flow-stream" % "@version@" +libraryDependencies += "ch.jodersky" %% "akka-serial-stream" % "@version@" ~~~ -The main entry point for serial streaming is `ch.jodersky.flow.stream.Serial`. It's API is also well documented and should serve as the starting point when searching documentation on serial streaming. +The main entry point for serial streaming is `akka.serial.stream.Serial`. Its API is also well documented and should serve as the starting point when searching documentation on serial streaming. ## Opening a Port Connection is established by materializing a `Flow[ByteString, ByteString, Future[Connection]]` obtained by calling `Serial().open()` @@ -217,6 +217,6 @@ Note that backpressure is only available for writing, to add backpressure on the The underlying serial port is closed when its materialized serial flow is closed. ## Errors and Resource Handling -Any errors described in flow-core can also be encountered in flow-streaming. When thrown, they will be wrapped as the cause of a `StreamSerialException` and cause the the serial `Flow` stage to fail. +Any errors described in akka-serial-core can also be encountered in akka-serial-stream. When thrown, they will be wrapped as the cause of a `StreamSerialException` and cause the the serial `Flow` stage to fail. -As with flow-core, native resources are handled by underlying Akka mechanisms and any crashes in user code will automatically case the resources to be freed. +As with akka-serial-core, native resources are handled by underlying Akka mechanisms and any crashes in user code will automatically case the resources to be freed. diff --git a/README.md b/README.md index 9a53ee8..cf37fd8 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ -[![Build Status](https://travis-ci.org/jodersky/flow.svg?branch=master)](https://travis-ci.org/jodersky/flow) -[![Download](https://img.shields.io/maven-central/v/ch.jodersky/flow-native.svg?maxAge=2592000)](http://search.maven.org/#search|ga|1|ch.jodersky%20flow-) +[![Build Status](https://travis-ci.org/jodersky/akka-serial.svg?branch=master)](https://travis-ci.org/jodersky/akka-serial) +[![Download](https://img.shields.io/maven-central/v/ch.jodersky/akka-serial-native.svg?maxAge=2592000)](http://search.maven.org/#search|ga|1|ch.jodersky%20akka-serial-) -# flow -Serial communication library for Scala, designed to be reactive, lightweight and easily integrable with Akka applications. See the [website](https://jodersky.github.io/flow) for a guide. +# akka-serial +Serial communication library for Scala, designed to be reactive, lightweight and easily integrable with Akka applications. See the [website](https://jodersky.github.io/akka-serial) for a guide. ## Highlights - Reactive: only does work when required (no constant polling of ports or blocking IO) @@ -16,25 +16,29 @@ Since hardware is involved in serial communication, a Scala-only solution is not ## Directory Structure ``` -flow/ +akka-serial/ ├── Documentation Sources for user documentation as published on the website. +├── core Main Scala source files. ├── dev Firmware samples for serial devices, to make testing easier. -├── flow-core Main Scala source files. -├── flow-native C sources used to implement serial communication. -│ └── lib_native Compiled native libraries that are published in flow-native. -├── flow-samples Runnable example projects. -├── flow-stream Stream API, used to connect with Akka streams. -└── project Build configuration. +├── native C sources used to implement serial communication. +│ └── lib_native Compiled native libraries that are published in the fat jar. +├── project Build configuration. +├── samples Runnable example projects. +├── stream Stream API, used to connect with Akka streams. +└── sync Synchronous, non-Akka-dependent code. ``` *Website source code is in the git branch 'gh-pages'.* ## Build -Detailed documentation on building flow is available on the website (or, equivalently, in [developer.md](Documentation/developer.md)). +Detailed documentation on building akka-serial is available on the website (or, equivalently, in [developer.md](Documentation/developer.md)). -Since flow integrates into the Akka-IO framework, a good resource on its general design is the framework's [documentation](http://doc.akka.io/docs/akka/current/scala/io.html). +Since akka-serial integrates into the Akka-IO framework, a good resource on its general design is the framework's [documentation](http://doc.akka.io/docs/akka/current/scala/io.html). This project is also an experiment on working with JNI and automating build infrastructure. ## Copying -flow is released under the terms of the 3-clause BSD license. See LICENSE for details. +akka-serial is released under the terms of the 3-clause BSD license. See LICENSE for details. + +## Notes +This project used to called "flow". It was renamed to "akka-serial". diff --git a/build.sbt b/build.sbt index 50d9f8a..e71b9f6 100644 --- a/build.sbt +++ b/build.sbt @@ -1,6 +1,6 @@ // Build settings version in ThisBuild := ("git describe --always --dirty=-SNAPSHOT --match v[0-9].*" !!).tail.trim -crossScalaVersions in ThisBuild := List("2.11.8", "2.12.0") +crossScalaVersions in ThisBuild := List("2.12.1", "2.11.8") scalaVersion in ThisBuild := crossScalaVersions.value.head scalacOptions in ThisBuild ++= Seq( "-deprecation", @@ -9,11 +9,13 @@ scalacOptions in ThisBuild ++= Seq( "-target:jvm-1.8" ) fork in ThisBuild := true +connectInput in run in ThisBuild := true +outputStrategy in run in ThisBuild := Some(StdoutOutput) // Publishing organization in ThisBuild := "ch.jodersky" licenses in ThisBuild := Seq(("BSD New", url("http://opensource.org/licenses/BSD-3-Clause"))) -homepage in ThisBuild := Some(url("https://jodersky.github.io/flow")) +homepage in ThisBuild := Some(url("https://jodersky.github.io/akka-serial")) publishMavenStyle in ThisBuild := true publishTo in ThisBuild := { val nexus = "https://oss.sonatype.org/" @@ -21,8 +23,8 @@ publishTo in ThisBuild := { } pomExtra in ThisBuild := { - git@github.com:jodersky/flow.git - scm:git:git@github.com:jodersky/flow.git + git@github.com:jodersky/akka-serial.git + scm:git:git@github.com:jodersky/akka-serial.git @@ -34,26 +36,30 @@ pomExtra in ThisBuild := { // Project structure lazy val root = (project in file(".")) - .aggregate(core, native, stream) + .aggregate(core, native, stream, sync) -lazy val core = (project in file("flow-core")) - .settings(name := "flow-core") - .dependsOn(native % "test->runtime") +lazy val core = (project in file("core")) + .settings(name := "akka-serial-core") + .dependsOn(sync, sync % "test->test") + +lazy val native = (project in file("native")) + .settings(name := "akka-serial-native") -lazy val native = (project in file("flow-native")) - .settings(name := "flow-native") +lazy val stream = (project in file("stream")) + .settings(name := "akka-serial-stream") + .dependsOn(core, sync % "test->test") -lazy val stream = (project in file("flow-stream")) - .settings(name := "flow-stream") - .dependsOn(core, core % "test->test", native % "test->runtime") +lazy val sync = (project in file("sync")) + .settings(name := "akka-serial-sync") + .dependsOn(native % "test->runtime") -lazy val samplesTerminal = (project in file("flow-samples") / "terminal") +lazy val samplesTerminal = (project in file("samples") / "terminal") .dependsOn(core, native % Runtime) -lazy val samplesTerminalStream = (project in file("flow-samples") / "terminal-stream") +lazy val samplesTerminalStream = (project in file("samples") / "terminal-stream") .dependsOn(stream, native % Runtime) -lazy val samplesWatcher = (project in file("flow-samples") / "watcher") +lazy val samplesWatcher = (project in file("samples") / "watcher") .dependsOn(core, native % Runtime) // Root project settings @@ -68,13 +74,16 @@ enablePlugins(PreprocessPlugin) sourceDirectory in Preprocess := (baseDirectory in ThisBuild).value / "Documentation" preprocessVars in Preprocess := Map( "version" -> version.value, - "native_major" -> "4", + "native_major" -> "1", "native_minor" -> "0" ) // Add scaladoc to documentation enablePlugins(SiteScaladocPlugin) +import UnidocKeys._ unidocSettings +unidocProjectFilter in (ScalaUnidoc, unidoc) := inAnyProject -- inProjects( + samplesTerminal, samplesTerminalStream, samplesWatcher) scalacOptions in (ScalaUnidoc, doc) ++= Seq( "-groups", // Group similar methods together based on the @group annotation. "-diagrams", // Show classs hierarchy diagrams (requires 'dot' to be available on path) @@ -83,7 +92,7 @@ scalacOptions in (ScalaUnidoc, doc) ++= Seq( ) ++ { val latestTag: String = "git describe --abbrev=0 --match v[0-9].*".!! Opts.doc.sourceUrl( - s"https://github.com/jodersky/flow/blob/$latestTag€{FILE_PATH}.scala" + s"https://github.com/jodersky/akka-serial/blob/$latestTag€{FILE_PATH}.scala" ) } siteMappings ++= (mappings in (ScalaUnidoc, packageDoc)).value.map{ case (file, path) => diff --git a/core/build.sbt b/core/build.sbt new file mode 100644 index 0000000..78673ce --- /dev/null +++ b/core/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaTestKit % "test" +libraryDependencies += Dependencies.scalatest % "test" diff --git a/core/src/main/scala/akka/serial/Serial.scala b/core/src/main/scala/akka/serial/Serial.scala new file mode 100644 index 0000000..a8e564a --- /dev/null +++ b/core/src/main/scala/akka/serial/Serial.scala @@ -0,0 +1,132 @@ +package akka.serial + +import akka.actor.ExtensionKey +import akka.util.ByteString + +/** Defines messages used by akka-serial's serial IO layer. */ +object Serial extends ExtensionKey[SerialExt] { + + /** Base trait for any akka-serial-related messages. */ + sealed trait Message + + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to akka-serial's API. */ + trait Command extends Message + + /** A message extending this trait is to be viewed as an event, an in-bound message issued by akka-serial to the client. */ + trait Event extends Message + + /** A command has failed. */ + case class CommandFailed(command: Command, reason: Throwable) extends Event + + /** + * Open a new serial port. + * + * Send this command to the serial manager to request the opening of a serial port. The manager will + * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. + * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. + * + * In case the port is successfully opened, the operator will respond with an `Opened` message. + * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. + * + * @param port name of serial port to open + * @param settings settings of serial port to open + * @param bufferSize maximum read and write buffer sizes + */ + case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command + + /** + * A port has been successfully opened. + * + * Event sent by a port operator, indicating that a serial port was successfully opened. The sender + * of this message is the operator associated to the given serial port. + * + * @param port name of opened serial port + */ + case class Opened(port: String) extends Event + + /** + * Data has been received. + * + * Event sent by an operator, indicating that data was received on the operator's serial port. + * + * @param data data received on the port + */ + case class Received(data: ByteString) extends Event + + /** + * Write data to a serial port. + * + * Send this command to an operator to write the given data to its associated serial port. + * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. + * Note that a successful write does not guarantee the actual transmission of data through the serial port, + * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to + * be transmitted. + * + * @param data data to be written to port + * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment + * is a function 'number of bytes written => event') + */ + case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command + + /** + * Special type of acknowledgment that is not sent back. + */ + case object NoAck extends Function1[Int, Event] { + def apply(length: Int) = sys.error("cannot apply NoAck") + } + + /** + * Request closing of port. + * + * Send this command to an operator to close its associated port. The operator will respond + * with a `Closed` message upon closing the serial port. + */ + case object Close extends Command + + /** + * A port has been closed. + * + * Event sent from operator, indicating that its port has been closed. + */ + case object Closed extends Event + + /** + * Watch a directory for new ports. + * + * Send this command to the manager to get notifications when a new port (i.e. file) is created in + * the given directory. + * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. + * + * Note: the sender is also notified of currently existing ports. + * + * @param directory the directory to watch + * @param skipInitial don't get notified of already existing ports + * + * @see Unwatch + * @see Connected + */ + case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command + + /** + * Stop receiving notifications about a previously watched directory. + * + * @param directory the directory to unwatch + */ + case class Unwatch(directory: String = "/dev") extends Command + + /** + * A new port (i.e. file) has been detected. + * + * @param port the absolute file name of the connected port + */ + case class Connected(port: String) extends Event + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + def debug(value: Boolean) = sync.UnsafeSerial.debug(value) + +} diff --git a/core/src/main/scala/akka/serial/SerialExt.scala b/core/src/main/scala/akka/serial/SerialExt.scala new file mode 100644 index 0000000..2fcec7f --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialExt.scala @@ -0,0 +1,9 @@ +package akka.serial + +import akka.actor.{ ExtendedActorSystem, Props } +import akka.io.IO + +/** Provides the serial IO manager. */ +class SerialExt(system: ExtendedActorSystem) extends IO.Extension { + lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") +} diff --git a/core/src/main/scala/akka/serial/SerialManager.scala b/core/src/main/scala/akka/serial/SerialManager.scala new file mode 100644 index 0000000..4833165 --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialManager.scala @@ -0,0 +1,48 @@ +package akka.serial + +import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } +import akka.actor.SupervisorStrategy.{ Escalate, Stop } +import scala.util.{ Failure, Success, Try } +import sync.SerialConnection + +/** + * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to + * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. + * @see SerialOperator + */ +private[serial] class SerialManager extends Actor { + import SerialManager._ + import context._ + + override val supervisorStrategy = OneForOneStrategy() { + case _: Exception if sender == watcher => Escalate + case _: Exception => Stop + } + + private val watcher = actorOf(Watcher(self), "watcher") + + def receive = { + + case open @ Serial.Open(port, settings, bufferSize) => Try { + SerialConnection.open(port, settings) + } match { + case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) + case Failure(err) => sender ! Serial.CommandFailed(open, err) + } + + case w: Serial.Watch => watcher.forward(w) + + case u: Serial.Unwatch => watcher.forward(u) + + } + +} + +private[serial] object SerialManager { + + private def escapePortString(port: String) = port map { + case '/' => '-' + case c => c + } + +} diff --git a/core/src/main/scala/akka/serial/SerialOperator.scala b/core/src/main/scala/akka/serial/SerialOperator.scala new file mode 100644 index 0000000..cb5b46d --- /dev/null +++ b/core/src/main/scala/akka/serial/SerialOperator.scala @@ -0,0 +1,84 @@ +package akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import akka.util.ByteString +import java.nio.ByteBuffer + +import sync.SerialConnection + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +private[serial] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { + import SerialOperator._ + import context._ + + case class ReaderDied(ex: Throwable) + object Reader extends Thread { + val buffer = ByteBuffer.allocateDirect(bufferSize) + + def loop() = { + var stop = false + while (!connection.isClosed && !stop) { + try { + buffer.clear() + connection.read(buffer) + val data = ByteString.fromByteBuffer(buffer) + client.tell(Serial.Received(data), self) + } catch { + // don't do anything if port is interrupted + case ex: PortInterruptedException => {} + + //stop and tell operator on other exception + case ex: Exception => + stop = true + self.tell(ReaderDied(ex), Actor.noSender) + } + } + } + + override def run() { + this.setName(s"serial-reader(${connection.port})") + loop() + } + + } + + val writeBuffer = ByteBuffer.allocateDirect(bufferSize) + + override def preStart() = { + context watch client + client ! Serial.Opened(connection.port) + Reader.start() + } + + override def receive: Receive = { + + case Serial.Write(data, ack) => + writeBuffer.clear() + data.copyToBuffer(writeBuffer) + val sent = connection.write(writeBuffer) + if (ack != Serial.NoAck) sender ! ack(sent) + + case Serial.Close => + client ! Serial.Closed + context stop self + + case Terminated(`client`) => + context stop self + + // go down with reader thread + case ReaderDied(ex) => throw ex + + } + + override def postStop() = { + connection.close() + } + +} + +private[serial] object SerialOperator { + def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) +} diff --git a/core/src/main/scala/akka/serial/Watcher.scala b/core/src/main/scala/akka/serial/Watcher.scala new file mode 100644 index 0000000..faaf39a --- /dev/null +++ b/core/src/main/scala/akka/serial/Watcher.scala @@ -0,0 +1,143 @@ +package akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } +import scala.util.{ Failure, Success, Try } + +private[serial] class Watcher(from: Option[ActorRef]) extends Actor { + + case class WatcherDied(reason: Throwable) + object WatcherThread extends Thread { + import Watcher.NewFile + + private val service = FileSystems.getDefault().newWatchService() + + def register(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + this.setName("serial-port-watcher") + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + self.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) + } + } + } + + def close() = service.close // causes the service to throw a ClosedWatchServiceException + } + + + // directory -> subscribers + private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] + + // directory -> watchkey + private val keys: Map[String, WatchKey] = Map.empty + + def subscribe(directory: String, client: ActorRef): WatchKey = { + val normal = Paths.get(directory).toAbsolutePath + val index = normal.toString + val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) + clients addBinding (index, client) + key + } + + def unsubscribe(directory: String, client: ActorRef): Unit = { + val index = Paths.get(directory).toAbsolutePath.toString + + clients removeBinding (index, sender) + + if (clients.get(index).isEmpty && keys.get(index).isDefined) { + keys(index).cancel() + keys -= index + } + } + + def reply(msg: Any, sender: ActorRef) = { + val origin = from match { + case Some(ref) => ref + case None => self + } + sender.tell(msg, origin) + } + + override def preStart() = { + WatcherThread.setDaemon(true) + WatcherThread.start() + } + + override def receive = { + + case w @ Serial.Watch(directory, skipInitial) => + val normalPath = Paths.get(directory).toAbsolutePath + val normal = normalPath.toString + + Try { + subscribe(directory, sender) + } match { + case Failure(err) => reply(Serial.CommandFailed(w, err), sender) + case Success(key) => + context watch sender + if (!skipInitial) { + Files.newDirectoryStream(normalPath) foreach { path => + if (!Files.isDirectory(path)) { + reply(Serial.Connected(path.toString), sender) + } + } + } + } + + case u @ Serial.Unwatch(directory) => + val normal = Paths.get(directory).toAbsolutePath.toString + + clients.removeBinding(normal, sender) + + if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { + keys(normal).cancel() + keys -= normal + } + + case Terminated(client) => + for ((directory, c) <- clients if c == client) { + unsubscribe(directory, client) + } + + case Watcher.NewFile(directory, file) => + val normal = directory.toAbsolutePath + val absFile = normal resolve file + clients.getOrElse(normal.toString, Set.empty) foreach { client => + reply(Serial.Connected(absFile.toString), client) + } + + case WatcherDied(err) => throw err // go down with watcher thread + + } + + override def postStop() = { + WatcherThread.close() + } + +} + +private[serial] object Watcher { + private case class NewFile(directory: Path, file: Path) + + def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) + +} diff --git a/core/src/test/resources/reference.conf b/core/src/test/resources/reference.conf new file mode 100644 index 0000000..bdf80e2 --- /dev/null +++ b/core/src/test/resources/reference.conf @@ -0,0 +1,3 @@ +# Don't fill test output with log messages +akka.stdout-loglevel = "OFF" +akka.loglevel = "OFF" \ No newline at end of file diff --git a/core/src/test/scala/akka/serial/SerialManagerSpec.scala b/core/src/test/scala/akka/serial/SerialManagerSpec.scala new file mode 100644 index 0000000..eab5013 --- /dev/null +++ b/core/src/test/scala/akka/serial/SerialManagerSpec.scala @@ -0,0 +1,40 @@ +package akka.serial + +import akka.actor.ActorSystem +import akka.io.IO +import akka.testkit.{ImplicitSender, TestKit} +import org.scalatest._ + +class SerialManagerSpec + extends TestKit(ActorSystem("serial-manager")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + "Serial manager" should { + val manager = IO(Serial) + + "open an existing port" in { + withEcho{ case (port, settings) => + manager ! Serial.Open(port, settings) + expectMsgType[Serial.Opened] + } + } + + "fail opening a non-existing port" in { + val cmd = Serial.Open("nonexistent", SerialSettings(115200)) + manager ! cmd + + //manager.context.set + assert(expectMsgType[Serial.CommandFailed].command == cmd) + } + + } + +} diff --git a/core/src/test/scala/akka/serial/SerialOperatorSpec.scala b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala new file mode 100644 index 0000000..6907494 --- /dev/null +++ b/core/src/test/scala/akka/serial/SerialOperatorSpec.scala @@ -0,0 +1,50 @@ +package akka.serial + +import scala.concurrent.duration._ + +import akka.actor.{ActorRef, ActorSystem} +import akka.testkit.{ImplicitSender, TestKit} +import akka.util.ByteString +import org.scalatest._ +import sync._ + +case class Ack(n: Int) extends Serial.Event + +class SerialOperatorSpec + extends TestKit(ActorSystem("serial-operator")) + with ImplicitSender + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with SequentialNestedSuiteExecution + with PseudoTerminal { + + override def afterAll { + TestKit.shutdownActorSystem(system) + } + + def withEchoOp[A](action: ActorRef => A): A = { + withEcho { case (port, settings) => + val connection = SerialConnection.open(port, settings) + val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor)) + action(operator) + } + } + + "Serial operator" should { + + "follow the correct protocol" in withEchoOp { op => + expectMsgType[Serial.Opened] + + val data = ByteString("hello world".getBytes("utf-8")) + op ! Serial.Write(data) + expectMsg(Serial.Received(data)) + + op ! Serial.Close + expectMsg(Serial.Closed) + + } + + } + +} diff --git a/flow-core/build.sbt b/flow-core/build.sbt deleted file mode 100644 index fdfcbab..0000000 --- a/flow-core/build.sbt +++ /dev/null @@ -1,7 +0,0 @@ -import flow.Dependencies - -libraryDependencies += Dependencies.akkaActor -libraryDependencies += Dependencies.akkaTestKit % "test" -libraryDependencies += Dependencies.scalatest % "test" - -target in javah := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include" diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala deleted file mode 100644 index 30596d2..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala +++ /dev/null @@ -1,9 +0,0 @@ -package ch.jodersky.flow - -/** Specifies available parities used in serial communication. */ -object Parity extends Enumeration { - type Parity = Value - val None = Value(0) - val Odd = Value(1) - val Even = Value(2) -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala deleted file mode 100644 index 43b1d19..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala +++ /dev/null @@ -1,132 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.ExtensionKey -import akka.util.ByteString - -/** Defines messages used by flow's serial IO layer. */ -object Serial extends ExtensionKey[SerialExt] { - - /** Base trait for any flow-related messages. */ - sealed trait Message - - /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ - trait Command extends Message - - /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */ - trait Event extends Message - - /** A command has failed. */ - case class CommandFailed(command: Command, reason: Throwable) extends Event - - /** - * Open a new serial port. - * - * Send this command to the serial manager to request the opening of a serial port. The manager will - * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port. - * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages. - * - * In case the port is successfully opened, the operator will respond with an `Opened` message. - * In case the port cannot be opened, the manager will respond with a `CommandFailed` message. - * - * @param port name of serial port to open - * @param settings settings of serial port to open - * @param bufferSize maximum read and write buffer sizes - */ - case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command - - /** - * A port has been successfully opened. - * - * Event sent by a port operator, indicating that a serial port was successfully opened. The sender - * of this message is the operator associated to the given serial port. - * - * @param port name of opened serial port - */ - case class Opened(port: String) extends Event - - /** - * Data has been received. - * - * Event sent by an operator, indicating that data was received on the operator's serial port. - * - * @param data data received on the port - */ - case class Received(data: ByteString) extends Event - - /** - * Write data to a serial port. - * - * Send this command to an operator to write the given data to its associated serial port. - * An acknowledgment may be set, in which case it is sent back to the sender on a successful write. - * Note that a successful write does not guarantee the actual transmission of data through the serial port, - * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to - * be transmitted. - * - * @param data data to be written to port - * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment - * is a function 'number of bytes written => event') - */ - case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command - - /** - * Special type of acknowledgment that is not sent back. - */ - case object NoAck extends Function1[Int, Event] { - def apply(length: Int) = sys.error("cannot apply NoAck") - } - - /** - * Request closing of port. - * - * Send this command to an operator to close its associated port. The operator will respond - * with a `Closed` message upon closing the serial port. - */ - case object Close extends Command - - /** - * A port has been closed. - * - * Event sent from operator, indicating that its port has been closed. - */ - case object Closed extends Event - - /** - * Watch a directory for new ports. - * - * Send this command to the manager to get notifications when a new port (i.e. file) is created in - * the given directory. - * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. - * - * Note: the sender is also notified of currently existing ports. - * - * @param directory the directory to watch - * @param skipInitial don't get notified of already existing ports - * - * @see Unwatch - * @see Connected - */ - case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command - - /** - * Stop receiving notifications about a previously watched directory. - * - * @param directory the directory to unwatch - */ - case class Unwatch(directory: String = "/dev") extends Command - - /** - * A new port (i.e. file) has been detected. - * - * @param port the absolute file name of the connected port - */ - case class Connected(port: String) extends Event - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - def debug(value: Boolean) = UnsafeSerial.debug(value) - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala deleted file mode 100644 index 1cd1046..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala +++ /dev/null @@ -1,140 +0,0 @@ -package ch.jodersky.flow - -import java.nio.ByteBuffer -import java.util.concurrent.atomic.AtomicBoolean - -/** - * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In - * contrast to the latter, this class encapsulates and secures any pointers used to communicate with - * the native backend and is thread-safe. - * - * The underlying serial port is assumed open when this class is initialized. - */ -private[flow] class SerialConnection private ( - unsafe: UnsafeSerial, - val port: String -) { - - private var reading: Boolean = false - private val readLock = new Object - - private var writing: Boolean = false - private val writeLock = new Object - - private val closed = new AtomicBoolean(false) - - /** - * Checks if this serial port is closed. - */ - def isClosed = closed.get() - - /** - * Closes the underlying serial connection. Any callers blocked on read or write will return. - * A call of this method has no effect if the serial port is already closed. - * @throws IOException on IO error - */ - def close(): Unit = this.synchronized { - if (!closed.get) { - closed.set(true) - unsafe.cancelRead() - readLock.synchronized { - while (reading) this.wait() - } - writeLock.synchronized { - while (writing) this.wait() - } - unsafe.close() - } - } - - /** - * Reads data from underlying serial connection into a ByteBuffer. - * Note that data is read into the buffer's memory, its attributes - * such as position and limit are not modified. - * - * A call to this method is blocking, however it is interrupted - * if the connection is closed. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer into which data is read - * @return the actual number of bytes read - * @throws PortInterruptedException if port is closed while reading - * @throws IOException on IO error - */ - def read(buffer: ByteBuffer): Int = readLock.synchronized { - if (!closed.get) { - try { - reading = true - unsafe.read(buffer) - } finally { - reading = false - if (closed.get) readLock.notify() - } - } else { - throw new PortClosedException(s"${port} is closed") - } - } - - /** - * Writes data from a ByteBuffer to underlying serial connection. - * Note that data is read from the buffer's memory, its attributes - * such as position and limit are not modified. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * This method works for direct and indirect buffers but is optimized - * for the former. - * - * @param buffer a ByteBuffer from which data is taken - * @return the actual number of bytes written - * @throws IOException on IO error - */ - def write(buffer: ByteBuffer): Int = writeLock.synchronized { - if (!closed.get) { - try { - writing = true - unsafe.write(buffer, buffer.position) - } finally { - writing = false - if (closed.get) writeLock.notify() - } - } else { - throw new PortClosedException(s"${port} is closed") - } - } - -} - -private[flow] object SerialConnection { - - /** - * Opens a new connection to a serial port. - * This method acts as a factory to creating serial connections. - * - * @param port name of serial port to open - * @param settings settings with which to initialize the connection - * @return an instance of the open serial connection - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - def open( - port: String, - settings: SerialSettings - ): SerialConnection = synchronized { - val pointer = UnsafeSerial.open( - port, - settings.baud, - settings.characterSize, - settings.twoStopBits, - settings.parity.id - ) - new SerialConnection(new UnsafeSerial(pointer), port) - } - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala deleted file mode 100644 index 4ed3e2e..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala +++ /dev/null @@ -1,9 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ ExtendedActorSystem, Props } -import akka.io.IO - -/** Provides the serial IO manager. */ -class SerialExt(system: ExtendedActorSystem) extends IO.Extension { - lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL") -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala deleted file mode 100644 index 7967087..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala +++ /dev/null @@ -1,47 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } -import akka.actor.SupervisorStrategy.{ Escalate, Stop } -import scala.util.{ Failure, Success, Try } - -/** - * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to - * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. - * @see SerialOperator - */ -private[flow] class SerialManager extends Actor { - import SerialManager._ - import context._ - - override val supervisorStrategy = OneForOneStrategy() { - case _: Exception if sender == watcher => Escalate - case _: Exception => Stop - } - - private val watcher = actorOf(Watcher(self), "watcher") - - def receive = { - - case open @ Serial.Open(port, settings, bufferSize) => Try { - SerialConnection.open(port, settings) - } match { - case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) - case Failure(err) => sender ! Serial.CommandFailed(open, err) - } - - case w: Serial.Watch => watcher.forward(w) - - case u: Serial.Unwatch => watcher.forward(u) - - } - -} - -private[flow] object SerialManager { - - private def escapePortString(port: String) = port map { - case '/' => '-' - case c => c - } - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala deleted file mode 100644 index d5c131c..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala +++ /dev/null @@ -1,83 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } -import akka.util.ByteString -import java.nio.ByteBuffer - -/** - * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. - * @see SerialManager - */ -private[flow] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { - import SerialOperator._ - import context._ - - case class ReaderDied(ex: Throwable) - object Reader extends Thread { - val buffer = ByteBuffer.allocateDirect(bufferSize) - - def loop() = { - var stop = false - while (!connection.isClosed && !stop) { - try { - buffer.clear() - val length = connection.read(buffer) - buffer.limit(length) - val data = ByteString.fromByteBuffer(buffer) - client.tell(Serial.Received(data), self) - } catch { - // don't do anything if port is interrupted - case ex: PortInterruptedException => {} - - //stop and tell operator on other exception - case ex: Exception => - stop = true - self.tell(ReaderDied(ex), Actor.noSender) - } - } - } - - override def run() { - this.setName(s"serial-reader(${connection.port})") - loop() - } - - } - - val writeBuffer = ByteBuffer.allocateDirect(bufferSize) - - override def preStart() = { - context watch client - client ! Serial.Opened(connection.port) - Reader.start() - } - - override def receive: Receive = { - - case Serial.Write(data, ack) => - writeBuffer.clear() - data.copyToBuffer(writeBuffer) - val sent = connection.write(writeBuffer) - if (ack != Serial.NoAck) sender ! ack(sent) - - case Serial.Close => - client ! Serial.Closed - context stop self - - case Terminated(`client`) => - context stop self - - // go down with reader thread - case ReaderDied(ex) => throw ex - - } - - override def postStop() = { - connection.close() - } - -} - -private[flow] object SerialOperator { - def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala deleted file mode 100644 index 2d3a6ed..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala +++ /dev/null @@ -1,10 +0,0 @@ -package ch.jodersky.flow - -/** - * Groups settings used in communication over a serial port. - * @param baud baud rate to use with serial port - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - */ -case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala deleted file mode 100644 index 3126618..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala +++ /dev/null @@ -1,108 +0,0 @@ -package ch.jodersky.flow - -import java.nio.ByteBuffer - -import ch.jodersky.jni.nativeLoader - -/** - * Low-level wrapper of native serial backend. - * - * WARNING: Methods in this class allocate native structures and deal with pointers. These - * pointers are handled as longs by java and are NOT checked for correctness, therefore passing - * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. - * - * See SerialConnection for a higher-level, more secured wrapper - * of serial communication. - * - * @param serialAddr address of natively allocated serial configuration structure - */ -@nativeLoader("flow4") -private[flow] class UnsafeSerial(final val serialAddr: Long) { - - final val ParityNone: Int = 0 - final val ParityOdd: Int = 1 - final val ParityEven: Int = 2 - - /** - * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only - * read into the buffer's allocated memory, its position or limit are not changed. - * - * The read is blocking, however it may be interrupted by calling cancelRead() on the given - * serial port. - * - * @param buffer direct ByteBuffer to read into - * @return number of bytes actually read - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws PortInterruptedException if the call to this function was interrupted - * @throws IOException on IO error - */ - @native def read(buffer: ByteBuffer): Int - - /** - * Cancels a read (any caller to read or readDirect will return with a - * PortInterruptedException). This function may be called from any thread. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - @native def cancelRead(): Unit - - /** - * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is - * only taken from the buffer's allocated memory, its position or limit are not changed. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer direct ByteBuffer from which data is taken - * @param length actual amount of data that should be taken from the buffer (this is needed since the native - * backend does not provide a way to query the buffer's current limit) - * @return number of bytes actually written - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws IOException on IO error - */ - @native def write(buffer: ByteBuffer, length: Int): Int - - /** - * Closes an previously open serial port. Natively allocated resources are freed and the serial - * pointer becomes invalid, therefore this function should only be called ONCE per open serial - * port. - * - * A port should not be closed while it is used (by a read or write) as this - * results in undefined behaviour. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - @native def close(): Unit - -} - -private[flow] object UnsafeSerial { - - /** - * Opens a serial port. - * - * @param port name of serial port to open - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - * @return address of natively allocated serial configuration structure - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - @native def debug(value: Boolean): Unit - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala deleted file mode 100644 index 9fa519b..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala +++ /dev/null @@ -1,143 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.{ Actor, ActorRef, Props, Terminated } -import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } -import java.nio.file.StandardWatchEventKinds._ -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } -import scala.util.{ Failure, Success, Try } - -private[flow] class Watcher(from: Option[ActorRef]) extends Actor { - - case class WatcherDied(reason: Throwable) - object WatcherThread extends Thread { - import Watcher.NewFile - - private val service = FileSystems.getDefault().newWatchService() - - def register(directory: Path) = directory.register(service, ENTRY_CREATE) - - override def run(): Unit = { - this.setName("serial-port-watcher") - var stop = false - while (!stop) { - try { - val key = service.take() - key.pollEvents() foreach { ev => - val event = ev.asInstanceOf[WatchEvent[Path]] - if (event.kind == ENTRY_CREATE) { - val directory = key.watchable().asInstanceOf[Path] - val file = event.context() - self.tell(NewFile(directory, file), Actor.noSender) - } - } - key.reset() - } catch { - case _: InterruptedException => stop = true - case _: ClosedWatchServiceException => stop = true - case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) - } - } - } - - def close() = service.close // causes the service to throw a ClosedWatchServiceException - } - - - // directory -> subscribers - private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] - - // directory -> watchkey - private val keys: Map[String, WatchKey] = Map.empty - - def subscribe(directory: String, client: ActorRef): WatchKey = { - val normal = Paths.get(directory).toAbsolutePath - val index = normal.toString - val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) - clients addBinding (index, client) - key - } - - def unsubscribe(directory: String, client: ActorRef): Unit = { - val index = Paths.get(directory).toAbsolutePath.toString - - clients removeBinding (index, sender) - - if (clients.get(index).isEmpty && keys.get(index).isDefined) { - keys(index).cancel() - keys -= index - } - } - - def reply(msg: Any, sender: ActorRef) = { - val origin = from match { - case Some(ref) => ref - case None => self - } - sender.tell(msg, origin) - } - - override def preStart() = { - WatcherThread.setDaemon(true) - WatcherThread.start() - } - - override def receive = { - - case w @ Serial.Watch(directory, skipInitial) => - val normalPath = Paths.get(directory).toAbsolutePath - val normal = normalPath.toString - - Try { - subscribe(directory, sender) - } match { - case Failure(err) => reply(Serial.CommandFailed(w, err), sender) - case Success(key) => - context watch sender - if (!skipInitial) { - Files.newDirectoryStream(normalPath) foreach { path => - if (!Files.isDirectory(path)) { - reply(Serial.Connected(path.toString), sender) - } - } - } - } - - case u @ Serial.Unwatch(directory) => - val normal = Paths.get(directory).toAbsolutePath.toString - - clients.removeBinding(normal, sender) - - if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { - keys(normal).cancel() - keys -= normal - } - - case Terminated(client) => - for ((directory, c) <- clients if c == client) { - unsubscribe(directory, client) - } - - case Watcher.NewFile(directory, file) => - val normal = directory.toAbsolutePath - val absFile = normal resolve file - clients.getOrElse(normal.toString, Set.empty) foreach { client => - reply(Serial.Connected(absFile.toString), client) - } - - case WatcherDied(err) => throw err // go down with watcher thread - - } - - override def postStop() = { - WatcherThread.close() - } - -} - -private[flow] object Watcher { - private case class NewFile(directory: Path, file: Path) - - def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) - -} diff --git a/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala deleted file mode 100644 index ee087a8..0000000 --- a/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala +++ /dev/null @@ -1,19 +0,0 @@ -package ch.jodersky.flow - -/** The requested port could not be found. */ -class NoSuchPortException(message: String) extends Exception(message) - -/** The requested port is in use by someone else. */ -class PortInUseException(message: String) extends Exception(message) - -/** Permissions are not sufficient to open a serial port. */ -class AccessDeniedException(message: String) extends Exception(message) - -/** The settings specified are invalid. */ -class InvalidSettingsException(message: String) extends Exception(message) - -/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ -class PortInterruptedException(message: String) extends Exception(message) - -/** The specified port has been closed. */ -class PortClosedException(message: String) extends Exception(message) diff --git a/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala b/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala deleted file mode 100644 index 8e891ae..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/PseudoTerminal.scala +++ /dev/null @@ -1,43 +0,0 @@ -package ch.jodersky.flow - -import java.io.{File, IOException} -import java.nio.file.Files - -import scala.concurrent.duration._ -import scala.sys.process._ -import scala.util.control.NonFatal - -trait PseudoTerminal { - - final val SetupTimeout = 100.milliseconds - - def withEcho[A](action: (String, SerialSettings) => A): A = { - val dir = Files.createTempDirectory("flow-pty").toFile - val pty = new File(dir, "pty") - - val socat = try { - val s = Seq( - "socat", - "-d -d", - s"exec:cat,pty,raw,b115200,echo=0", - s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}" - ).run(ProcessLogger(println(_)), false) - Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up - s - } catch { - case NonFatal(ex) => - throw new IOException( - "Error running echo service, make sure the program 'socat' is installed", ex) - } - - try { - val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200)) - Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys - result - } finally { - socat.destroy() - dir.delete() - } - } - -} diff --git a/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala b/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala deleted file mode 100644 index 59af305..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/SerialManagerSpec.scala +++ /dev/null @@ -1,38 +0,0 @@ -package ch.jodersky.flow - -import akka.actor.ActorSystem -import akka.io.IO -import akka.testkit.{ImplicitSender, TestKit} -import org.scalatest._ - -class SerialManagerSpec - extends TestKit(ActorSystem("serial-manager")) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with PseudoTerminal { - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - "Serial manager" should { - val manager = IO(Serial) - - "open an existing port" in { - withEcho{ case (port, settings) => - manager ! Serial.Open(port, settings) - expectMsgType[Serial.Opened] - } - } - - "fail opening a non-existing port" in { - val cmd = Serial.Open("nonexistent", SerialSettings(115200)) - manager ! cmd - assert(expectMsgType[Serial.CommandFailed].command == cmd) - } - - } - -} diff --git a/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala b/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala deleted file mode 100644 index 4c1dd94..0000000 --- a/flow-core/src/test/scala/ch/jodersky/flow/SerialOperatorSpec.scala +++ /dev/null @@ -1,49 +0,0 @@ -package ch.jodersky.flow - -import scala.concurrent.duration._ - -import akka.actor.{ActorRef, ActorSystem} -import akka.testkit.{ImplicitSender, TestKit} -import akka.util.ByteString -import org.scalatest._ - -case class Ack(n: Int) extends Serial.Event - -class SerialOperatorSpec - extends TestKit(ActorSystem("serial-operator")) - with ImplicitSender - with WordSpecLike - with Matchers - with BeforeAndAfterAll - with SequentialNestedSuiteExecution - with PseudoTerminal { - - override def afterAll { - TestKit.shutdownActorSystem(system) - } - - def withEchoOp[A](action: ActorRef => A): A = { - withEcho { case (port, settings) => - val connection = SerialConnection.open(port, settings) - val operator = system.actorOf(SerialOperator.apply(connection, 1024, testActor)) - action(operator) - } - } - - "Serial operator" should { - - "follow the correct protocol" in withEchoOp { op => - expectMsgType[Serial.Opened] - - val data = ByteString("hello world".getBytes("utf-8")) - op ! Serial.Write(data) - expectMsg(Serial.Received(data)) - - op ! Serial.Close - expectMsg(Serial.Closed) - - } - - } - -} diff --git a/flow-native/build.sbt b/flow-native/build.sbt deleted file mode 100644 index 2c7ffea..0000000 --- a/flow-native/build.sbt +++ /dev/null @@ -1,8 +0,0 @@ -enablePlugins(JniNative) - -sourceDirectory in nativeCompile := sourceDirectory.value - -// package native libraries from lib_native during releases -val isRelease = sys.props("release") == "true" -enableNativeCompilation in Compile := !isRelease -enableNativeCompilation in Test := !isRelease diff --git a/flow-native/lib_native/armv7l-linux/libflow4.so b/flow-native/lib_native/armv7l-linux/libflow4.so deleted file mode 100755 index f5d9ae0..0000000 Binary files a/flow-native/lib_native/armv7l-linux/libflow4.so and /dev/null differ diff --git a/flow-native/lib_native/i686-linux/libflow4.so b/flow-native/lib_native/i686-linux/libflow4.so deleted file mode 100644 index fb438e2..0000000 Binary files a/flow-native/lib_native/i686-linux/libflow4.so and /dev/null differ diff --git a/flow-native/lib_native/x86_64-darwin/libflow4.dylib b/flow-native/lib_native/x86_64-darwin/libflow4.dylib deleted file mode 100644 index 213415d..0000000 Binary files a/flow-native/lib_native/x86_64-darwin/libflow4.dylib and /dev/null differ diff --git a/flow-native/lib_native/x86_64-linux/libflow4.so b/flow-native/lib_native/x86_64-linux/libflow4.so deleted file mode 100755 index 18fb300..0000000 Binary files a/flow-native/lib_native/x86_64-linux/libflow4.so and /dev/null differ diff --git a/flow-native/src/.gitignore b/flow-native/src/.gitignore deleted file mode 100644 index 1785c46..0000000 --- a/flow-native/src/.gitignore +++ /dev/null @@ -1,12 +0,0 @@ -# CMake -/CMakeFiles/ -/CMakeCache.txt -/cmake_install.cmake -/Makefile - -# Binary files -*.o -*.so* -*.dylib -*.a -*~ \ No newline at end of file diff --git a/flow-native/src/CMakeLists.txt b/flow-native/src/CMakeLists.txt deleted file mode 100644 index a57451c..0000000 --- a/flow-native/src/CMakeLists.txt +++ /dev/null @@ -1,46 +0,0 @@ -################################################################ -# A minimal CMake file that is compatible with sbt-jni # -# # -# All settings required by sbt-jni have been marked so, please # -# add/modify/remove settings to build your specific library. # -################################################################ - -cmake_minimum_required(VERSION 2.8.0) -set(ignoreMe "${SBT}") # sbt-jni defines -DSBT - -# Define project and related variables -# (required by sbt-jni) please use semantic versioning -# -project (flow) -set(PROJECT_VERSION_MAJOR 4) -set(PROJECT_VERSION_MINOR 0) -set(PROJECT_VERSION_PATCH 0) - -set(CMAKE_C_FLAGS "-std=c99") -add_definitions(-Wall) -add_definitions(-Wextra) -add_definitions(-pedantic) - -# Setup JNI -find_package(JNI REQUIRED) -if (JNI_FOUND) - message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}") -endif() - -# Include directories -include_directories(.) -include_directories(include) -include_directories(${JNI_INCLUDE_DIRS}) - -# Sources -file(GLOB LIB_SRC - "*.c" - "platform/posix/*.c" -) - -# Setup installation targets -# (required by sbt-jni) major version should always be appended to library name -# -set (LIB_NAME ${PROJECT_NAME}${PROJECT_VERSION_MAJOR}) -add_library(${LIB_NAME} SHARED ${LIB_SRC}) -install(TARGETS ${LIB_NAME} LIBRARY DESTINATION .) diff --git a/flow-native/src/flow_jni.c b/flow-native/src/flow_jni.c deleted file mode 100644 index 75bffff..0000000 --- a/flow-native/src/flow_jni.c +++ /dev/null @@ -1,150 +0,0 @@ -#include - -#include "flow.h" - -#include "ch_jodersky_flow_UnsafeSerial.h" -#include "ch_jodersky_flow_UnsafeSerial__.h" - -// suppress unused parameter warnings -#define UNUSED_ARG(x) (void)(x) - -static inline void throwException(JNIEnv* env, const char* const exception, const char * const message) -{ - (*env)->ThrowNew(env, (*env)->FindClass(env, exception), message); -} - -/** Check return code and throw exception in case it is non-zero. */ -static void check(JNIEnv* env, int ret) -{ - switch (ret) { - case -E_IO: throwException(env, "java/io/IOException", ""); break; - case -E_BUSY: throwException(env, "ch/jodersky/flow/PortInUseException", ""); break; - case -E_ACCESS_DENIED: throwException(env, "ch/jodersky/flow/AccessDeniedException", ""); break; - case -E_INVALID_SETTINGS: throwException(env, "ch/jodersky/flow/InvalidSettingsException", ""); break; - case -E_INTERRUPT: throwException(env, "ch/jodersky/flow/PortInterruptedException", ""); break; - case -E_NO_PORT: throwException(env, "ch/jodersky/flow/NoSuchPortException", ""); break; - default: return; - } -} - -/** Get pointer to serial config associated to an UnsafeSerial instance. */ -static struct serial_config* get_config(JNIEnv* env, jobject unsafe_serial) -{ - jclass clazz = (*env)->FindClass(env, "ch/jodersky/flow/UnsafeSerial"); - jfieldID field = (*env)->GetFieldID(env, clazz, "serialAddr", "J"); - jlong addr = (*env)->GetLongField(env, unsafe_serial, field); - return (struct serial_config*) (intptr_t) addr; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: open - * Signature: (Ljava/lang/String;IIZI)J - */ -JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open -(JNIEnv *env, jobject instance, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity) -{ - UNUSED_ARG(instance); - - const char *dev = (*env)->GetStringUTFChars(env, port_name, 0); - struct serial_config* config; - int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &config); - (*env)->ReleaseStringUTFChars(env, port_name, dev); - - if (r < 0) { - check(env, r); - return -E_IO; - } - - long jpointer = (long) config; - return (jlong) jpointer; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: read - * Signature: (Ljava/nio/ByteBuffer;)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read -(JNIEnv *env, jobject instance, jobject buffer) -{ - char* local_buffer = (char*) (*env)->GetDirectBufferAddress(env, buffer); - if (local_buffer == NULL) { - throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); - return -E_IO; - } - size_t size = (size_t) (*env)->GetDirectBufferCapacity(env, buffer); - struct serial_config* config = get_config(env, instance); - - int r = serial_read(config, local_buffer, size); - if (r < 0) { - check(env, r); - } - return r; - -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: cancelRead - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead -(JNIEnv *env, jobject instance) -{ - int r = serial_cancel_read(get_config(env, instance)); - if (r < 0) { - check(env, r); - } -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: write - * Signature: (Ljava/nio/ByteBuffer;I)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write -(JNIEnv *env, jobject instance, jobject buffer, jint size) -{ - - char* local_buffer = (char *) (*env)->GetDirectBufferAddress(env, buffer); - if (local_buffer == NULL) { - throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); - return -E_IO; - } - - int r = serial_write(get_config(env, instance), local_buffer, (size_t) size); - if (r < 0) { - check(env, r); - return -E_IO; - } - return r; -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: close - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close -(JNIEnv *env, jobject instance) -{ - int r = serial_close(get_config(env, instance)); - if (r < 0) { - check(env, r); - } -} - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: debug - * Signature: (Z)V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug -(JNIEnv *env, jobject instance, jboolean value) -{ - UNUSED_ARG(env); - UNUSED_ARG(instance); - - serial_debug((bool) value); -} diff --git a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h b/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h deleted file mode 100644 index f80ada0..0000000 --- a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial.h +++ /dev/null @@ -1,45 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include -/* Header for class ch_jodersky_flow_UnsafeSerial */ - -#ifndef _Included_ch_jodersky_flow_UnsafeSerial -#define _Included_ch_jodersky_flow_UnsafeSerial -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: read - * Signature: (Ljava/nio/ByteBuffer;)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_read - (JNIEnv *, jobject, jobject); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: cancelRead - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_cancelRead - (JNIEnv *, jobject); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: write - * Signature: (Ljava/nio/ByteBuffer;I)I - */ -JNIEXPORT jint JNICALL Java_ch_jodersky_flow_UnsafeSerial_write - (JNIEnv *, jobject, jobject, jint); - -/* - * Class: ch_jodersky_flow_UnsafeSerial - * Method: close - * Signature: ()V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_close - (JNIEnv *, jobject); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h b/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h deleted file mode 100644 index 617875f..0000000 --- a/flow-native/src/include/ch_jodersky_flow_UnsafeSerial__.h +++ /dev/null @@ -1,29 +0,0 @@ -/* DO NOT EDIT THIS FILE - it is machine generated */ -#include -/* Header for class ch_jodersky_flow_UnsafeSerial__ */ - -#ifndef _Included_ch_jodersky_flow_UnsafeSerial__ -#define _Included_ch_jodersky_flow_UnsafeSerial__ -#ifdef __cplusplus -extern "C" { -#endif -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: open - * Signature: (Ljava/lang/String;IIZI)J - */ -JNIEXPORT jlong JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_open - (JNIEnv *, jobject, jstring, jint, jint, jboolean, jint); - -/* - * Class: ch_jodersky_flow_UnsafeSerial__ - * Method: debug - * Signature: (Z)V - */ -JNIEXPORT void JNICALL Java_ch_jodersky_flow_UnsafeSerial_00024_debug - (JNIEnv *, jobject, jboolean); - -#ifdef __cplusplus -} -#endif -#endif diff --git a/flow-native/src/include/flow.h b/flow-native/src/include/flow.h deleted file mode 100644 index e3f33b9..0000000 --- a/flow-native/src/include/flow.h +++ /dev/null @@ -1,103 +0,0 @@ -#ifndef FLOW_H -#define FLOW_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include - -// general error codes, whose that are returned by functions -#define E_IO 1 // IO error -#define E_ACCESS_DENIED 2 // access denied -#define E_BUSY 3 // port is busy -#define E_INVALID_SETTINGS 4 // some port settings are invalid -#define E_INTERRUPT 5 // not really an error, function call aborted because port is closed -#define E_NO_PORT 6 // requested port does not exist - -#define PARITY_NONE 0 -#define PARITY_ODD 1 -#define PARITY_EVEN 2 - -/** - * Contains internal configuration of an open serial port. - */ -struct serial_config; - -/** - * Opens a serial port and allocates memory for storing configuration. Note: if this function fails, - * any internally allocated resources will be freed. - * @param port_name name of port - * @param baud baud rate - * @param char_size character size of data transmitted through serial device - * @param two_stop_bits set to use two stop bits instead of one - * @param parity kind of parity checking to use - * @param serial pointer to memory that will be allocated with a serial structure - * @return 0 on success - * @return -E_NO_PORT if the given port does not exist - * @return -E_ACCESS_DENIED if permissions are not sufficient to open port - * @return -E_BUSY if port is already in use - * @return -E_INVALID_SETTINGS if any of the specified settings are invalid - * @return -E_IO on other error - */ -int serial_open( - const char* port_name, - int baud, - int char_size, - bool two_stop_bits, - int parity, - struct serial_config** const serial); - -/** - * Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to - * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT - * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per - * close manual page, close should not be called on a file descriptor that is in use by another thread). - * @param serial pointer to serial configuration that is to be closed (and freed) - * @return 0 on success - * @return -E_IO on error - */ -int serial_close(struct serial_config* const serial); - -/** - * Starts a read from a previously opened serial port. The read is blocking, however it may be - * interrupted by calling 'serial_cancel_read' on the given serial port. - * @param serial pointer to serial configuration from which to read - * @param buffer buffer into which data is read - * @param size maximum buffer size - * @return n>0 the number of bytes read into buffer - * @return -E_INTERRUPT if the call to this function was interrupted - * @return -E_IO on IO error - */ -int serial_read(struct serial_config* const serial, char* const buffer, size_t size); - -/** - * Cancels a blocked read call. This function is thread safe, i.e. it may be called from a thread even - * while another thread is blocked in a read call. - * @param serial_config the serial port to interrupt - * @return 0 on success - * @return -E_IO on error - */ -int serial_cancel_read(struct serial_config* const serial); - -/** - * Writes data to a previously opened serial port. Non bocking. - * @param serial pointer to serial configuration to which to write - * @param data data to write - * @param size number of bytes to write from data - * @return n>0 the number of bytes written - * @return -E_IO on IO error - */ -int serial_write(struct serial_config* const serial, char* const data, size_t size); - -/** - * Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. - */ -void serial_debug(bool value); - -#ifdef __cplusplus -} -#endif - -#endif /* FLOW_H */ diff --git a/flow-native/src/platform/posix/flow.c b/flow-native/src/platform/posix/flow.c deleted file mode 100644 index 969949b..0000000 --- a/flow-native/src/platform/posix/flow.c +++ /dev/null @@ -1,263 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include "flow.h" - -#define DATA_CANCEL 0xffffffff - -static bool debug = false; - -static void print_debug(const char* const msg, int en) -{ - if (debug) { - if (errno == 0) { - fprintf(stderr, "%s", msg); - } else { - fprintf(stderr, "%s: %d\n", msg, en); - } - fflush(stderr); - } -} - -void serial_debug(bool value) -{ - debug = value; -} - -//contains file descriptors used in managing a serial port -struct serial_config { - int port_fd; // file descriptor of serial port - - /* a pipe is used to abort a serial read by writing something into the - * write end of the pipe */ - int pipe_read_fd; // file descriptor, read end of pipe - int pipe_write_fd; // file descriptor, write end of pipe -}; - -int serial_open( - const char* const port_name, - int baud, - int char_size, - bool two_stop_bits, - int parity, - struct serial_config** serial) -{ - - int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK); - - if (fd < 0) { - int en = errno; - print_debug("Error obtaining file descriptor for port", en); - if (en == EACCES) return -E_ACCESS_DENIED; - if (en == ENOENT) return -E_NO_PORT; - return -E_IO; - } - - if (flock(fd, LOCK_EX | LOCK_NB) < 0) { - print_debug("Error acquiring lock on port", errno); - close(fd); - return -E_BUSY; - } - - /* configure new port settings */ - struct termios newtio; - - /* initialize serial interface */ - newtio.c_iflag = 0; - newtio.c_oflag = 0; - newtio.c_lflag = 0; - newtio.c_cflag = CREAD; - - /* set speed */ - speed_t bd; - switch (baud) { - case 50: bd = B50; break; - case 75: bd = B75; break; - case 110: bd = B110; break; - case 134: bd = B134; break; - case 150: bd = B150; break; - case 200: bd = B200; break; - case 300: bd = B300; break; - case 600: bd = B600; break; - case 1200: bd = B1200; break; - case 1800: bd = B1800; break; - case 2400: bd = B2400; break; - case 4800: bd = B4800; break; - case 9600: bd = B9600; break; - case 19200: bd = B19200; break; - case 38400: bd = B38400; break; - case 57600: bd = B57600; break; - case 115200: bd = B115200; break; - case 230400: bd = B230400; break; - default: - close(fd); - print_debug("Invalid baud rate", 0); - return -E_INVALID_SETTINGS; - } - - if (cfsetspeed(&newtio, bd) < 0) { - print_debug("Error setting baud rate", errno); - close(fd); - return -E_IO; - } - - /* set char size*/ - switch (char_size) { - case 5: newtio.c_cflag |= CS5; break; - case 6: newtio.c_cflag |= CS6; break; - case 7: newtio.c_cflag |= CS7; break; - case 8: newtio.c_cflag |= CS8; break; - default: - close(fd); - print_debug("Invalid character size", 0); - return -E_INVALID_SETTINGS; - } - - /* use two stop bits */ - if (two_stop_bits){ - newtio.c_cflag |= CSTOPB; - } - - /* set parity */ - switch (parity) { - case PARITY_NONE: break; - case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break; - case PARITY_EVEN: newtio.c_cflag |= PARENB; break; - default: - close(fd); - print_debug("Invalid parity", 0); - return -E_INVALID_SETTINGS; - } - - if (tcflush(fd, TCIOFLUSH) < 0) { - print_debug("Error flushing serial settings", errno); - close(fd); - return -E_IO; - } - - if (tcsetattr(fd, TCSANOW, &newtio) < 0) { - print_debug("Error applying serial settings", errno); - close(fd); - return -E_IO; - } - - int pipe_fd[2]; - if (pipe(pipe_fd) < 0) { - print_debug("Error opening pipe", errno); - close(fd); - return -E_IO; - } - - if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) { - print_debug("Error setting pipe to non-blocking", errno); - close(fd); - close(pipe_fd[0]); - close(pipe_fd[1]); - return -E_IO; - } - - struct serial_config* s = malloc(sizeof(s)); - if (s == NULL) { - print_debug("Error allocating memory for serial configuration", errno); - close(fd); - close(pipe_fd[0]); - close(pipe_fd[1]); - return -E_IO; - } - - s->port_fd = fd; - s->pipe_read_fd = pipe_fd[0]; - s->pipe_write_fd = pipe_fd[1]; - (*serial) = s; - - return 0; -} - -int serial_close(struct serial_config* const serial) -{ - if (close(serial->pipe_write_fd) < 0) { - print_debug("Error closing write end of pipe", errno); - return -E_IO; - } - if (close(serial->pipe_read_fd) < 0) { - print_debug("Error closing read end of pipe", errno); - return -E_IO; - } - - if (flock(serial->port_fd, LOCK_UN) < 0){ - print_debug("Error releasing lock on port", errno); - return -E_IO; - } - if (close(serial->port_fd) < 0) { - print_debug("Error closing port", errno); - return -E_IO; - } - - free(serial); - return 0; -} - -int serial_read(struct serial_config* const serial, char* const buffer, size_t size) -{ - int port = serial->port_fd; - int pipe = serial->pipe_read_fd; - - fd_set rfds; - FD_ZERO(&rfds); - FD_SET(port, &rfds); - FD_SET(pipe, &rfds); - - int nfds = pipe + 1; - if (pipe < port) nfds = port + 1; - - int n = select(nfds, &rfds, NULL, NULL, NULL); - if (n < 0) { - print_debug("Error trying to call select on port and pipe", errno); - return -E_IO; - } - - if (FD_ISSET(pipe, &rfds)) { - return -E_INTERRUPT; - } else if (FD_ISSET(port, &rfds)) { - int r = read(port, buffer, size); - - // treat 0 bytes read as an error to avoid problems on disconnect - // anyway, after a poll there should be more than 0 bytes available to read - if (r <= 0) { - print_debug("Error data not available after select", errno); - return -E_IO; - } - return r; - } else { - print_debug("Select returned unknown read sets", 0); - return -E_IO; - } -} - -int serial_cancel_read(struct serial_config* const serial) -{ - int data = DATA_CANCEL; - - //write to pipe to wake up any blocked read thread (self-pipe trick) - if (write(serial->pipe_write_fd, &data, 1) < 0) { - print_debug("Error writing to pipe during read cancel", errno); - return -E_IO; - } - - return 0; -} - -int serial_write(struct serial_config* const serial, char* const data, size_t size) -{ - int r = write(serial->port_fd, data, size); - if (r < 0) { - print_debug("Error writing to port", errno); - return -E_IO; - } - return r; -} diff --git a/flow-native/src/platform/windows/README b/flow-native/src/platform/windows/README deleted file mode 100644 index 3d24410..0000000 --- a/flow-native/src/platform/windows/README +++ /dev/null @@ -1 +0,0 @@ -The contents of the file flow.c were found in the avrdude project. They look like they may be a good starting point for serial communication on windows. diff --git a/flow-native/src/platform/windows/flow.c.disabled b/flow-native/src/platform/windows/flow.c.disabled deleted file mode 100644 index 86a267c..0000000 --- a/flow-native/src/platform/windows/flow.c.disabled +++ /dev/null @@ -1,416 +0,0 @@ -/* - * avrdude - A Downloader/Uploader for AVR device programmers - * Copyright (C) 2003, 2004 Martin J. Thomas - * Copyright (C) 2006 Joerg Wunsch - * - * This program is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 2 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with this program. If not, see . - */ - -/* $Id$ */ - -/* - * Native Win32 serial interface for avrdude. - */ - -#include "avrdude.h" - -#if defined(WIN32NATIVE) - -#include -#include -#include /* for isprint */ - -#include "serial.h" - -long serial_recv_timeout = 5000; /* ms */ - -#define W32SERBUFSIZE 1024 - -struct baud_mapping { - long baud; - DWORD speed; -}; - -/* HANDLE hComPort=INVALID_HANDLE_VALUE; */ - -static struct baud_mapping baud_lookup_table [] = { - { 1200, CBR_1200 }, - { 2400, CBR_2400 }, - { 4800, CBR_4800 }, - { 9600, CBR_9600 }, - { 19200, CBR_19200 }, - { 38400, CBR_38400 }, - { 57600, CBR_57600 }, - { 115200, CBR_115200 }, - { 0, 0 } /* Terminator. */ -}; - -static DWORD serial_baud_lookup(long baud) -{ - struct baud_mapping *map = baud_lookup_table; - - while (map->baud) { - if (map->baud == baud) - return map->speed; - map++; - } - - /* - * If a non-standard BAUD rate is used, issue - * a warning (if we are verbose) and return the raw rate - */ - if (verbose > 0) - fprintf(stderr, "%s: serial_baud_lookup(): Using non-standard baud rate: %ld", - progname, baud); - - return baud; -} - - -static BOOL serial_w32SetTimeOut(HANDLE hComPort, DWORD timeout) // in ms -{ - COMMTIMEOUTS ctmo; - ZeroMemory (&ctmo, sizeof(COMMTIMEOUTS)); - ctmo.ReadIntervalTimeout = timeout; - ctmo.ReadTotalTimeoutMultiplier = timeout; - ctmo.ReadTotalTimeoutConstant = timeout; - - return SetCommTimeouts(hComPort, &ctmo); -} - -static int ser_setspeed(union filedescriptor *fd, long baud) -{ - DCB dcb; - HANDLE hComPort = (HANDLE)fd->pfd; - - ZeroMemory (&dcb, sizeof(DCB)); - dcb.DCBlength = sizeof(DCB); - dcb.BaudRate = serial_baud_lookup (baud); - dcb.fBinary = 1; - dcb.fDtrControl = DTR_CONTROL_DISABLE; - dcb.fRtsControl = RTS_CONTROL_DISABLE; - dcb.ByteSize = 8; - dcb.Parity = NOPARITY; - dcb.StopBits = ONESTOPBIT; - - if (!SetCommState(hComPort, &dcb)) - return -1; - - return 0; -} - - -static int ser_open(char * port, long baud, union filedescriptor *fdp) -{ - LPVOID lpMsgBuf; - HANDLE hComPort=INVALID_HANDLE_VALUE; - char *newname = 0; - - /* - * If the port is of the form "net::", then - * handle it as a TCP connection to a terminal server. - * - * This is curently not implemented for Win32. - */ - if (strncmp(port, "net:", strlen("net:")) == 0) { - fprintf(stderr, - "%s: ser_open(): network connects are currently not" - "implemented for Win32 environments\n", - progname); - return -1; - } - - if (strncasecmp(port, "com", strlen("com")) == 0) { - - // prepend "\\\\.\\" to name, required for port # >= 10 - newname = malloc(strlen("\\\\.\\") + strlen(port) + 1); - - if (newname == 0) { - fprintf(stderr, - "%s: ser_open(): out of memory\n", - progname); - exit(1); - } - strcpy(newname, "\\\\.\\"); - strcat(newname, port); - - port = newname; - } - - hComPort = CreateFile(port, GENERIC_READ | GENERIC_WRITE, 0, NULL, - OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); - - if (hComPort == INVALID_HANDLE_VALUE) { - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL); - fprintf(stderr, "%s: ser_open(): can't open device \"%s\": %s\n", - progname, port, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - return -1; - } - - if (!SetupComm(hComPort, W32SERBUFSIZE, W32SERBUFSIZE)) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set buffers for \"%s\"\n", - progname, port); - return -1; - } - - fdp->pfd = (void *)hComPort; - if (ser_setspeed(fdp, baud) != 0) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set com-state for \"%s\"\n", - progname, port); - return -1; - } - - if (!serial_w32SetTimeOut(hComPort,0)) - { - CloseHandle(hComPort); - fprintf(stderr, "%s: ser_open(): can't set initial timeout for \"%s\"\n", - progname, port); - return -1; - } - - if (newname != 0) { - free(newname); - } - return 0; -} - - -static void ser_close(union filedescriptor *fd) -{ - HANDLE hComPort=(HANDLE)fd->pfd; - if (hComPort != INVALID_HANDLE_VALUE) - CloseHandle (hComPort); - - hComPort = INVALID_HANDLE_VALUE; -} - -static int ser_set_dtr_rts(union filedescriptor *fd, int is_on) -{ - HANDLE hComPort=(HANDLE)fd->pfd; - - if (is_on) { - EscapeCommFunction(hComPort, SETDTR); - EscapeCommFunction(hComPort, SETRTS); - } else { - EscapeCommFunction(hComPort, CLRDTR); - EscapeCommFunction(hComPort, CLRRTS); - } - return 0; -} - - -static int ser_send(union filedescriptor *fd, unsigned char * buf, size_t buflen) -{ - size_t len = buflen; - unsigned char c='\0'; - DWORD written; - unsigned char * b = buf; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_send(): port not open\n", - progname); - exit(1); - } - - if (!len) - return 0; - - if (verbose > 3) - { - fprintf(stderr, "%s: Send: ", progname); - - while (len) { - c = *b; - if (isprint(c)) { - fprintf(stderr, "%c ", c); - } - else { - fprintf(stderr, ". "); - } - fprintf(stderr, "[%02x] ", c); - b++; - len--; - } - fprintf(stderr, "\n"); - } - - serial_w32SetTimeOut(hComPort,500); - - if (!WriteFile (hComPort, buf, buflen, &written, NULL)) { - fprintf(stderr, "%s: ser_send(): write error: %s\n", - progname, "sorry no info avail"); // TODO - exit(1); - } - - if (written != buflen) { - fprintf(stderr, "%s: ser_send(): size/send mismatch\n", - progname); - exit(1); - } - - return 0; -} - - -static int ser_recv(union filedescriptor *fd, unsigned char * buf, size_t buflen) -{ - unsigned char c; - unsigned char * p = buf; - DWORD read; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_read(): port not open\n", - progname); - exit(1); - } - - serial_w32SetTimeOut(hComPort, serial_recv_timeout); - - if (!ReadFile(hComPort, buf, buflen, &read, NULL)) { - LPVOID lpMsgBuf; - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL ); - fprintf(stderr, "%s: ser_recv(): read error: %s\n", - progname, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - exit(1); - } - - /* time out detected */ - if (read == 0) { - if (verbose > 1) - fprintf(stderr, - "%s: ser_recv(): programmer is not responding\n", - progname); - return -1; - } - - p = buf; - - if (verbose > 3) - { - fprintf(stderr, "%s: Recv: ", progname); - - while (read) { - c = *p; - if (isprint(c)) { - fprintf(stderr, "%c ", c); - } - else { - fprintf(stderr, ". "); - } - fprintf(stderr, "[%02x] ", c); - - p++; - read--; - } - fprintf(stderr, "\n"); - } - return 0; -} - - -static int ser_drain(union filedescriptor *fd, int display) -{ - // int rc; - unsigned char buf[10]; - BOOL readres; - DWORD read; - - HANDLE hComPort=(HANDLE)fd->pfd; - - if (hComPort == INVALID_HANDLE_VALUE) { - fprintf(stderr, "%s: ser_drain(): port not open\n", - progname); - exit(1); - } - - serial_w32SetTimeOut(hComPort,250); - - if (display) { - fprintf(stderr, "drain>"); - } - - while (1) { - readres=ReadFile(hComPort, buf, 1, &read, NULL); - if (!readres) { - LPVOID lpMsgBuf; - FormatMessage( - FORMAT_MESSAGE_ALLOCATE_BUFFER | - FORMAT_MESSAGE_FROM_SYSTEM | - FORMAT_MESSAGE_IGNORE_INSERTS, - NULL, - GetLastError(), - MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language - (LPTSTR) &lpMsgBuf, - 0, - NULL ); - fprintf(stderr, "%s: ser_drain(): read error: %s\n", - progname, (char*)lpMsgBuf); - LocalFree( lpMsgBuf ); - exit(1); - } - - if (read) { // data avail - if (display) fprintf(stderr, "%02x ", buf[0]); - } - else { // no more data - if (display) fprintf(stderr, "/run`. - -All projects, including samples, can be listed by running `sbt projects`. diff --git a/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala b/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala deleted file mode 100644 index 74433db..0000000 --- a/flow-samples/terminal-stream/src/main/scala/ch/jodersky/flow/samples/terminalstream/Main.scala +++ /dev/null @@ -1,66 +0,0 @@ -package ch.jodersky.flow -package samples.terminalstream - -import scala.concurrent.Future -import scala.concurrent.duration._ -import scala.io.StdIn - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Flow, Keep, Sink, Source} -import akka.util.ByteString - -import stream.Serial - -object Main { - - final val Delay = FiniteDuration(500, MILLISECONDS) - - implicit val system = ActorSystem("terminal-stream") - implicit val materializer = ActorMaterializer() - - def ask(label: String, default: String) = { - print(label + " [" + default.toString + "]: ") - val in = StdIn.readLine() - println("") - if (in.isEmpty) default else in - } - - def main(args: Array[String]): Unit = { - import system.dispatcher - - val port = ask("Device", "/dev/ttyACM0") - val baud = ask("Baud rate", "115200").toInt - val cs = ask("Char size", "8").toInt - val tsb = ask("Use two stop bits", "false").toBoolean - val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) - val settings = SerialSettings(baud, cs, tsb, parity) - - val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] = - Serial().open(port, settings) - - val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data => - println("server says: " + data.decodeString("UTF-8")) - } - - val ticker: Source[ByteString, _] = Source.tick(Delay, Delay, ()).scan(0){case (x, _) => - x + 1 - }.map{ x => - println(x) - ByteString(x.toString) - } - - val connection: Future[Serial.Connection] = ticker.viaMat(serial)(Keep.right).to(printer).run() - - connection map { conn => - println("Connected to " + conn.port) - StdIn.readLine("Press enter to exit") - } recover { case err => - println("Cannot connect: " + err) - } andThen { case _ => - system.terminate() - } - - } - -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala deleted file mode 100644 index 2b30663..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/ConsoleReader.scala +++ /dev/null @@ -1,29 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.Actor -import scala.io.StdIn - -class ConsoleReader extends Actor { - import context._ - import ConsoleReader._ - - def receive = { - case Read => - StdIn.readLine() match { - case ":q" | null => parent ! EOT - case s => { - parent ! ConsoleInput(s) - } - } - } - -} - -object ConsoleReader { - - case object Read - case object EOT - case class ConsoleInput(in: String) - -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala deleted file mode 100644 index 80c0f80..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Main.scala +++ /dev/null @@ -1,30 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.ActorSystem -import scala.io.StdIn - -object Main { - - def ask(label: String, default: String) = { - print(label + " [" + default.toString + "]: ") - val in = StdIn.readLine() - println("") - if (in.isEmpty) default else in - } - - def main(args: Array[String]): Unit = { - val port = ask("Device", "/dev/ttyACM0") - val baud = ask("Baud rate", "115200").toInt - val cs = ask("Char size", "8").toInt - val tsb = ask("Use two stop bits", "false").toBoolean - val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) - val settings = SerialSettings(baud, cs, tsb, parity) - - println("Starting terminal system, enter :q to exit.") - Serial.debug(true) - val system = ActorSystem("flow") - val terminal = system.actorOf(Terminal(port, settings), name = "terminal") - system.registerOnTermination(println("Stopped terminal system.")) - } -} diff --git a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala b/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala deleted file mode 100644 index de60620..0000000 --- a/flow-samples/terminal/src/main/scala/ch/jodersky/flow/samples/terminal/Terminal.scala +++ /dev/null @@ -1,75 +0,0 @@ -package ch.jodersky.flow -package samples.terminal - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } -import akka.io.IO -import akka.util.ByteString - -class Terminal(port: String, settings: SerialSettings) extends Actor with ActorLogging { - import Terminal._ - import context._ - - val reader = actorOf(Props[ConsoleReader]) - - log.info(s"Requesting manager to open port: ${port}, baud: ${settings.baud}") - IO(Serial) ! Serial.Open(port, settings) - - override def postStop() = { - system.terminate() - } - - def receive = { - case Serial.CommandFailed(cmd, reason) => { - log.error(s"Connection failed, stopping terminal. Reason: ${reason}") - context stop self - } - case Serial.Opened(port) => { - log.info(s"Port ${port} is now open.") - val operator = sender - context become opened(operator) - context watch operator - reader ! ConsoleReader.Read - } - } - - def opened(operator: ActorRef): Receive = { - - case Serial.Received(data) => { - log.info(s"Received data: ${formatData(data)}") - } - - case Terminal.Wrote(data) => log.info(s"Wrote data: ${formatData(data)}") - - case Serial.Closed => { - log.info("Operator closed normally, exiting terminal.") - context unwatch operator - context stop self - } - - case Terminated(`operator`) => { - log.error("Operator crashed, exiting terminal.") - context stop self - } - - case ConsoleReader.EOT => { - log.info("Initiating close.") - operator ! Serial.Close - } - - case ConsoleReader.ConsoleInput(input) => { - val data = ByteString(input.getBytes) - operator ! Serial.Write(data, length => Wrote(data.take(length))) - reader ! ConsoleReader.Read - } - } - -} - -object Terminal { - case class Wrote(data: ByteString) extends Serial.Event - - def apply(port: String, settings: SerialSettings) = Props(classOf[Terminal], port, settings) - - private def formatData(data: ByteString) = data.mkString("[", ",", "]") + " " + (new String(data.toArray, "UTF-8")) - -} diff --git a/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala b/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala deleted file mode 100644 index 650d08e..0000000 --- a/flow-samples/watcher/src/main/scala/ch/jodersky/flow/samples/watcher/main.scala +++ /dev/null @@ -1,50 +0,0 @@ -package ch.jodersky.flow -package samples.watcher - -import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } -import akka.io.IO -import scala.io.StdIn - -class Watcher extends Actor with ActorLogging { - import context._ - - val ports = List( - "/dev/ttyUSB\\d+", - "/dev/ttyACM\\d+", - "/dev/cu\\d+", - "/dev/ttyS\\d+" - ) - - override def preStart() = { - val cmd = Serial.Watch() - IO(Serial) ! cmd //watch for new devices - log.info(s"Watching ${cmd.directory} for new devices.") - } - - def receive = { - - case Serial.CommandFailed(w: Serial.Watch, err) => - log.error(err, s"Could not get a watch on ${w.directory}.") - context stop self - - case Serial.Connected(path) => - log.info(s"New device: ${path}") - ports.find(path matches _) match { - case Some(port) => log.info(s"Device is a serial device.") - case None => log.warning(s"Device is NOT serial device.") - } - - } - -} - -object Main { - - def main(args: Array[String]): Unit = { - val system = ActorSystem("flow") - val watcher = system.actorOf(Props(classOf[Watcher]), name = "watcher") - StdIn.readLine() - system.terminate() - } - -} diff --git a/flow-stream/build.sbt b/flow-stream/build.sbt deleted file mode 100644 index c9aa7eb..0000000 --- a/flow-stream/build.sbt +++ /dev/null @@ -1,5 +0,0 @@ -import flow.Dependencies - -libraryDependencies += Dependencies.akkaActor -libraryDependencies += Dependencies.akkaStream -libraryDependencies += Dependencies.scalatest % "test" diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala deleted file mode 100644 index d478de8..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala +++ /dev/null @@ -1,67 +0,0 @@ -package ch.jodersky.flow -package stream - -import akka.stream.scaladsl.Source -import scala.concurrent.Future - -import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} -import akka.io.IO -import akka.stream.scaladsl.Flow -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial} -import impl._ - -object Serial extends ExtensionId[Serial] with ExtensionIdProvider { - - /** - * Represents a prospective serial connection. - */ - case class Connection(port: String, settings: SerialSettings) - - case class Watch(ports: Set[String]) - - def apply()(implicit system: ActorSystem): Serial = super.apply(system) - - override def lookup() = Serial - - override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) - -} - -/** - * Entry point to streaming over serial ports. - * The design of this API is inspired by Akka's Tcp streams. - */ -class Serial(system: ExtendedActorSystem) extends Extension { - - /** - * Creates a Flow that will open a serial port when materialized. - * This Flow then represents an open serial connection: data pushed to its - * inlet will be written to the underlying serial port, and data received - * on the port will be emitted by its outlet. - * @param port name of serial port to open - * @param settings settings to use with serial port - * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped - * @param bufferSize maximum read and write buffer sizes - * @return a Flow associated to the given serial port - */ - def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): - Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( - new SerialConnectionStage( - IO(CoreSerial)(system), - port, - settings, - failOnOverflow, - bufferSize - ) - ) - - def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( - new WatcherStage( - IO(CoreSerial)(system), - ports - ) - ) - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala deleted file mode 100644 index 78438f9..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala +++ /dev/null @@ -1,5 +0,0 @@ -package ch.jodersky.flow -package stream - -/** Represents a generic exception occured during streaming of serial data. */ -class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala deleted file mode 100644 index 8eee61c..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala +++ /dev/null @@ -1,4 +0,0 @@ -package ch.jodersky.flow -package stream - -class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala deleted file mode 100644 index 764b054..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala +++ /dev/null @@ -1,172 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.{FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings} - -/** - * Graph logic that handles establishing and forwarding serial communication. - * The underlying stream is closed when downstream (output) finishes, - * upstream (input) closes are ignored. - */ -private[stream] class SerialConnectionLogic( - shape: FlowShape[ByteString, ByteString], - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int, - connectionPromise: Promise[Serial.Connection]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - import SerialConnectionLogic._ - - /** Receives data and writes it to the serial backend. */ - private def in: Inlet[ByteString] = shape.in - - /** Receives data from the serial backend and pushes it downstream. */ - private def out: Outlet[ByteString] = shape.out - - /** Implicit alias to stageActor so it will be used in "!" calls, without - * explicitly specifying a sender. */ - implicit private def self = stageActor.ref - - /** - * Input handler for an established connection. - * @param operator the operator actor of the established connection - */ - class ConnectedInHandler(operator: ActorRef) extends InHandler { - - override def onPush(): Unit = { - val elem = grab(in) - require(elem != null) // reactive streams requirement - operator ! CoreSerial.Write(elem, _ => WriteAck) - } - - override def onUpstreamFinish(): Unit = { - if (isClosed(out)) { // close serial connection if output is also closed - operator ! CoreSerial.Close - } - } - - } - - class ConnectedOutHandler(operator: ActorRef) extends OutHandler { - // implicit alias to stage actor, so it will be used in "!" calls - implicit val self = stageActor.ref - - override def onPull(): Unit = { - // serial connections are at the end of the "backpressure chain", - // they do not natively support backpressure (as does TCP for example) - // therefore nothing is done here - } - - override def onDownstreamFinish(): Unit = { - // closing downstream also closes the underlying connection - operator ! CoreSerial.Close - } - - } - - override def preStart(): Unit = { - setKeepGoing(true) // serial connection operator will manage completing stage - getStageActor(connecting) - stageActor watch manager - manager ! CoreSerial.Open(port, settings, bufferSize) - } - - setHandler(in, IgnoreTerminateInput) - setHandler(out, IgnoreTerminateOutput) - - /** Initial behavior, before a serial connection is established. */ - private def connecting(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`manager`) => - val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.Opened(port) => - val operator = sender - setHandler(in, new ConnectedInHandler(operator)) - setHandler(out, new ConnectedOutHandler(operator)) - stageActor become connected(operator) - connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value - stageActor unwatch manager - stageActor watch operator - if (!isClosed(in)) { - pull(in) // start pulling input - } - - case other => - val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") - failStage(ex) - connectionPromise.failure(ex) - - } - - } - - /** Behaviour once a connection has been established. It is assumed that operator is not null. */ - private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`operator`) => - failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) - - case CoreSerial.CommandFailed(cmd, reason) => - failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) - - case CoreSerial.Closed => - completeStage() - - case CoreSerial.Received(data) => - if (isAvailable(out)) { - push(out, data) - } else if (failOnOverflow) { - /* Note that the native backend does not provide any way of informing about - * dropped serial data. However, in most cases, a computer capable of running flow - * is also capable of processing incoming serial data at typical baud rates. - * Hence packets will usually only be dropped if an application that uses flow - * backpressures, which can however be detected here. */ - failStage(new StreamSerialException("Incoming serial data was dropped.")) - } - - case WriteAck => - if (!isClosed(in)) { - pull(in) - } - - case other => - failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) - - } - - } - -} - -private[stream] object SerialConnectionLogic { - - case object WriteAck extends CoreSerial.Event - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala deleted file mode 100644 index ceeac01..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala +++ /dev/null @@ -1,49 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} -import akka.util.ByteString - -/** - * Graph stage that establishes and thereby materializes a serial connection. - * The actual connection logic is deferred to [[SerialConnectionLogic]]. - */ -private[stream] class SerialConnectionStage( - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int -) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { - - val in: Inlet[ByteString] = Inlet("Serial.in") - val out: Outlet[ByteString] = Outlet("Serial.out") - - val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): - (GraphStageLogic, Future[Serial.Connection]) = { - - val connectionPromise = Promise[Serial.Connection] - - val logic = new SerialConnectionLogic( - shape, - manager, - port, - settings, - failOnOverflow, - bufferSize, - connectionPromise - ) - - (logic, connectionPromise.future) - } - - override def toString = s"Serial($port)" - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala deleted file mode 100644 index 60b7c90..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala +++ /dev/null @@ -1,65 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.SourceShape -import akka.stream.stage.GraphStageLogic -import ch.jodersky.flow.{Serial => CoreSerial} - -private[stream] class WatcherLogic( - shape: SourceShape[String], - ioManager: ActorRef, - ports: Set[String], - watchPromise: Promise[Serial.Watch]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - - implicit private def self = stageActor.ref - - override def preStart(): Unit = { - getStageActor(receive) - stageActor watch ioManager - for (dir <- WatcherLogic.getDirs(ports)) { - ioManager ! CoreSerial.Watch(dir, skipInitial = false) - } - } - - setHandler(shape.out, IgnoreTerminateOutput) - - private def receive(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`ioManager`) => - val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.Connected(port) => - if (ports contains port) { - if (isAvailable(shape.out)) { - push(shape.out, port) - } - } - - case other => - failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) - - } - } - -} - -private[stream] object WatcherLogic { - def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala deleted file mode 100644 index 82fad69..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala +++ /dev/null @@ -1,38 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, Outlet, SourceShape} -import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} - - -private[stream] class WatcherStage( - ioManager: ActorRef, - ports: Set[String] -) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { - - val out = Outlet[String]("Watcher.out") - - val shape = new SourceShape(out) - - override def createLogicAndMaterializedValue(attributes: Attributes): - (GraphStageLogic, Future[Serial.Watch]) = { - - val promise = Promise[Serial.Watch] - - val logic = new WatcherLogic( - shape, - ioManager, - ports, - promise - ) - - (logic, promise.future) - } - - override def toString = s"Watcher($ports)" - -} diff --git a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala b/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala deleted file mode 100644 index 1a1ebdc..0000000 --- a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -package ch.jodersky.flow -package stream - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.ByteString -import org.scalatest._ - -class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { - - implicit val system = ActorSystem("flow-test") - implicit val materializer = ActorMaterializer() - - override def afterAll { - system.terminate() - } - - "Serial stream" should { - val data = ByteString(("hello world").getBytes("utf-8")) - - "receive the same data it sends in an echo test" in { - withEcho { case (port, settings) => - val graph = Source.single(data) - .via(Serial().open(port, settings)) // send to echo pty - .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS - .dropWhile(_ != data) - .toMat(Sink.head)(Keep.right) - - Await.result(graph.run(), 2.seconds) - } - } - - "fail if the underlying pty fails" in { - val result = withEcho { case (port, settings) => - Source.single(data) - .via(Serial().open(port, settings)) - .toMat(Sink.last)(Keep.right) - .run()} - - intercept[StreamSerialException] { - Await.result(result, 10.seconds) - } - } - - } - -} diff --git a/native/build.sbt b/native/build.sbt new file mode 100644 index 0000000..2c7ffea --- /dev/null +++ b/native/build.sbt @@ -0,0 +1,8 @@ +enablePlugins(JniNative) + +sourceDirectory in nativeCompile := sourceDirectory.value + +// package native libraries from lib_native during releases +val isRelease = sys.props("release") == "true" +enableNativeCompilation in Compile := !isRelease +enableNativeCompilation in Test := !isRelease diff --git a/native/src/.gitignore b/native/src/.gitignore new file mode 100644 index 0000000..1785c46 --- /dev/null +++ b/native/src/.gitignore @@ -0,0 +1,12 @@ +# CMake +/CMakeFiles/ +/CMakeCache.txt +/cmake_install.cmake +/Makefile + +# Binary files +*.o +*.so* +*.dylib +*.a +*~ \ No newline at end of file diff --git a/native/src/CMakeLists.txt b/native/src/CMakeLists.txt new file mode 100644 index 0000000..a6b037d --- /dev/null +++ b/native/src/CMakeLists.txt @@ -0,0 +1,46 @@ +################################################################ +# A minimal CMake file that is compatible with sbt-jni # +# # +# All settings required by sbt-jni have been marked so, please # +# add/modify/remove settings to build your specific library. # +################################################################ + +cmake_minimum_required(VERSION 2.8.0) +set(ignoreMe "${SBT}") # sbt-jni defines -DSBT + +# Define project and related variables +# (required by sbt-jni) please use semantic versioning +# +project (akkaserial) +set(PROJECT_VERSION_MAJOR 1) +set(PROJECT_VERSION_MINOR 0) +set(PROJECT_VERSION_PATCH 0) + +set(CMAKE_C_FLAGS "-std=c99") +add_definitions(-Wall) +add_definitions(-Wextra) +add_definitions(-pedantic) + +# Setup JNI +find_package(JNI REQUIRED) +if (JNI_FOUND) + message (STATUS "JNI include directories: ${JNI_INCLUDE_DIRS}") +endif() + +# Include directories +include_directories(.) +include_directories(include) +include_directories(${JNI_INCLUDE_DIRS}) + +# Sources +file(GLOB LIB_SRC + "*.c" + "platform/posix/*.c" +) + +# Setup installation targets +# (required by sbt-jni) major version should always be appended to library name +# +set (LIB_NAME ${PROJECT_NAME}${PROJECT_VERSION_MAJOR}) +add_library(${LIB_NAME} SHARED ${LIB_SRC}) +install(TARGETS ${LIB_NAME} LIBRARY DESTINATION .) diff --git a/native/src/akka_serial_jni.c b/native/src/akka_serial_jni.c new file mode 100644 index 0000000..126a7bf --- /dev/null +++ b/native/src/akka_serial_jni.c @@ -0,0 +1,150 @@ +#include + +#include "akka_serial.h" + +#include "akka_serial_sync_UnsafeSerial.h" +#include "akka_serial_sync_UnsafeSerial__.h" + +// suppress unused parameter warnings +#define UNUSED_ARG(x) (void)(x) + +static inline void throwException(JNIEnv* env, const char* const exception, const char * const message) +{ + (*env)->ThrowNew(env, (*env)->FindClass(env, exception), message); +} + +/** Check return code and throw exception in case it is non-zero. */ +static void check(JNIEnv* env, int ret) +{ + switch (ret) { + case -E_IO: throwException(env, "java/io/IOException", ""); break; + case -E_BUSY: throwException(env, "akka/serial/PortInUseException", ""); break; + case -E_ACCESS_DENIED: throwException(env, "akka/serial/AccessDeniedException", ""); break; + case -E_INVALID_SETTINGS: throwException(env, "akka/serial/InvalidSettingsException", ""); break; + case -E_INTERRUPT: throwException(env, "akka/serial/PortInterruptedException", ""); break; + case -E_NO_PORT: throwException(env, "akka/serial/NoSuchPortException", ""); break; + default: return; + } +} + +/** Get pointer to serial config associated to an UnsafeSerial instance. */ +static struct serial_config* get_config(JNIEnv* env, jobject unsafe_serial) +{ + jclass clazz = (*env)->FindClass(env, "akka/serial/sync/UnsafeSerial"); + jfieldID field = (*env)->GetFieldID(env, clazz, "serialAddr", "J"); + jlong addr = (*env)->GetLongField(env, unsafe_serial, field); + return (struct serial_config*) (intptr_t) addr; +} + +/* + * Class: akka_serial_sync_UnsafeSerial__ + * Method: open + * Signature: (Ljava/lang/String;IIZI)J + */ +JNIEXPORT jlong JNICALL Java_akka_serial_sync_UnsafeSerial_00024_open +(JNIEnv *env, jobject instance, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity) +{ + UNUSED_ARG(instance); + + const char *dev = (*env)->GetStringUTFChars(env, port_name, 0); + struct serial_config* config; + int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &config); + (*env)->ReleaseStringUTFChars(env, port_name, dev); + + if (r < 0) { + check(env, r); + return -E_IO; + } + + long jpointer = (long) config; + return (jlong) jpointer; +} + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: read + * Signature: (Ljava/nio/ByteBuffer;)I + */ +JNIEXPORT jint JNICALL Java_akka_serial_sync_UnsafeSerial_read +(JNIEnv *env, jobject instance, jobject buffer) +{ + char* local_buffer = (char*) (*env)->GetDirectBufferAddress(env, buffer); + if (local_buffer == NULL) { + throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); + return -E_IO; + } + size_t size = (size_t) (*env)->GetDirectBufferCapacity(env, buffer); + struct serial_config* config = get_config(env, instance); + + int r = serial_read(config, local_buffer, size); + if (r < 0) { + check(env, r); + } + return r; + +} + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: cancelRead + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_cancelRead +(JNIEnv *env, jobject instance) +{ + int r = serial_cancel_read(get_config(env, instance)); + if (r < 0) { + check(env, r); + } +} + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: write + * Signature: (Ljava/nio/ByteBuffer;I)I + */ +JNIEXPORT jint JNICALL Java_akka_serial_sync_UnsafeSerial_write +(JNIEnv *env, jobject instance, jobject buffer, jint size) +{ + + char* local_buffer = (char *) (*env)->GetDirectBufferAddress(env, buffer); + if (local_buffer == NULL) { + throwException(env, "java/lang/IllegalArgumentException", "buffer is not direct"); + return -E_IO; + } + + int r = serial_write(get_config(env, instance), local_buffer, (size_t) size); + if (r < 0) { + check(env, r); + return -E_IO; + } + return r; +} + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: close + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_close +(JNIEnv *env, jobject instance) +{ + int r = serial_close(get_config(env, instance)); + if (r < 0) { + check(env, r); + } +} + +/* + * Class: akka_serial_sync_UnsafeSerial__ + * Method: debug + * Signature: (Z)V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_00024_debug +(JNIEnv *env, jobject instance, jboolean value) +{ + UNUSED_ARG(env); + UNUSED_ARG(instance); + + serial_debug((bool) value); +} diff --git a/native/src/include/akka_serial.h b/native/src/include/akka_serial.h new file mode 100644 index 0000000..23507a9 --- /dev/null +++ b/native/src/include/akka_serial.h @@ -0,0 +1,103 @@ +#ifndef AKKA_SERIAL_H +#define AKKA_SERIAL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include + +// general error codes, whose that are returned by functions +#define E_IO 1 // IO error +#define E_ACCESS_DENIED 2 // access denied +#define E_BUSY 3 // port is busy +#define E_INVALID_SETTINGS 4 // some port settings are invalid +#define E_INTERRUPT 5 // not really an error, function call aborted because port is closed +#define E_NO_PORT 6 // requested port does not exist + +#define PARITY_NONE 0 +#define PARITY_ODD 1 +#define PARITY_EVEN 2 + +/** + * Contains internal configuration of an open serial port. + */ +struct serial_config; + +/** + * Opens a serial port and allocates memory for storing configuration. Note: if this function fails, + * any internally allocated resources will be freed. + * @param port_name name of port + * @param baud baud rate + * @param char_size character size of data transmitted through serial device + * @param two_stop_bits set to use two stop bits instead of one + * @param parity kind of parity checking to use + * @param serial pointer to memory that will be allocated with a serial structure + * @return 0 on success + * @return -E_NO_PORT if the given port does not exist + * @return -E_ACCESS_DENIED if permissions are not sufficient to open port + * @return -E_BUSY if port is already in use + * @return -E_INVALID_SETTINGS if any of the specified settings are invalid + * @return -E_IO on other error + */ +int serial_open( + const char* port_name, + int baud, + int char_size, + bool two_stop_bits, + int parity, + struct serial_config** const serial); + +/** + * Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to + * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT + * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per + * close manual page, close should not be called on a file descriptor that is in use by another thread). + * @param serial pointer to serial configuration that is to be closed (and freed) + * @return 0 on success + * @return -E_IO on error + */ +int serial_close(struct serial_config* const serial); + +/** + * Starts a read from a previously opened serial port. The read is blocking, however it may be + * interrupted by calling 'serial_cancel_read' on the given serial port. + * @param serial pointer to serial configuration from which to read + * @param buffer buffer into which data is read + * @param size maximum buffer size + * @return n>0 the number of bytes read into buffer + * @return -E_INTERRUPT if the call to this function was interrupted + * @return -E_IO on IO error + */ +int serial_read(struct serial_config* const serial, char* const buffer, size_t size); + +/** + * Cancels a blocked read call. This function is thread safe, i.e. it may be called from a thread even + * while another thread is blocked in a read call. + * @param serial_config the serial port to interrupt + * @return 0 on success + * @return -E_IO on error + */ +int serial_cancel_read(struct serial_config* const serial); + +/** + * Writes data to a previously opened serial port. Non bocking. + * @param serial pointer to serial configuration to which to write + * @param data data to write + * @param size number of bytes to write from data + * @return n>0 the number of bytes written + * @return -E_IO on IO error + */ +int serial_write(struct serial_config* const serial, char* const data, size_t size); + +/** + * Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. + */ +void serial_debug(bool value); + +#ifdef __cplusplus +} +#endif + +#endif /* AKKA_SERIAL_H */ diff --git a/native/src/include/akka_serial_sync_UnsafeSerial.h b/native/src/include/akka_serial_sync_UnsafeSerial.h new file mode 100644 index 0000000..893ccf6 --- /dev/null +++ b/native/src/include/akka_serial_sync_UnsafeSerial.h @@ -0,0 +1,45 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class akka_serial_sync_UnsafeSerial */ + +#ifndef _Included_akka_serial_sync_UnsafeSerial +#define _Included_akka_serial_sync_UnsafeSerial +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: read + * Signature: (Ljava/nio/ByteBuffer;)I + */ +JNIEXPORT jint JNICALL Java_akka_serial_sync_UnsafeSerial_read + (JNIEnv *, jobject, jobject); + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: cancelRead + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_cancelRead + (JNIEnv *, jobject); + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: write + * Signature: (Ljava/nio/ByteBuffer;I)I + */ +JNIEXPORT jint JNICALL Java_akka_serial_sync_UnsafeSerial_write + (JNIEnv *, jobject, jobject, jint); + +/* + * Class: akka_serial_sync_UnsafeSerial + * Method: close + * Signature: ()V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_close + (JNIEnv *, jobject); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/native/src/include/akka_serial_sync_UnsafeSerial__.h b/native/src/include/akka_serial_sync_UnsafeSerial__.h new file mode 100644 index 0000000..5f4bdb9 --- /dev/null +++ b/native/src/include/akka_serial_sync_UnsafeSerial__.h @@ -0,0 +1,29 @@ +/* DO NOT EDIT THIS FILE - it is machine generated */ +#include +/* Header for class akka_serial_sync_UnsafeSerial__ */ + +#ifndef _Included_akka_serial_sync_UnsafeSerial__ +#define _Included_akka_serial_sync_UnsafeSerial__ +#ifdef __cplusplus +extern "C" { +#endif +/* + * Class: akka_serial_sync_UnsafeSerial__ + * Method: open + * Signature: (Ljava/lang/String;IIZI)J + */ +JNIEXPORT jlong JNICALL Java_akka_serial_sync_UnsafeSerial_00024_open + (JNIEnv *, jobject, jstring, jint, jint, jboolean, jint); + +/* + * Class: akka_serial_sync_UnsafeSerial__ + * Method: debug + * Signature: (Z)V + */ +JNIEXPORT void JNICALL Java_akka_serial_sync_UnsafeSerial_00024_debug + (JNIEnv *, jobject, jboolean); + +#ifdef __cplusplus +} +#endif +#endif diff --git a/native/src/platform/posix/akka_serial.c b/native/src/platform/posix/akka_serial.c new file mode 100644 index 0000000..a2de056 --- /dev/null +++ b/native/src/platform/posix/akka_serial.c @@ -0,0 +1,263 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include "akka_serial.h" + +#define DATA_CANCEL 0xffffffff + +static bool debug = false; + +static void print_debug(const char* const msg, int en) +{ + if (debug) { + if (errno == 0) { + fprintf(stderr, "%s", msg); + } else { + fprintf(stderr, "%s: %d\n", msg, en); + } + fflush(stderr); + } +} + +void serial_debug(bool value) +{ + debug = value; +} + +//contains file descriptors used in managing a serial port +struct serial_config { + int port_fd; // file descriptor of serial port + + /* a pipe is used to abort a serial read by writing something into the + * write end of the pipe */ + int pipe_read_fd; // file descriptor, read end of pipe + int pipe_write_fd; // file descriptor, write end of pipe +}; + +int serial_open( + const char* const port_name, + int baud, + int char_size, + bool two_stop_bits, + int parity, + struct serial_config** serial) +{ + + int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK); + + if (fd < 0) { + int en = errno; + print_debug("Error obtaining file descriptor for port", en); + if (en == EACCES) return -E_ACCESS_DENIED; + if (en == ENOENT) return -E_NO_PORT; + return -E_IO; + } + + if (flock(fd, LOCK_EX | LOCK_NB) < 0) { + print_debug("Error acquiring lock on port", errno); + close(fd); + return -E_BUSY; + } + + /* configure new port settings */ + struct termios newtio; + + /* initialize serial interface */ + newtio.c_iflag = 0; + newtio.c_oflag = 0; + newtio.c_lflag = 0; + newtio.c_cflag = CREAD; + + /* set speed */ + speed_t bd; + switch (baud) { + case 50: bd = B50; break; + case 75: bd = B75; break; + case 110: bd = B110; break; + case 134: bd = B134; break; + case 150: bd = B150; break; + case 200: bd = B200; break; + case 300: bd = B300; break; + case 600: bd = B600; break; + case 1200: bd = B1200; break; + case 1800: bd = B1800; break; + case 2400: bd = B2400; break; + case 4800: bd = B4800; break; + case 9600: bd = B9600; break; + case 19200: bd = B19200; break; + case 38400: bd = B38400; break; + case 57600: bd = B57600; break; + case 115200: bd = B115200; break; + case 230400: bd = B230400; break; + default: + close(fd); + print_debug("Invalid baud rate", 0); + return -E_INVALID_SETTINGS; + } + + if (cfsetspeed(&newtio, bd) < 0) { + print_debug("Error setting baud rate", errno); + close(fd); + return -E_IO; + } + + /* set char size*/ + switch (char_size) { + case 5: newtio.c_cflag |= CS5; break; + case 6: newtio.c_cflag |= CS6; break; + case 7: newtio.c_cflag |= CS7; break; + case 8: newtio.c_cflag |= CS8; break; + default: + close(fd); + print_debug("Invalid character size", 0); + return -E_INVALID_SETTINGS; + } + + /* use two stop bits */ + if (two_stop_bits){ + newtio.c_cflag |= CSTOPB; + } + + /* set parity */ + switch (parity) { + case PARITY_NONE: break; + case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break; + case PARITY_EVEN: newtio.c_cflag |= PARENB; break; + default: + close(fd); + print_debug("Invalid parity", 0); + return -E_INVALID_SETTINGS; + } + + if (tcflush(fd, TCIOFLUSH) < 0) { + print_debug("Error flushing serial settings", errno); + close(fd); + return -E_IO; + } + + if (tcsetattr(fd, TCSANOW, &newtio) < 0) { + print_debug("Error applying serial settings", errno); + close(fd); + return -E_IO; + } + + int pipe_fd[2]; + if (pipe(pipe_fd) < 0) { + print_debug("Error opening pipe", errno); + close(fd); + return -E_IO; + } + + if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) { + print_debug("Error setting pipe to non-blocking", errno); + close(fd); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -E_IO; + } + + struct serial_config* s = malloc(sizeof(s)); + if (s == NULL) { + print_debug("Error allocating memory for serial configuration", errno); + close(fd); + close(pipe_fd[0]); + close(pipe_fd[1]); + return -E_IO; + } + + s->port_fd = fd; + s->pipe_read_fd = pipe_fd[0]; + s->pipe_write_fd = pipe_fd[1]; + (*serial) = s; + + return 0; +} + +int serial_close(struct serial_config* const serial) +{ + if (close(serial->pipe_write_fd) < 0) { + print_debug("Error closing write end of pipe", errno); + return -E_IO; + } + if (close(serial->pipe_read_fd) < 0) { + print_debug("Error closing read end of pipe", errno); + return -E_IO; + } + + if (flock(serial->port_fd, LOCK_UN) < 0){ + print_debug("Error releasing lock on port", errno); + return -E_IO; + } + if (close(serial->port_fd) < 0) { + print_debug("Error closing port", errno); + return -E_IO; + } + + free(serial); + return 0; +} + +int serial_read(struct serial_config* const serial, char* const buffer, size_t size) +{ + int port = serial->port_fd; + int pipe = serial->pipe_read_fd; + + fd_set rfds; + FD_ZERO(&rfds); + FD_SET(port, &rfds); + FD_SET(pipe, &rfds); + + int nfds = pipe + 1; + if (pipe < port) nfds = port + 1; + + int n = select(nfds, &rfds, NULL, NULL, NULL); + if (n < 0) { + print_debug("Error trying to call select on port and pipe", errno); + return -E_IO; + } + + if (FD_ISSET(pipe, &rfds)) { + return -E_INTERRUPT; + } else if (FD_ISSET(port, &rfds)) { + int r = read(port, buffer, size); + + // treat 0 bytes read as an error to avoid problems on disconnect + // anyway, after a poll there should be more than 0 bytes available to read + if (r <= 0) { + print_debug("Error data not available after select", errno); + return -E_IO; + } + return r; + } else { + print_debug("Select returned unknown read sets", 0); + return -E_IO; + } +} + +int serial_cancel_read(struct serial_config* const serial) +{ + int data = DATA_CANCEL; + + //write to pipe to wake up any blocked read thread (self-pipe trick) + if (write(serial->pipe_write_fd, &data, 1) < 0) { + print_debug("Error writing to pipe during read cancel", errno); + return -E_IO; + } + + return 0; +} + +int serial_write(struct serial_config* const serial, char* const data, size_t size) +{ + int r = write(serial->port_fd, data, size); + if (r < 0) { + print_debug("Error writing to port", errno); + return -E_IO; + } + return r; +} diff --git a/native/src/platform/windows/README b/native/src/platform/windows/README new file mode 100644 index 0000000..ea7c736 --- /dev/null +++ b/native/src/platform/windows/README @@ -0,0 +1 @@ +The contents of the file akka_serial.c were found in the avrdude project. They look like they may be a good starting point for serial communication on windows. diff --git a/native/src/platform/windows/akka_serial.c.disabled b/native/src/platform/windows/akka_serial.c.disabled new file mode 100644 index 0000000..86a267c --- /dev/null +++ b/native/src/platform/windows/akka_serial.c.disabled @@ -0,0 +1,416 @@ +/* + * avrdude - A Downloader/Uploader for AVR device programmers + * Copyright (C) 2003, 2004 Martin J. Thomas + * Copyright (C) 2006 Joerg Wunsch + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 2 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +/* $Id$ */ + +/* + * Native Win32 serial interface for avrdude. + */ + +#include "avrdude.h" + +#if defined(WIN32NATIVE) + +#include +#include +#include /* for isprint */ + +#include "serial.h" + +long serial_recv_timeout = 5000; /* ms */ + +#define W32SERBUFSIZE 1024 + +struct baud_mapping { + long baud; + DWORD speed; +}; + +/* HANDLE hComPort=INVALID_HANDLE_VALUE; */ + +static struct baud_mapping baud_lookup_table [] = { + { 1200, CBR_1200 }, + { 2400, CBR_2400 }, + { 4800, CBR_4800 }, + { 9600, CBR_9600 }, + { 19200, CBR_19200 }, + { 38400, CBR_38400 }, + { 57600, CBR_57600 }, + { 115200, CBR_115200 }, + { 0, 0 } /* Terminator. */ +}; + +static DWORD serial_baud_lookup(long baud) +{ + struct baud_mapping *map = baud_lookup_table; + + while (map->baud) { + if (map->baud == baud) + return map->speed; + map++; + } + + /* + * If a non-standard BAUD rate is used, issue + * a warning (if we are verbose) and return the raw rate + */ + if (verbose > 0) + fprintf(stderr, "%s: serial_baud_lookup(): Using non-standard baud rate: %ld", + progname, baud); + + return baud; +} + + +static BOOL serial_w32SetTimeOut(HANDLE hComPort, DWORD timeout) // in ms +{ + COMMTIMEOUTS ctmo; + ZeroMemory (&ctmo, sizeof(COMMTIMEOUTS)); + ctmo.ReadIntervalTimeout = timeout; + ctmo.ReadTotalTimeoutMultiplier = timeout; + ctmo.ReadTotalTimeoutConstant = timeout; + + return SetCommTimeouts(hComPort, &ctmo); +} + +static int ser_setspeed(union filedescriptor *fd, long baud) +{ + DCB dcb; + HANDLE hComPort = (HANDLE)fd->pfd; + + ZeroMemory (&dcb, sizeof(DCB)); + dcb.DCBlength = sizeof(DCB); + dcb.BaudRate = serial_baud_lookup (baud); + dcb.fBinary = 1; + dcb.fDtrControl = DTR_CONTROL_DISABLE; + dcb.fRtsControl = RTS_CONTROL_DISABLE; + dcb.ByteSize = 8; + dcb.Parity = NOPARITY; + dcb.StopBits = ONESTOPBIT; + + if (!SetCommState(hComPort, &dcb)) + return -1; + + return 0; +} + + +static int ser_open(char * port, long baud, union filedescriptor *fdp) +{ + LPVOID lpMsgBuf; + HANDLE hComPort=INVALID_HANDLE_VALUE; + char *newname = 0; + + /* + * If the port is of the form "net::", then + * handle it as a TCP connection to a terminal server. + * + * This is curently not implemented for Win32. + */ + if (strncmp(port, "net:", strlen("net:")) == 0) { + fprintf(stderr, + "%s: ser_open(): network connects are currently not" + "implemented for Win32 environments\n", + progname); + return -1; + } + + if (strncasecmp(port, "com", strlen("com")) == 0) { + + // prepend "\\\\.\\" to name, required for port # >= 10 + newname = malloc(strlen("\\\\.\\") + strlen(port) + 1); + + if (newname == 0) { + fprintf(stderr, + "%s: ser_open(): out of memory\n", + progname); + exit(1); + } + strcpy(newname, "\\\\.\\"); + strcat(newname, port); + + port = newname; + } + + hComPort = CreateFile(port, GENERIC_READ | GENERIC_WRITE, 0, NULL, + OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, NULL); + + if (hComPort == INVALID_HANDLE_VALUE) { + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL); + fprintf(stderr, "%s: ser_open(): can't open device \"%s\": %s\n", + progname, port, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + return -1; + } + + if (!SetupComm(hComPort, W32SERBUFSIZE, W32SERBUFSIZE)) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set buffers for \"%s\"\n", + progname, port); + return -1; + } + + fdp->pfd = (void *)hComPort; + if (ser_setspeed(fdp, baud) != 0) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set com-state for \"%s\"\n", + progname, port); + return -1; + } + + if (!serial_w32SetTimeOut(hComPort,0)) + { + CloseHandle(hComPort); + fprintf(stderr, "%s: ser_open(): can't set initial timeout for \"%s\"\n", + progname, port); + return -1; + } + + if (newname != 0) { + free(newname); + } + return 0; +} + + +static void ser_close(union filedescriptor *fd) +{ + HANDLE hComPort=(HANDLE)fd->pfd; + if (hComPort != INVALID_HANDLE_VALUE) + CloseHandle (hComPort); + + hComPort = INVALID_HANDLE_VALUE; +} + +static int ser_set_dtr_rts(union filedescriptor *fd, int is_on) +{ + HANDLE hComPort=(HANDLE)fd->pfd; + + if (is_on) { + EscapeCommFunction(hComPort, SETDTR); + EscapeCommFunction(hComPort, SETRTS); + } else { + EscapeCommFunction(hComPort, CLRDTR); + EscapeCommFunction(hComPort, CLRRTS); + } + return 0; +} + + +static int ser_send(union filedescriptor *fd, unsigned char * buf, size_t buflen) +{ + size_t len = buflen; + unsigned char c='\0'; + DWORD written; + unsigned char * b = buf; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_send(): port not open\n", + progname); + exit(1); + } + + if (!len) + return 0; + + if (verbose > 3) + { + fprintf(stderr, "%s: Send: ", progname); + + while (len) { + c = *b; + if (isprint(c)) { + fprintf(stderr, "%c ", c); + } + else { + fprintf(stderr, ". "); + } + fprintf(stderr, "[%02x] ", c); + b++; + len--; + } + fprintf(stderr, "\n"); + } + + serial_w32SetTimeOut(hComPort,500); + + if (!WriteFile (hComPort, buf, buflen, &written, NULL)) { + fprintf(stderr, "%s: ser_send(): write error: %s\n", + progname, "sorry no info avail"); // TODO + exit(1); + } + + if (written != buflen) { + fprintf(stderr, "%s: ser_send(): size/send mismatch\n", + progname); + exit(1); + } + + return 0; +} + + +static int ser_recv(union filedescriptor *fd, unsigned char * buf, size_t buflen) +{ + unsigned char c; + unsigned char * p = buf; + DWORD read; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_read(): port not open\n", + progname); + exit(1); + } + + serial_w32SetTimeOut(hComPort, serial_recv_timeout); + + if (!ReadFile(hComPort, buf, buflen, &read, NULL)) { + LPVOID lpMsgBuf; + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL ); + fprintf(stderr, "%s: ser_recv(): read error: %s\n", + progname, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + exit(1); + } + + /* time out detected */ + if (read == 0) { + if (verbose > 1) + fprintf(stderr, + "%s: ser_recv(): programmer is not responding\n", + progname); + return -1; + } + + p = buf; + + if (verbose > 3) + { + fprintf(stderr, "%s: Recv: ", progname); + + while (read) { + c = *p; + if (isprint(c)) { + fprintf(stderr, "%c ", c); + } + else { + fprintf(stderr, ". "); + } + fprintf(stderr, "[%02x] ", c); + + p++; + read--; + } + fprintf(stderr, "\n"); + } + return 0; +} + + +static int ser_drain(union filedescriptor *fd, int display) +{ + // int rc; + unsigned char buf[10]; + BOOL readres; + DWORD read; + + HANDLE hComPort=(HANDLE)fd->pfd; + + if (hComPort == INVALID_HANDLE_VALUE) { + fprintf(stderr, "%s: ser_drain(): port not open\n", + progname); + exit(1); + } + + serial_w32SetTimeOut(hComPort,250); + + if (display) { + fprintf(stderr, "drain>"); + } + + while (1) { + readres=ReadFile(hComPort, buf, 1, &read, NULL); + if (!readres) { + LPVOID lpMsgBuf; + FormatMessage( + FORMAT_MESSAGE_ALLOCATE_BUFFER | + FORMAT_MESSAGE_FROM_SYSTEM | + FORMAT_MESSAGE_IGNORE_INSERTS, + NULL, + GetLastError(), + MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language + (LPTSTR) &lpMsgBuf, + 0, + NULL ); + fprintf(stderr, "%s: ser_drain(): read error: %s\n", + progname, (char*)lpMsgBuf); + LocalFree( lpMsgBuf ); + exit(1); + } + + if (read) { // data avail + if (display) fprintf(stderr, "%02x ", buf[0]); + } + else { // no more data + if (display) fprintf(stderr, "/run`. + +All projects, including samples, can be listed by running `sbt projects`. diff --git a/samples/terminal-stream/src/main/scala/akka/serial/samples/terminalstream/Main.scala b/samples/terminal-stream/src/main/scala/akka/serial/samples/terminalstream/Main.scala new file mode 100644 index 0000000..de626a3 --- /dev/null +++ b/samples/terminal-stream/src/main/scala/akka/serial/samples/terminalstream/Main.scala @@ -0,0 +1,66 @@ +package akka.serial +package samples.terminalstream + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.io.StdIn + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.util.ByteString + +import stream.Serial + +object Main { + + final val Delay = FiniteDuration(500, MILLISECONDS) + + implicit val system = ActorSystem("terminal-stream") + implicit val materializer = ActorMaterializer() + + def ask(label: String, default: String) = { + print(label + " [" + default.toString + "]: ") + val in = StdIn.readLine() + println("") + if (in.isEmpty) default else in + } + + def main(args: Array[String]): Unit = { + import system.dispatcher + + val port = ask("Device", "/dev/ttyACM0") + val baud = ask("Baud rate", "115200").toInt + val cs = ask("Char size", "8").toInt + val tsb = ask("Use two stop bits", "false").toBoolean + val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) + val settings = SerialSettings(baud, cs, tsb, parity) + + val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] = + Serial().open(port, settings) + + val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data => + println("server says: " + data.decodeString("UTF-8")) + } + + val ticker: Source[ByteString, _] = Source.tick(Delay, Delay, ()).scan(0){case (x, _) => + x + 1 + }.map{ x => + println(x) + ByteString(x.toString) + } + + val connection: Future[Serial.Connection] = ticker.viaMat(serial)(Keep.right).to(printer).run() + + connection map { conn => + println("Connected to " + conn.port) + StdIn.readLine("Press enter to exit") + } recover { case err => + println("Cannot connect: " + err) + } andThen { case _ => + system.terminate() + } + + } + +} diff --git a/samples/terminal/src/main/scala/akka/serial/samples/terminal/ConsoleReader.scala b/samples/terminal/src/main/scala/akka/serial/samples/terminal/ConsoleReader.scala new file mode 100644 index 0000000..cc14dd4 --- /dev/null +++ b/samples/terminal/src/main/scala/akka/serial/samples/terminal/ConsoleReader.scala @@ -0,0 +1,29 @@ +package akka.serial +package samples.terminal + +import akka.actor.Actor +import scala.io.StdIn + +class ConsoleReader extends Actor { + import context._ + import ConsoleReader._ + + def receive = { + case Read => + StdIn.readLine() match { + case ":q" | null => parent ! EOT + case s => { + parent ! ConsoleInput(s) + } + } + } + +} + +object ConsoleReader { + + case object Read + case object EOT + case class ConsoleInput(in: String) + +} diff --git a/samples/terminal/src/main/scala/akka/serial/samples/terminal/Main.scala b/samples/terminal/src/main/scala/akka/serial/samples/terminal/Main.scala new file mode 100644 index 0000000..0226e72 --- /dev/null +++ b/samples/terminal/src/main/scala/akka/serial/samples/terminal/Main.scala @@ -0,0 +1,30 @@ +package akka.serial +package samples.terminal + +import akka.actor.ActorSystem +import scala.io.StdIn + +object Main { + + def ask(label: String, default: String) = { + print(label + " [" + default.toString + "]: ") + val in = StdIn.readLine() + println("") + if (in.isEmpty) default else in + } + + def main(args: Array[String]): Unit = { + val port = ask("Device", "/dev/ttyACM0") + val baud = ask("Baud rate", "115200").toInt + val cs = ask("Char size", "8").toInt + val tsb = ask("Use two stop bits", "false").toBoolean + val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) + val settings = SerialSettings(baud, cs, tsb, parity) + + println("Starting terminal system, enter :q to exit.") + Serial.debug(true) + val system = ActorSystem("akka-serial") + val terminal = system.actorOf(Terminal(port, settings), name = "terminal") + system.registerOnTermination(println("Stopped terminal system.")) + } +} diff --git a/samples/terminal/src/main/scala/akka/serial/samples/terminal/Terminal.scala b/samples/terminal/src/main/scala/akka/serial/samples/terminal/Terminal.scala new file mode 100644 index 0000000..55b0422 --- /dev/null +++ b/samples/terminal/src/main/scala/akka/serial/samples/terminal/Terminal.scala @@ -0,0 +1,75 @@ +package akka.serial +package samples.terminal + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } +import akka.io.IO +import akka.util.ByteString + +class Terminal(port: String, settings: SerialSettings) extends Actor with ActorLogging { + import Terminal._ + import context._ + + val reader = actorOf(Props[ConsoleReader]) + + log.info(s"Requesting manager to open port: ${port}, baud: ${settings.baud}") + IO(Serial) ! Serial.Open(port, settings) + + override def postStop() = { + system.terminate() + } + + def receive = { + case Serial.CommandFailed(cmd, reason) => { + log.error(s"Connection failed, stopping terminal. Reason: ${reason}") + context stop self + } + case Serial.Opened(port) => { + log.info(s"Port ${port} is now open.") + val operator = sender + context become opened(operator) + context watch operator + reader ! ConsoleReader.Read + } + } + + def opened(operator: ActorRef): Receive = { + + case Serial.Received(data) => { + log.info(s"Received data: ${formatData(data)}") + } + + case Terminal.Wrote(data) => log.info(s"Wrote data: ${formatData(data)}") + + case Serial.Closed => { + log.info("Operator closed normally, exiting terminal.") + context unwatch operator + context stop self + } + + case Terminated(`operator`) => { + log.error("Operator crashed, exiting terminal.") + context stop self + } + + case ConsoleReader.EOT => { + log.info("Initiating close.") + operator ! Serial.Close + } + + case ConsoleReader.ConsoleInput(input) => { + val data = ByteString(input.getBytes) + operator ! Serial.Write(data, length => Wrote(data.take(length))) + reader ! ConsoleReader.Read + } + } + +} + +object Terminal { + case class Wrote(data: ByteString) extends Serial.Event + + def apply(port: String, settings: SerialSettings) = Props(classOf[Terminal], port, settings) + + private def formatData(data: ByteString) = data.mkString("[", ",", "]") + " " + (new String(data.toArray, "UTF-8")) + +} diff --git a/samples/watcher/src/main/scala/akka/serial/samples/watcher/main.scala b/samples/watcher/src/main/scala/akka/serial/samples/watcher/main.scala new file mode 100644 index 0000000..d9a0d4e --- /dev/null +++ b/samples/watcher/src/main/scala/akka/serial/samples/watcher/main.scala @@ -0,0 +1,50 @@ +package akka.serial +package samples.watcher + +import akka.actor.{ Actor, ActorLogging, ActorSystem, Props } +import akka.io.IO +import scala.io.StdIn + +class Watcher extends Actor with ActorLogging { + import context._ + + val ports = List( + "/dev/ttyUSB\\d+", + "/dev/ttyACM\\d+", + "/dev/cu\\d+", + "/dev/ttyS\\d+" + ) + + override def preStart() = { + val cmd = Serial.Watch() + IO(Serial) ! cmd //watch for new devices + log.info(s"Watching ${cmd.directory} for new devices.") + } + + def receive = { + + case Serial.CommandFailed(w: Serial.Watch, err) => + log.error(err, s"Could not get a watch on ${w.directory}.") + context stop self + + case Serial.Connected(path) => + log.info(s"New device: ${path}") + ports.find(path matches _) match { + case Some(port) => log.info(s"Device is a serial device.") + case None => log.warning(s"Device is NOT serial device.") + } + + } + +} + +object Main { + + def main(args: Array[String]): Unit = { + val system = ActorSystem("akka-serial") + val watcher = system.actorOf(Props(classOf[Watcher]), name = "watcher") + StdIn.readLine() + system.terminate() + } + +} diff --git a/stream/build.sbt b/stream/build.sbt new file mode 100644 index 0000000..e66f1d5 --- /dev/null +++ b/stream/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaStream +libraryDependencies += Dependencies.scalatest % "test" diff --git a/stream/src/main/scala/akka/serial/stream/Serial.scala b/stream/src/main/scala/akka/serial/stream/Serial.scala new file mode 100644 index 0000000..785001f --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/Serial.scala @@ -0,0 +1,68 @@ +package akka.serial +package stream + +import akka.stream.scaladsl.Source +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +import akka.serial.{Serial => CoreSerial} +import impl._ + + +object Serial extends ExtensionId[Serial] with ExtensionIdProvider { + + /** + * Represents a prospective serial connection. + */ + case class Connection(port: String, settings: SerialSettings) + + case class Watch(ports: Set[String]) + + def apply()(implicit system: ActorSystem): Serial = super.apply(system) + + override def lookup() = Serial + + override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) + +} + +/** + * Entry point to streaming over serial ports. + * The design of this API is inspired by Akka's Tcp streams. + */ +class Serial(system: ExtendedActorSystem) extends Extension { + + /** + * Creates a Flow that will open a serial port when materialized. + * This Flow then represents an open serial connection: data pushed to its + * inlet will be written to the underlying serial port, and data received + * on the port will be emitted by its outlet. + * @param port name of serial port to open + * @param settings settings to use with serial port + * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped + * @param bufferSize maximum read and write buffer sizes + * @return a Flow associated to the given serial port + */ + def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): + Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( + new SerialConnectionStage( + IO(CoreSerial)(system), + port, + settings, + failOnOverflow, + bufferSize + ) + ) + + def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( + new WatcherStage( + IO(CoreSerial)(system), + ports + ) + ) + +} diff --git a/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala new file mode 100644 index 0000000..e2bb519 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala @@ -0,0 +1,5 @@ +package akka.serial +package stream + +/** Represents a generic exception occured during streaming of serial data. */ +class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala new file mode 100644 index 0000000..aafbfd6 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala @@ -0,0 +1,4 @@ +package akka.serial +package stream + +class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala new file mode 100644 index 0000000..a650c82 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala @@ -0,0 +1,172 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.{FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString + +import akka.serial.{Serial => CoreSerial, SerialSettings} + +/** + * Graph logic that handles establishing and forwarding serial communication. + * The underlying stream is closed when downstream (output) finishes, + * upstream (input) closes are ignored. + */ +private[stream] class SerialConnectionLogic( + shape: FlowShape[ByteString, ByteString], + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int, + connectionPromise: Promise[Serial.Connection]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + import SerialConnectionLogic._ + + /** Receives data and writes it to the serial backend. */ + private def in: Inlet[ByteString] = shape.in + + /** Receives data from the serial backend and pushes it downstream. */ + private def out: Outlet[ByteString] = shape.out + + /** Implicit alias to stageActor so it will be used in "!" calls, without + * explicitly specifying a sender. */ + implicit private def self = stageActor.ref + + /** + * Input handler for an established connection. + * @param operator the operator actor of the established connection + */ + class ConnectedInHandler(operator: ActorRef) extends InHandler { + + override def onPush(): Unit = { + val elem = grab(in) + require(elem != null) // reactive streams requirement + operator ! CoreSerial.Write(elem, _ => WriteAck) + } + + override def onUpstreamFinish(): Unit = { + if (isClosed(out)) { // close serial connection if output is also closed + operator ! CoreSerial.Close + } + } + + } + + class ConnectedOutHandler(operator: ActorRef) extends OutHandler { + // implicit alias to stage actor, so it will be used in "!" calls + implicit val self = stageActor.ref + + override def onPull(): Unit = { + // serial connections are at the end of the "backpressure chain", + // they do not natively support backpressure (as does TCP for example) + // therefore nothing is done here + } + + override def onDownstreamFinish(): Unit = { + // closing downstream also closes the underlying connection + operator ! CoreSerial.Close + } + + } + + override def preStart(): Unit = { + setKeepGoing(true) // serial connection operator will manage completing stage + getStageActor(connecting) + stageActor watch manager + manager ! CoreSerial.Open(port, settings, bufferSize) + } + + setHandler(in, IgnoreTerminateInput) + setHandler(out, IgnoreTerminateOutput) + + /** Initial behavior, before a serial connection is established. */ + private def connecting(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`manager`) => + val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.Opened(port) => + val operator = sender + setHandler(in, new ConnectedInHandler(operator)) + setHandler(out, new ConnectedOutHandler(operator)) + stageActor become connected(operator) + connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value + stageActor unwatch manager + stageActor watch operator + if (!isClosed(in)) { + pull(in) // start pulling input + } + + case other => + val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") + failStage(ex) + connectionPromise.failure(ex) + + } + + } + + /** Behaviour once a connection has been established. It is assumed that operator is not null. */ + private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`operator`) => + failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) + + case CoreSerial.CommandFailed(cmd, reason) => + failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) + + case CoreSerial.Closed => + completeStage() + + case CoreSerial.Received(data) => + if (isAvailable(out)) { + push(out, data) + } else if (failOnOverflow) { + /* Note that the native backend does not provide any way of informing about dropped serial + * data. However, in most cases, a computer capable of running akka-serial is also capable + * of processing incoming serial data at typical baud rates. Hence packets will usually + * only be dropped if an application that uses akka-serial backpressures, which can + * however be detected here. */ + failStage(new StreamSerialException("Incoming serial data was dropped.")) + } + + case WriteAck => + if (!isClosed(in)) { + pull(in) + } + + case other => + failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) + + } + + } + +} + +private[stream] object SerialConnectionLogic { + + case object WriteAck extends CoreSerial.Event + +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala new file mode 100644 index 0000000..aa67943 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala @@ -0,0 +1,49 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString + +/** + * Graph stage that establishes and thereby materializes a serial connection. + * The actual connection logic is deferred to [[SerialConnectionLogic]]. + */ +private[stream] class SerialConnectionStage( + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int +) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { + + val in: Inlet[ByteString] = Inlet("Serial.in") + val out: Outlet[ByteString] = Outlet("Serial.out") + + val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): + (GraphStageLogic, Future[Serial.Connection]) = { + + val connectionPromise = Promise[Serial.Connection] + + val logic = new SerialConnectionLogic( + shape, + manager, + port, + settings, + failOnOverflow, + bufferSize, + connectionPromise + ) + + (logic, connectionPromise.future) + } + + override def toString = s"Serial($port)" + +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala new file mode 100644 index 0000000..afab60e --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala @@ -0,0 +1,65 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import akka.serial.{Serial => CoreSerial} + +private[stream] class WatcherLogic( + shape: SourceShape[String], + ioManager: ActorRef, + ports: Set[String], + watchPromise: Promise[Serial.Watch]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + + implicit private def self = stageActor.ref + + override def preStart(): Unit = { + getStageActor(receive) + stageActor watch ioManager + for (dir <- WatcherLogic.getDirs(ports)) { + ioManager ! CoreSerial.Watch(dir, skipInitial = false) + } + } + + setHandler(shape.out, IgnoreTerminateOutput) + + private def receive(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`ioManager`) => + val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.Connected(port) => + if (ports contains port) { + if (isAvailable(shape.out)) { + push(shape.out, port) + } + } + + case other => + failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) + + } + } + +} + +private[stream] object WatcherLogic { + def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala new file mode 100644 index 0000000..649249f --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala @@ -0,0 +1,38 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} + + +private[stream] class WatcherStage( + ioManager: ActorRef, + ports: Set[String] +) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { + + val out = Outlet[String]("Watcher.out") + + val shape = new SourceShape(out) + + override def createLogicAndMaterializedValue(attributes: Attributes): + (GraphStageLogic, Future[Serial.Watch]) = { + + val promise = Promise[Serial.Watch] + + val logic = new WatcherLogic( + shape, + ioManager, + ports, + promise + ) + + (logic, promise.future) + } + + override def toString = s"Watcher($ports)" + +} diff --git a/stream/src/test/resources/application.conf b/stream/src/test/resources/application.conf new file mode 100644 index 0000000..bdf80e2 --- /dev/null +++ b/stream/src/test/resources/application.conf @@ -0,0 +1,3 @@ +# Don't fill test output with log messages +akka.stdout-loglevel = "OFF" +akka.loglevel = "OFF" \ No newline at end of file diff --git a/stream/src/test/scala/akka/serial/stream/SerialSpec.scala b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala new file mode 100644 index 0000000..7a64383 --- /dev/null +++ b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala @@ -0,0 +1,51 @@ +package akka.serial +package stream + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.ByteString +import org.scalatest._ + +class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { + + implicit val system = ActorSystem("akka-serial-test") + implicit val materializer = ActorMaterializer() + + override def afterAll { + system.terminate() + } + + "Serial stream" should { + val data = ByteString(("hello world").getBytes("utf-8")) + + "receive the same data it sends in an echo test" in { + withEcho { case (port, settings) => + val graph = Source.single(data) + .via(Serial().open(port, settings)) // send to echo pty + .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS + .dropWhile(_ != data) + .toMat(Sink.head)(Keep.right) + + Await.result(graph.run(), 2.seconds) + } + } + + "fail if the underlying pty fails" in { + val result = withEcho { case (port, settings) => + Source.single(data) + .via(Serial().open(port, settings)) + .toMat(Sink.last)(Keep.right) + .run()} + + intercept[StreamSerialException] { + Await.result(result, 10.seconds) + } + } + + } + +} diff --git a/sync/build.sbt b/sync/build.sbt new file mode 100644 index 0000000..547a3fd --- /dev/null +++ b/sync/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.scalatest % "test" + +target in javah := (baseDirectory in ThisBuild).value / "native" / "src" / "include" diff --git a/sync/src/main/scala/akka/serial/Parity.scala b/sync/src/main/scala/akka/serial/Parity.scala new file mode 100644 index 0000000..a1e1d04 --- /dev/null +++ b/sync/src/main/scala/akka/serial/Parity.scala @@ -0,0 +1,9 @@ +package akka.serial + +/** Specifies available parities used in serial communication. */ +object Parity extends Enumeration { + type Parity = Value + val None = Value(0) + val Odd = Value(1) + val Even = Value(2) +} diff --git a/sync/src/main/scala/akka/serial/SerialSettings.scala b/sync/src/main/scala/akka/serial/SerialSettings.scala new file mode 100644 index 0000000..56345ae --- /dev/null +++ b/sync/src/main/scala/akka/serial/SerialSettings.scala @@ -0,0 +1,10 @@ +package akka.serial + +/** + * Groups settings used in communication over a serial port. + * @param baud baud rate to use with serial port + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + */ +case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) diff --git a/sync/src/main/scala/akka/serial/exceptions.scala b/sync/src/main/scala/akka/serial/exceptions.scala new file mode 100644 index 0000000..26dfceb --- /dev/null +++ b/sync/src/main/scala/akka/serial/exceptions.scala @@ -0,0 +1,19 @@ +package akka.serial + +/** The requested port could not be found. */ +class NoSuchPortException(message: String) extends Exception(message) + +/** The requested port is in use by someone else. */ +class PortInUseException(message: String) extends Exception(message) + +/** Permissions are not sufficient to open a serial port. */ +class AccessDeniedException(message: String) extends Exception(message) + +/** The settings specified are invalid. */ +class InvalidSettingsException(message: String) extends Exception(message) + +/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */ +class PortInterruptedException(message: String) extends Exception(message) + +/** The specified port has been closed. */ +class PortClosedException(message: String) extends Exception(message) diff --git a/sync/src/main/scala/akka/serial/sync/SerialConnection.scala b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala new file mode 100644 index 0000000..7d80a4c --- /dev/null +++ b/sync/src/main/scala/akka/serial/sync/SerialConnection.scala @@ -0,0 +1,142 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer +import java.util.concurrent.atomic.AtomicBoolean + +/** + * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In + * contrast to the latter, this class encapsulates and secures any pointers used to communicate with + * the native backend and is thread-safe. + * + * The underlying serial port is assumed open when this class is initialized. + */ +class SerialConnection private ( + unsafe: UnsafeSerial, + val port: String +) { + + private var reading: Boolean = false + private val readLock = new Object + + private var writing: Boolean = false + private val writeLock = new Object + + private val closed = new AtomicBoolean(false) + + /** + * Checks if this serial port is closed. + */ + def isClosed = closed.get() + + /** + * Closes the underlying serial connection. Any callers blocked on read or write will return. + * A call of this method has no effect if the serial port is already closed. + * @throws IOException on IO error + */ + def close(): Unit = this.synchronized { + if (!closed.get) { + closed.set(true) + unsafe.cancelRead() + readLock.synchronized { + while (reading) this.wait() + } + writeLock.synchronized { + while (writing) this.wait() + } + unsafe.close() + } + } + + /** + * Reads data from underlying serial connection into a ByteBuffer. + * Note that data is read into the buffer's memory, starting at the + * first position. The buffer's limit is set to the number of bytes + * read. + * + * A call to this method is blocking, however it is interrupted + * if the connection is closed. + * + * This method works only for direct buffers. + * + * @param buffer a ByteBuffer into which data is read + * @return the actual number of bytes read + * @throws PortInterruptedException if port is closed while reading + * @throws IOException on IO error + */ + def read(buffer: ByteBuffer): Int = readLock.synchronized { + if (!closed.get) { + try { + reading = true + val n = unsafe.read(buffer) + buffer.limit(n) + n + } finally { + reading = false + if (closed.get) readLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + + /** + * Writes data from a ByteBuffer to underlying serial connection. + * Note that data is read from the buffer's memory, its attributes + * such as position and limit are not modified. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * This method works only for direct buffers. + * + * @param buffer a ByteBuffer from which data is taken + * @return the actual number of bytes written + * @throws IOException on IO error + */ + def write(buffer: ByteBuffer): Int = writeLock.synchronized { + if (!closed.get) { + try { + writing = true + unsafe.write(buffer, buffer.position) + } finally { + writing = false + if (closed.get) writeLock.notify() + } + } else { + throw new PortClosedException(s"${port} is closed") + } + } + +} + +object SerialConnection { + + /** + * Opens a new connection to a serial port. + * This method acts as a factory to creating serial connections. + * + * @param port name of serial port to open + * @param settings settings with which to initialize the connection + * @return an instance of the open serial connection + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + def open( + port: String, + settings: SerialSettings + ): SerialConnection = synchronized { + val pointer = UnsafeSerial.open( + port, + settings.baud, + settings.characterSize, + settings.twoStopBits, + settings.parity.id + ) + new SerialConnection(new UnsafeSerial(pointer), port) + } + +} diff --git a/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala new file mode 100644 index 0000000..dcca960 --- /dev/null +++ b/sync/src/main/scala/akka/serial/sync/UnsafeSerial.scala @@ -0,0 +1,109 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer + +import ch.jodersky.jni.nativeLoader + +/** + * Low-level wrapper of native serial backend. + * + * WARNING: Methods in this class allocate native structures and deal with pointers. These + * pointers are handled as longs by java and are NOT checked for correctness, therefore passing + * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. + * + * See SerialConnection for a higher-level, more secured wrapper + * of serial communication. + * + * @param serialAddr address of natively allocated serial configuration structure + */ +@nativeLoader("akkaserial1") +private[serial] class UnsafeSerial(final val serialAddr: Long) { + + final val ParityNone: Int = 0 + final val ParityOdd: Int = 1 + final val ParityEven: Int = 2 + + /** + * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only + * read into the buffer's allocated memory, its position or limit are not changed. + * + * The read is blocking, however it may be interrupted by calling cancelRead() on the given + * serial port. + * + * @param buffer direct ByteBuffer to read into + * @return number of bytes actually read + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws PortInterruptedException if the call to this function was interrupted + * @throws IOException on IO error + */ + @native def read(buffer: ByteBuffer): Int + + /** + * Cancels a read (any caller to read or readDirect will return with a + * PortInterruptedException). This function may be called from any thread. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def cancelRead(): Unit + + /** + * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is + * only taken from the buffer's allocated memory, its position or limit are not changed. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * @param serial address of natively allocated serial configuration structure + * @param buffer direct ByteBuffer from which data is taken + * @param length actual amount of data that should be taken from the buffer (this is needed since the native + * backend does not provide a way to query the buffer's current limit) + * @return number of bytes actually written + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws IOException on IO error + */ + @native def write(buffer: ByteBuffer, length: Int): Int + + /** + * Closes an previously open serial port. Natively allocated resources are freed and the serial + * pointer becomes invalid, therefore this function should only be called ONCE per open serial + * port. + * + * A port should not be closed while it is used (by a read or write) as this + * results in undefined behaviour. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def close(): Unit + +} + +private[serial] object UnsafeSerial { + + /** + * Opens a serial port. + * + * @param port name of serial port to open + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + * @return address of natively allocated serial configuration structure + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + @native def debug(value: Boolean): Unit + +} diff --git a/sync/src/test/scala/akka/serial/PseudoTerminal.scala b/sync/src/test/scala/akka/serial/PseudoTerminal.scala new file mode 100644 index 0000000..3e9e9fe --- /dev/null +++ b/sync/src/test/scala/akka/serial/PseudoTerminal.scala @@ -0,0 +1,43 @@ +package akka.serial + +import java.io.{File, IOException} +import java.nio.file.Files + +import scala.concurrent.duration._ +import scala.sys.process._ +import scala.util.control.NonFatal + +trait PseudoTerminal { + + final val SetupTimeout = 100.milliseconds + + def withEcho[A](action: (String, SerialSettings) => A): A = { + val dir = Files.createTempDirectory("akka-serial-pty").toFile + val pty = new File(dir, "pty") + + val socat = try { + val s = Seq( + "socat", + "-d -d", + s"exec:cat,pty,raw,b115200,echo=0", + s"pty,raw,b115200,echo=0,link=${pty.getAbsolutePath}" + ).run(ProcessLogger(println(_)), false) + Thread.sleep(SetupTimeout.toMillis) // allow ptys to set up + s + } catch { + case NonFatal(ex) => + throw new IOException( + "Error running echo service, make sure the program 'socat' is installed", ex) + } + + try { + val result = action(pty.getAbsolutePath, SerialSettings(baud = 115200)) + Thread.sleep(SetupTimeout.toMillis) // allow for async cleanup before destroying ptys + result + } finally { + socat.destroy() + dir.delete() + } + } + +} diff --git a/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala new file mode 100644 index 0000000..24069d7 --- /dev/null +++ b/sync/src/test/scala/akka/serial/sync/SerialConnectionSpec.scala @@ -0,0 +1,101 @@ +package akka.serial +package sync + +import java.nio.ByteBuffer +import org.scalatest._ + +class SerialConnectionSpec extends WordSpec with PseudoTerminal { + + def withEchoConnection[A](action: SerialConnection => A): A = { + withEcho { (port, settings) => + val connection = SerialConnection.open(port, settings) + try { + action(connection) + } finally { + connection.close() + } + } + } + + "A SerialConnection" should { + + "open a valid port" in { + withEcho { (port, settings) => + SerialConnection.open(port, settings) + } + } + + "throw an exception on an invalid port" in { + val settings = SerialSettings(baud = 115200) + intercept[NoSuchPortException] { + SerialConnection.open("/dev/nonexistant", settings) + } + } + + "read the same data it writes to an echo pty" in { + withEchoConnection { conn => + /* Note: this test assumes that all data will be written and read + * within single write and read calls. This in turn assumes that + * internal operating system buffers have enough capacity to + * store all data. */ + val bufferSize = 64 + + val outString = "hello world" + val outBuffer = ByteBuffer.allocateDirect(bufferSize) + val outData = outString.getBytes + outBuffer.put(outData) + conn.write(outBuffer) + + val inBuffer = ByteBuffer.allocateDirect(bufferSize) + conn.read(inBuffer) + val inData = new Array[Byte](inBuffer.remaining()) + inBuffer.get(inData) + val inString = new String(inData) + + assert(inString == outString) + } + } + + "interrupt a read when closing a port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + + val closer = new Thread { + override def run(): Unit = { + Thread.sleep(100) + conn.close() + } + } + closer.start() + intercept[PortInterruptedException]{ + conn.read(buffer) + } + closer.join() + } + } + + "throw an exception when reading from a closed port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + conn.close() + + intercept[PortClosedException]{ + conn.read(buffer) + } + } + } + + "throw an exception when writing to a closed port" in { + withEchoConnection { conn => + val buffer = ByteBuffer.allocateDirect(64) + conn.close() + + intercept[PortClosedException]{ + conn.write(buffer) + } + } + } + + } + +} -- cgit v1.2.3