aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2014-03-27 18:28:11 +0100
committerJakob Odersky <jodersky@gmail.com>2014-03-27 18:28:11 +0100
commitd3960d3748e7b83f3ad18fa226ed88c7a5b01941 (patch)
tree8719e58d0bbfb794bcf20a2843af115e0c1bf5d5
parent35e986e27fc77421e009594770be8317ae8ec08d (diff)
downloadakka-serial-d3960d3748e7b83f3ad18fa226ed88c7a5b01941.tar.gz
akka-serial-d3960d3748e7b83f3ad18fa226ed88c7a5b01941.tar.bz2
akka-serial-d3960d3748e7b83f3ad18fa226ed88c7a5b01941.zip
restructure jvm side for use of direct buffers
-rw-r--r--flow/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java180
-rw-r--r--flow/src/main/native/include/flow.h40
-rw-r--r--flow/src/main/native/posix/Makefile12
-rw-r--r--flow/src/main/native/posix/flow.c456
-rw-r--r--flow/src/main/native/posix/flow_jni.c70
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/Serial.scala14
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala10
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala58
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala3
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala92
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/Reader.scala40
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala3
-rw-r--r--flow/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala157
13 files changed, 650 insertions, 485 deletions
diff --git a/flow/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
index 524803e..ec058da 100644
--- a/flow/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
+++ b/flow/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
@@ -1,75 +1,145 @@
package com.github.jodersky.flow.internal;
-import com.github.jodersky.flow.internal.NativeLoader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
-/** Thin layer on top of native code. */
-class NativeSerial {
+import com.github.jodersky.flow.AccessDeniedException;
+import com.github.jodersky.flow.InvalidSettingsException;
+import com.github.jodersky.flow.NoSuchPortException;
+import com.github.jodersky.flow.PortInUseException;
+import com.github.jodersky.flow.PortInterruptedException;
+
+/**
+ * Low-level wrapper on top of native serial backend.
+ *
+ * WARNING: Methods in this class allocate native structures and deal with pointers.
+ * These pointers are handled as longs by java and are NOT checked for correctness,
+ * therefore passing invalid pointers may have unexpected results, including but not
+ * limited to crashing the VM.
+ *
+ * See SerialConnection for a higher level, more secured wrapper
+ * of serial communication.
+ *
+ * @see com.github.jodersky.flow.internal.SerialConnection
+ */
+final class NativeSerial {
static {
NativeLoader.load();
}
- final static int E_IO = -1;
- final static int E_ACCESS_DENIED = -2;
- final static int E_BUSY = -3;
- final static int E_INVALID_SETTINGS = -4;
- final static int E_INTERRUPT = -5;
- final static int E_NO_PORT = -6;
-
final static int PARITY_NONE = 0;
final static int PARITY_ODD = 1;
final static int PARITY_EVEN = 2;
-
- /**Opens a serial port and allocates memory for storing configuration. Note: if this function fails,
- * any internally allocated resources will be freed.
- * @param device name of port
- * @param baud baud rate
- * @param characterSize character size of data transmitted through serial device
+
+ /**
+ * Opens a serial port.
+ *
+ * @param port name of serial port to open
+ * @param characterSize size of a character of the data sent through the serial port
* @param twoStopBits set to use two stop bits instead of one
- * @param parity kind of parity checking to use
- * @param serial pointer to memory that will be allocated with a serial structure
- * @return 0 on success
- * @return E_NO_PORT if the given port does not exist
- * @return E_ACCESS_DENIED if permissions are not sufficient to open port
- * @return E_BUSY if port is already in use
- * @return E_INVALID_SETTINGS if any of the specified settings are not supported
- * @return E_IO on other error */
- native static int open(String device, int baud, int characterSize, boolean twoStopBits, int parity, long[] serial);
+ * @param parity type of parity to use with serial port
+ * @return address of natively allocated serial configuration structure
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ native static long open(String port, int baud, int characterSize, boolean twoStopBits, int parity)
+ throws NoSuchPortException, AccessDeniedException, PortInUseException, InvalidSettingsException, IOException;
- /**Starts a blocking read from a previously opened serial port. The read is blocking, however it may be
- * interrupted by calling 'serial_interrupt' on the given serial port.
- * @param serial pointer to serial configuration from which to read
- * @param buffer buffer into which data is read
- * @param size maximum buffer size
- * @return n>0 the number of bytes read into buffer
- * @return E_INTERRUPT if the call to this function was interrupted
- * @return E_IO on IO error */
- native static int read(long serial, byte[] buffer);
+ /**
+ * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only read into the
+ * buffer's allocated memory, its position or limit are not changed.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer to read into
+ * @return number of bytes actually read
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ native static int readDirect(long serial, ByteBuffer buffer)
+ throws IllegalArgumentException, PortInterruptedException, IOException;
+
+ /**
+ * Reads data from a previously opened serial port into an array.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer array to read data into
+ * @return number of bytes actually read
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ native static int read(long serial, byte[] buffer)
+ throws PortInterruptedException, IOException;
- /**Writes data to a previously opened serial port.
- * @param serial pointer to serial configuration to which to write
- * @param data data to write
- * @param size number of bytes to write from data
- * @return n>0 the number of bytes written
- * @return E_IO on IO error */
- native static int write(long serial, byte[] buffer);
+ /**
+ * Cancels a read (any caller to read or readDirect will return with a PortInterruptedException). This function may be called from any thread.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ native static void cancelRead(long serial)
+ throws IOException;
+
+ /**
+ * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is only taken from
+ * the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer from which data is taken
+ * @param length actual amount of data that should be taken from the buffer (this is needed since the native
+ * backend does not provide a way to query the buffer's current limit)
+ * @return number of bytes actually written
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws IOException on IO error
+ */
+ native static int writeDirect(long serial, ByteBuffer buffer, int length)
+ throws IllegalArgumentException, IOException;
- /**Interrupts a blocked read call.
- * @param serial_config the serial port to interrupt
- * @return 0 on success
- * @return E_IO on error */
- native static int interrupt(long serial);
+ /**
+ * Writes data from an array to a previously opened serial port.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer array from which data is taken
+ * @param length actual amount of data that should be taken from the buffer
+ * @return number of bytes actually written
+ * @throws IOException on IO error
+ */
+ native static int write(long serial, byte[] buffer, int length)
+ throws IOException;
- /**Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to
- * this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT
- * thread safe, make sure no read or write is in prgress when this function is called (the reason is that per
- * close manual page, close should not be called on a file descriptor that is in use by another thread).
- * @param serial pointer to serial configuration that is to be closed (and freed)
- * @return 0 on success
- * @return E_IO on error */
- native static int close(long serial);
+ /**
+ * Closes an previously open serial port. Natively allocated resources are freed and the serial pointer becomes invalid,
+ * therefore this function should only be called ONCE per open serial port.
+ *
+ * A port should not be closed while it is used (by a read or write) as this
+ * results in undefined behaviour.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ native static void close(long serial)
+ throws IOException;
- /**Sets debugging option. If debugging is enabled, detailed error message are printed (to stderr) from method calls. */
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
native static void debug(boolean value);
}
diff --git a/flow/src/main/native/include/flow.h b/flow/src/main/native/include/flow.h
index ebb845a..d6a173e 100644
--- a/flow/src/main/native/include/flow.h
+++ b/flow/src/main/native/include/flow.h
@@ -2,9 +2,14 @@
#define FLOW_H
#include <stdbool.h>
+#include <stddef.h>
+
+#ifdef _cplusplus
+extern "C" {
+#endif
//general error codes that are returned by functions
-#define E_IO -1 //I/O error (port should be closed)
+#define E_IO -1 //IO error
#define E_ACCESS_DENIED -2 //access denied
#define E_BUSY -3 // port is busy
#define E_INVALID_SETTINGS -4 // some port settings are invalid
@@ -15,7 +20,7 @@
#define PARITY_ODD 1
#define PARITY_EVEN 2
-/** Represents an open serial port. */
+/** Contains internal configuration of an open serial port. */
struct serial_config;
/**Opens a serial port and allocates memory for storing configuration. Note: if this function fails,
@@ -30,15 +35,15 @@ struct serial_config;
* @return E_NO_PORT if the given port does not exist
* @return E_ACCESS_DENIED if permissions are not sufficient to open port
* @return E_BUSY if port is already in use
- * @return E_INVALID_SETTINGS if any of the specified settings are not supported
+ * @return E_INVALID_SETTINGS if any of the specified settings are invalid
* @return E_IO on other error */
int serial_open(
- const char* port_name,
- int baud,
- int char_size,
- bool two_stop_bits,
- int parity,
- struct serial_config** serial);
+ const char* port_name,
+ int baud,
+ int char_size,
+ bool two_stop_bits,
+ int parity,
+ struct serial_config** const serial);
/**Closes a previously opened serial port and frees memory containing the configuration. Note: after a call to
* this function, the 'serial' pointer will become invalid, make sure you only call it once. This function is NOT
@@ -47,23 +52,24 @@ int serial_open(
* @param serial pointer to serial configuration that is to be closed (and freed)
* @return 0 on success
* @return E_IO on error */
-int serial_close(struct serial_config* serial);
+int serial_close(struct serial_config* const serial);
/**Starts a read from a previously opened serial port. The read is blocking, however it may be
- * interrupted by calling 'serial_interrupt' on the given serial port.
+ * interrupted by calling 'serial_cancel_read' on the given serial port.
* @param serial pointer to serial configuration from which to read
* @param buffer buffer into which data is read
* @param size maximum buffer size
* @return n>0 the number of bytes read into buffer
* @return E_INTERRUPT if the call to this function was interrupted
* @return E_IO on IO error */
-int serial_read(struct serial_config* serial, unsigned char* buffer, size_t size);
+int serial_read(struct serial_config* const serial, unsigned char* const buffer, size_t size);
-/**Interrupts a blocked read call.
+/**Cancels a blocked read call. This function is thread safe, i.e. it may be called from a thread even
+ * while another thread is blocked in a read call.
* @param serial_config the serial port to interrupt
* @return 0 on success
* @return E_IO on error */
-int serial_interrupt(struct serial_config* serial);
+int serial_cancel_read(struct serial_config* const serial);
/**Writes data to a previously opened serial port. Non bocking.
* @param serial pointer to serial configuration to which to write
@@ -71,10 +77,14 @@ int serial_interrupt(struct serial_config* serial);
* @param size number of bytes to write from data
* @return n>0 the number of bytes written
* @return E_IO on IO error */
-int serial_write(struct serial_config* serial, unsigned char* data, size_t size);
+int serial_write(struct serial_config* const serial, unsigned char* const data, size_t size);
/**Sets debugging option. If debugging is enabled, detailed error message are printed from method calls. */
void serial_debug(bool value);
+#ifdef __cplusplus
+}
+#endif
+
#endif /* FLOW_H */
diff --git a/flow/src/main/native/posix/Makefile b/flow/src/main/native/posix/Makefile
index d144ec5..81e0fad 100644
--- a/flow/src/main/native/posix/Makefile
+++ b/flow/src/main/native/posix/Makefile
@@ -1,21 +1,23 @@
# Build native binaries for flow.
# This makefile is intended for linux-based operating systems.
-TARGET=libflow.so
+TARGET?=libflow.so
MAJOR_VERSION=2
CC=$(CROSS_COMPILE)gcc
CFLAGS= -O2 -fPIC
-LD=$(CROSS_COMPILE)gcc
+LD=$(CROSS_COMPILE)ld
LDFLAGS=-shared -Wl,-soname,$(TARGET).$(MAJOR_VERSION)
-INCLUDES=../include/ /usr/lib/jvm/java-7-oracle/include/ /usr/lib/jvm/java-7-oracle/include/linux/
+INCLUDES?=../include/ /usr/lib/jvm/java-7-oracle/include/ /usr/lib/jvm/java-7-oracle/include/linux/
+
+OBJECTS=flow.o flow_jni.o
all: $(TARGET)
-$(TARGET): flow.o
+$(TARGET): $(OBJECTS)
$(CC) $(LDFLAGS) -o $@ $<
-flow.o: flow.c
+%.o: %.c
$(CC) $(CFLAGS) $(addprefix -I, $(INCLUDES)) -o $@ -c $<
clean:
diff --git a/flow/src/main/native/posix/flow.c b/flow/src/main/native/posix/flow.c
index caf7b9b..18aa9c2 100644
--- a/flow/src/main/native/posix/flow.c
+++ b/flow/src/main/native/posix/flow.c
@@ -4,309 +4,241 @@
#include <errno.h>
#include <termios.h>
#include <fcntl.h>
-#include "com_github_jodersky_flow_internal_NativeSerial.h"
#include "flow.h"
+#define DATA_CANCEL 0xffffffff
+
static bool debug = false;
#define DEBUG(f) if (debug) {f}
void serial_debug(bool value) {
- debug = value;
+ debug = value;
}
//contains file descriptors used in managing a serial port
struct serial_config {
-
- int port_fd; // file descriptor of serial port
-
- /* a pipe is used to abort a serial read by writing something into the
- * write end of the pipe */
- int pipe_read_fd; // file descriptor, read end of pipe
- int pipe_write_fd; // file descriptor, write end of pipe
-
+ int port_fd; // file descriptor of serial port
+
+ /* a pipe is used to abort a serial read by writing something into the
+ * write end of the pipe */
+ int pipe_read_fd; // file descriptor, read end of pipe
+ int pipe_write_fd; // file descriptor, write end of pipe
};
int serial_open(
- const char* port_name,
- int baud,
- int char_size,
- bool two_stop_bits,
- int parity,
- struct serial_config** serial) {
-
- int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK);
-
- if (fd < 0) {
- int en = errno;
- DEBUG(perror("obtain file descriptor"););
- if (en == EACCES) return E_ACCESS_DENIED;
- if (en == ENOENT) return E_NO_PORT;
- return E_IO;
- }
-
- if (flock(fd, LOCK_EX | LOCK_NB) < 0) {
- DEBUG(perror("acquire lock on port"););
- close(fd);
- return E_BUSY;
- }
-
- /* configure new port settings */
- struct termios newtio;
+ const char* const port_name,
+ int baud,
+ int char_size,
+ bool two_stop_bits,
+ int parity,
+ struct serial_config** serial) {
+
+ int fd = open(port_name, O_RDWR | O_NOCTTY | O_NONBLOCK);
+
+ if (fd < 0) {
+ DEBUG(perror("error obtaining port file descriptor"););
+ if (errno == EACCES) return E_ACCESS_DENIED;
+ if (errno == ENOENT) return E_NO_PORT;
+ return E_IO;
+ }
- /* following calls correspond to makeraw() */
- newtio.c_iflag &= ~(IGNBRK | BRKINT | PARMRK | ISTRIP | INLCR | IGNCR | ICRNL | IXON);
- newtio.c_oflag &= ~OPOST;
- newtio.c_lflag &= ~(ECHO | ECHONL | ICANON | ISIG | IEXTEN);
- newtio.c_cflag &= ~(CSIZE | PARENB);
+ if (flock(fd, LOCK_EX | LOCK_NB) < 0) {
+ DEBUG(perror("error acquiring lock on port"););
+ close(fd);
+ return E_BUSY;
+ }
- /* set speed */
- speed_t bd;
- switch (baud) {
- case 50: bd = B50; break;
- case 75: bd = B75; break;
- case 110: bd = B110; break;
- case 134: bd = B134; break;
- case 150: bd = B150; break;
- case 200: bd = B200; break;
- case 300: bd = B300; break;
- case 600: bd = B600; break;
- case 1200: bd = B1200; break;
- case 1800: bd = B1800; break;
- case 2400: bd = B2400; break;
- case 4800: bd = B4800; break;
- case 9600: bd = B9600; break;
- case 19200: bd = B19200; break;
- case 38400: bd = B38400; break;
- case 57600: bd = B57600; break;
- case 115200: bd = B115200; break;
- case 230400: bd = B230400; break;
- default:
- close(fd);
- return E_INVALID_SETTINGS;
- }
+ /* configure new port settings */
+ struct termios newtio;
+
+ /* following calls correspond to makeraw() */
+ newtio.c_iflag &= ~(IGNBRK | BRKINT | PARMRK | ISTRIP | INLCR | IGNCR | ICRNL | IXON);
+ newtio.c_oflag &= ~OPOST;
+ newtio.c_lflag &= ~(ECHO | ECHONL | ICANON | ISIG | IEXTEN);
+ newtio.c_cflag &= ~(CSIZE | PARENB);
+
+ /* set speed */
+ speed_t bd;
+ switch (baud) {
+ case 50: bd = B50; break;
+ case 75: bd = B75; break;
+ case 110: bd = B110; break;
+ case 134: bd = B134; break;
+ case 150: bd = B150; break;
+ case 200: bd = B200; break;
+ case 300: bd = B300; break;
+ case 600: bd = B600; break;
+ case 1200: bd = B1200; break;
+ case 1800: bd = B1800; break;
+ case 2400: bd = B2400; break;
+ case 4800: bd = B4800; break;
+ case 9600: bd = B9600; break;
+ case 19200: bd = B19200; break;
+ case 38400: bd = B38400; break;
+ case 57600: bd = B57600; break;
+ case 115200: bd = B115200; break;
+ case 230400: bd = B230400; break;
+ default:
+ close(fd);
+ DEBUG(fprintf(stderr, "invalid baud rate %d\n", baud););
+ return E_INVALID_SETTINGS;
+ }
- if (cfsetspeed(&newtio, bd) < 0) {
- DEBUG(perror("set baud rate"););
- close(fd);
- return E_IO;
- }
+ if (cfsetspeed(&newtio, bd) < 0) {
+ DEBUG(perror("error setting baud rate"););
+ close(fd);
+ return E_IO;
+ }
- /* set char size*/
- switch (char_size) {
- case 5: newtio.c_cflag |= CS5; break;
- case 6: newtio.c_cflag |= CS6; break;
- case 7: newtio.c_cflag |= CS7; break;
- case 8: newtio.c_cflag |= CS8; break;
- default:
- close(fd);
- return E_INVALID_SETTINGS;
- }
+ /* set char size*/
+ switch (char_size) {
+ case 5: newtio.c_cflag |= CS5; break;
+ case 6: newtio.c_cflag |= CS6; break;
+ case 7: newtio.c_cflag |= CS7; break;
+ case 8: newtio.c_cflag |= CS8; break;
+ default:
+ close(fd);
+ DEBUG(fprintf(stderr, "invalid character size %d\n", char_size););
+ return E_INVALID_SETTINGS;
+ }
- /* use two stop bits */
- if (two_stop_bits){
- newtio.c_cflag |= CSTOPB;
- }
+ /* use two stop bits */
+ if (two_stop_bits){
+ newtio.c_cflag |= CSTOPB;
+ }
- /* set parity */
- switch (parity) {
- case PARITY_NONE: break;
- case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break;
- case PARITY_EVEN: newtio.c_cflag |= PARENB; break;
- default:
- close(fd);
- return E_INVALID_SETTINGS;
- }
+ /* set parity */
+ switch (parity) {
+ case PARITY_NONE: break;
+ case PARITY_ODD: newtio.c_cflag |= (PARENB | PARODD); break;
+ case PARITY_EVEN: newtio.c_cflag |= PARENB; break;
+ default:
+ close(fd);
+ DEBUG(fprintf(stderr, "invalid parity %d\n", parity););
+ return E_INVALID_SETTINGS;
+ }
- if (tcflush(fd, TCIOFLUSH) < 0) {
- DEBUG(perror("flush serial settings"););
- close(fd);
- return E_IO;
- }
+ if (tcflush(fd, TCIOFLUSH) < 0) {
+ DEBUG(perror("error flushing serial settings"););
+ close(fd);
+ return E_IO;
+ }
- if (tcsetattr(fd, TCSANOW, &newtio) < 0) {
- DEBUG(perror("apply serial settings"););
- close(fd);
- return E_IO;
- }
+ if (tcsetattr(fd, TCSANOW, &newtio) < 0) {
+ DEBUG(perror("error applying serial settings"););
+ close(fd);
+ return E_IO;
+ }
- int pipe_fd[2];
- if (pipe(pipe_fd) < 0) {
- DEBUG(perror("open pipe"););
- close(fd);
- return E_IO;
- }
+ int pipe_fd[2];
+ if (pipe(pipe_fd) < 0) {
+ DEBUG(perror("error opening pipe"););
+ close(fd);
+ return E_IO;
+ }
- if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) {
- DEBUG(perror("make pipe non-blocking"););
- close(fd);
- return E_IO;
- }
+ if (fcntl(pipe_fd[0], F_SETFL, O_NONBLOCK) < 0 || fcntl(pipe_fd[1], F_SETFL, O_NONBLOCK) < 0) {
+ DEBUG(perror("error setting pipe to non-blocking"););
+ close(fd);
+ close(pipe_fd[0]);
+ close(pipe_fd[1]);
+ return E_IO;
+ }
- struct serial_config* s = malloc(sizeof(s));
- if (s == NULL) {
- DEBUG(perror("allocate memory for serial configuration"););
- close(fd);
- close(pipe_fd[0]);
- close(pipe_fd[1]);
- return E_IO;
- }
+ struct serial_config* s = malloc(sizeof(s));
+ if (s == NULL) {
+ DEBUG(perror("error allocating memory for serial configuration"););
+ close(fd);
+ close(pipe_fd[0]);
+ close(pipe_fd[1]);
+ return E_IO;
+ }
- s->port_fd = fd;
- s->pipe_read_fd = pipe_fd[0];
- s->pipe_write_fd = pipe_fd[1];
- (*serial) = s;
+ s->port_fd = fd;
+ s->pipe_read_fd = pipe_fd[0];
+ s->pipe_write_fd = pipe_fd[1];
+ (*serial) = s;
- return 0;
+ return 0;
}
-int serial_close(struct serial_config* serial) {
- if (close(serial->pipe_write_fd) < 0) {
- DEBUG(perror("close write end of pipe"););
- return E_IO;
- }
- if (close(serial->pipe_read_fd) < 0) {
- DEBUG(perror("close read end of pipe"););
- return E_IO;
- }
-
- if (flock(serial->port_fd, LOCK_UN) < 0){
- DEBUG(perror("release lock on port"););
- return E_IO;
- }
- if (close(serial->port_fd) < 0) {
- DEBUG(perror("close port"););
- return E_IO;
- }
-
- free(serial);
- return 0;
-}
-int serial_read(struct serial_config* serial, unsigned char* buffer, size_t size) {
- int port = serial->port_fd;
- int pipe = serial->pipe_read_fd;
-
- fd_set rfds;
- FD_ZERO(&rfds);
- FD_SET(port, &rfds);
- FD_SET(pipe, &rfds);
-
- int nfds = pipe + 1;
- if (pipe < port) nfds = port + 1;
-
- int n = select(nfds, &rfds, NULL, NULL, NULL);
- if (n < 0) {
- DEBUG(perror("select"););
- return E_IO;
- }
-
- if (FD_ISSET(port, &rfds)) {
- int r = read(port, buffer, size);
-
- //treat 0 bytes read as an error to avoid problems on disconnect
- //anyway, after a poll there should be more than 0 bytes available to read
- if (r <= 0) {
- DEBUG(perror("read"););
- return E_IO;
+int serial_close(struct serial_config* const serial) {
+ if (close(serial->pipe_write_fd) < 0) {
+ DEBUG(perror("error closing write end of pipe"););
+ return E_IO;
+ }
+ if (close(serial->pipe_read_fd) < 0) {
+ DEBUG(perror("error closing read end of pipe"););
+ return E_IO;
}
- return r;
- } else if (FD_ISSET(pipe, &rfds)) {
- return E_INTERRUPT;
- } else {
- fputs("select: unknown read sets", stderr);
- return E_IO;
- }
-}
-
-int serial_write(struct serial_config* serial, unsigned char* data, size_t size) {
- int r = write(serial->port_fd, data, size);
- if (r < 0) {
- DEBUG(perror("write"););
- return E_IO;
- }
- return r;
-}
-
-int serial_interrupt(struct serial_config* serial) {
- int data = 0xffffffff;
- //write to pipe to wake up any blocked read thread (self-pipe trick)
- if (write(serial->pipe_write_fd, &data, 1) < 0) {
- DEBUG(perror("write to pipe for interrupt"););
- return E_IO;
- }
+ if (flock(serial->port_fd, LOCK_UN) < 0){
+ DEBUG(perror("error releasing lock on port"););
+ return E_IO;
+ }
+ if (close(serial->port_fd) < 0) {
+ DEBUG(perror("error closing port"););
+ return E_IO;
+ }
- return 0;
+ free(serial);
+ return 0;
}
+int serial_read(struct serial_config* const serial, unsigned char* const buffer, size_t size) {
+ int port = serial->port_fd;
+ int pipe = serial->pipe_read_fd;
-// JNI bindings
-// ============
-
-inline struct serial_config* j2s(jlong pointer) {
- return (struct serial_config*) pointer;
-}
-
-inline jlong s2j(struct serial_config* pointer) {
- return (jlong) pointer;
-}
-
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_open
- (JNIEnv *env, jclass clazz, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity, jlongArray jserialp)
-{
- const char *dev = (*env)->GetStringUTFChars(env, port_name, 0);
- struct serial_config* serial;
- int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &serial);
- (*env)->ReleaseStringUTFChars(env, port_name, dev);
+ fd_set rfds;
+ FD_ZERO(&rfds);
+ FD_SET(port, &rfds);
+ FD_SET(pipe, &rfds);
- long serialp = s2j(serial);
- (*env)->SetLongArrayRegion(env, jserialp, 0, 1, &serialp);
+ int nfds = pipe + 1;
+ if (pipe < port) nfds = port + 1;
- return r;
-}
-
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_close
- (JNIEnv * env, jclass clazz, jlong serial)
-{
- serial_close(j2s(serial));
-}
+ int n = select(nfds, &rfds, NULL, NULL, NULL);
+ if (n < 0) {
+ DEBUG(perror("error trying to call select on port and pipe"););
+ return E_IO;
+ }
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_read
- (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
-{
-
- jsize size = (*env)->GetArrayLength(env, jbuffer);
-
- unsigned char buffer[size];
- int n = serial_read(j2s(serial), buffer, size);
- if (n < 0) {
- return n;
- }
-
- (*env)->SetByteArrayRegion(env, jbuffer, 0, n, (signed char *) buffer);
- return n;
+ if (FD_ISSET(pipe, &rfds)) {
+ return E_INTERRUPT;
+ } else if (FD_ISSET(port, &rfds)) {
+ int r = read(port, buffer, size);
+
+ //treat 0 bytes read as an error to avoid problems on disconnect
+ //anyway, after a poll there should be more than 0 bytes available to read
+ if (r <= 0) {
+ DEBUG(perror("read"););
+ return E_IO;
+ }
+ return r;
+ } else {
+ fprintf(stderr, "select returned unknown read sets\n");
+ return E_IO;
+ }
}
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_write
- (JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
-{
- unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL);
- int size = (*env)->GetArrayLength(env, jbuffer);
- int r = serial_write(j2s(serial), buffer, size);
+int serial_cancel_read(struct serial_config* const serial) {
+ int data = DATA_CANCEL;
- (*env)->ReleaseByteArrayElements(env, jbuffer, buffer, JNI_ABORT);
+ //write to pipe to wake up any blocked read thread (self-pipe trick)
+ if (write(serial->pipe_write_fd, &data, 1) < 0) {
+ DEBUG(perror("error writing to pipe during read cancel"););
+ return E_IO;
+ }
- return r;
-}
-
-JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_interrupt
- (JNIEnv * env, jclass clazz, jlong serial)
-{
- return serial_interrupt(j2s(serial));
+ return 0;
}
-JNIEXPORT void JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_debug
- (JNIEnv *env, jclass clazz, jboolean value)
-{
- serial_debug((bool) value);
-}
+int serial_write(struct serial_config* const serial, unsigned char* const data, size_t size) {
+ int r = write(serial->port_fd, data, size);
+ if (r < 0) {
+ DEBUG(perror("error writing to port"););
+ return E_IO;
+ }
+ return r;
+} \ No newline at end of file
diff --git a/flow/src/main/native/posix/flow_jni.c b/flow/src/main/native/posix/flow_jni.c
new file mode 100644
index 0000000..3af2873
--- /dev/null
+++ b/flow/src/main/native/posix/flow_jni.c
@@ -0,0 +1,70 @@
+#include "flow.h"
+#include "com_github_jodersky_flow_internal_NativeSerial.h"
+
+inline struct serial_config* j2s(jlong pointer) {
+ return (struct serial_config*) pointer;
+}
+
+inline jlong s2j(struct serial_config* pointer) {
+ return (jlong) pointer;
+}
+
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_open
+(JNIEnv *env, jclass clazz, jstring port_name, jint baud, jint char_size, jboolean two_stop_bits, jint parity, jlongArray jserialp)
+{
+ const char *dev = (*env)->GetStringUTFChars(env, port_name, 0);
+ struct serial_config* serial;
+ int r = serial_open(dev, baud, char_size, two_stop_bits, parity, &serial);
+ (*env)->ReleaseStringUTFChars(env, port_name, dev);
+
+ long serialp = s2j(serial);
+ (*env)->SetLongArrayRegion(env, jserialp, 0, 1, &serialp);
+
+ return r;
+}
+
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_close
+(JNIEnv * env, jclass clazz, jlong serial)
+{
+ serial_close(j2s(serial));
+}
+
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_read
+(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
+{
+
+ jsize size = (*env)->GetArrayLength(env, jbuffer);
+
+ unsigned char buffer[size];
+ int n = serial_read(j2s(serial), buffer, size);
+ if (n < 0) {
+ return n;
+}
+
+(*env)->SetByteArrayRegion(env, jbuffer, 0, n, (signed char *) buffer);
+return n;
+}
+
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_write
+(JNIEnv * env, jclass clazz, jlong serial, jbyteArray jbuffer)
+{
+ unsigned char * buffer = (*env)->GetByteArrayElements(env, jbuffer, NULL);
+ int size = (*env)->GetArrayLength(env, jbuffer);
+ int r = serial_write(j2s(serial), buffer, size);
+
+ (*env)->ReleaseByteArrayElements(env, jbuffer, buffer, JNI_ABORT);
+
+ return r;
+}
+
+JNIEXPORT jint JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_interrupt
+(JNIEnv * env, jclass clazz, jlong serial)
+{
+ return serial_interrupt(j2s(serial));
+}
+
+JNIEXPORT void JNICALL Java_com_github_jodersky_flow_internal_NativeSerial_debug
+(JNIEnv *env, jclass clazz, jboolean value)
+{
+ serial_debug((bool) value);
+}
diff --git a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala
index e5cf1d6..6c05e14 100644
--- a/flow/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/flow/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -28,13 +28,9 @@ object Serial extends ExtensionKey[SerialExt] {
* In case the port is successfully opened, the operator will respond with an `Opened` message.
* In case the port cannot be opened, the manager will respond with a `CommandFailed` message.
*
- * @param port name of serial port
- * @param baud baud rate to use with serial port
- * @param characterSize size of a character of the data sent through the serial port
- * @param twoStopBits set to use two stop bits instead of one
- * @param parity type of parity to use with serial port
+ * @param settings settings of serial port to open
*/
- case class Open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Parity.Parity) extends Command
+ case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command
/**
* A port has been successfully opened.
@@ -68,12 +64,14 @@ object Serial extends ExtensionKey[SerialExt] {
* @param data data to be written to port
* @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending
*/
- case class Write(data: ByteString, ack: Event = NoAck) extends Command
+ case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command
/**
* Special type of acknowledgment that is not sent back.
*/
- case object NoAck extends Event
+ case object NoAck extends Function1[Int, Event] {
+ def apply(length: Int) = sys.error("cannot apply NoAck")
+ }
/**
* Request closing of port.
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala
index e884225..47c74a1 100644
--- a/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -4,7 +4,7 @@ import scala.util.Failure
import scala.util.Success
import scala.util.Try
-import com.github.jodersky.flow.internal.InternalSerial
+import com.github.jodersky.flow.internal.SerialConnection
import Serial.CommandFailed
import Serial.Open
@@ -13,7 +13,7 @@ import akka.actor.ActorLogging
import akka.actor.actorRef2Scala
/**
- * Actor that manages serial port creation. Once opened, a serial port is handed over to
+ * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to
* a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
* @see SerialOperator
*/
@@ -22,10 +22,10 @@ class SerialManager extends Actor with ActorLogging {
import context._
def receive = {
- case open @ Open(port, baud, characterSize, twoStopBits, parity) => Try {
- InternalSerial.open(port, baud, characterSize, twoStopBits, parity.id)
+ case open @ Open(port, settings, bufferSize) => Try {
+ SerialConnection.open(port, settings)
} match {
- case Success(internal) => context.actorOf(SerialOperator(internal, sender), name = escapePortString(internal.port))
+ case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
case Failure(err) => sender ! CommandFailed(open, err)
}
}
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 69db4df..fd02577 100644
--- a/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -1,7 +1,7 @@
package com.github.jodersky.flow
import java.io.IOException
-import com.github.jodersky.flow.internal.InternalSerial
+import com.github.jodersky.flow.internal.SerialConnection
import Serial._
import akka.actor.Actor
import akka.actor.ActorLogging
@@ -11,59 +11,37 @@ import akka.actor.actorRef2Scala
import akka.util.ByteString
import scala.collection.mutable.HashSet
import akka.actor.Props
+import java.nio.ByteBuffer
+import com.github.jodersky.flow.internal.Reader
+import com.github.jodersky.flow.internal.ReaderDied
/**
* Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
* @see SerialManager
*/
-class SerialOperator(serial: InternalSerial, client: ActorRef) extends Actor with ActorLogging {
+class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging {
import SerialOperator._
import context._
- private object Reader extends Thread {
- def readLoop() = {
- var continueReading = true
- while (continueReading) {
- try {
- val data = ByteString(serial.read())
- client ! Received(data)
- } catch {
+ val readBuffer = ByteBuffer.allocateDirect(bufferSize)
+ val reader = new Reader(connection, readBuffer, self, client)
+ val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
- //port is closing, stop thread gracefully
- case ex: PortInterruptedException => {
- continueReading = false
- }
-
- //something else went wrong stop and tell actor
- case ex: Exception => {
- continueReading = false
- self ! ReadException(ex)
- }
- }
- }
- }
-
- override def run() {
- this.setName("flow-reader " + serial.port)
- readLoop()
- }
- }
-
-
- client ! Opened(serial.port)
context.watch(client)
- Reader.start()
-
+ client ! Opened(connection.port)
+ reader.start()
override def postStop = {
- serial.close()
+ connection.close()
}
def receive: Receive = {
case Write(data, ack) => {
- val sent = serial.write(data.toArray)
- if (ack != NoAck) sender ! ack
+ writeBuffer.clear()
+ data.copyToBuffer(writeBuffer)
+ val sent = connection.write(writeBuffer)
+ if (ack != NoAck) sender ! ack(sent)
}
case Close => {
@@ -72,14 +50,12 @@ class SerialOperator(serial: InternalSerial, client: ActorRef) extends Actor wit
}
//go down with reader thread
- case ReadException(ex) => throw ex
+ case ReaderDied(ex) => throw ex
}
}
object SerialOperator {
- private case class ReadException(ex: Exception)
-
- def apply(serial: InternalSerial, client: ActorRef) = Props(classOf[SerialOperator], serial, client)
+ def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
index a3bc5e4..08a5556 100644
--- a/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
+++ b/flow/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
@@ -2,10 +2,9 @@ package com.github.jodersky.flow
/**
* Groups settings used in communication over a serial port.
- * @param port name of serial port
* @param baud baud rate to use with serial port
* @param characterSize size of a character of the data sent through the serial port
* @param twoStopBits set to use two stop bits instead of one
* @param parity type of parity to use with serial port
*/
-case class SerialSettings(port: String, baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) \ No newline at end of file
+case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None) \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
deleted file mode 100644
index e69f91a..0000000
--- a/flow/src/main/scala/com/github/jodersky/flow/internal/InternalSerial.scala
+++ /dev/null
@@ -1,92 +0,0 @@
-package com.github.jodersky.flow.internal
-
-import java.io.IOException
-import com.github.jodersky.flow._
-import java.util.concurrent.atomic.AtomicBoolean
-
-/** Wraps `NativeSerial` in a more object-oriented style, still quite low level. */
-class InternalSerial private (val port: String, val baud: Int, val characterSize: Int, val twoStopBits: Boolean, val parity: Int, private val pointer: Long) {
- import InternalSerial._
-
- private val reading = new AtomicBoolean(false)
- private val writing = new AtomicBoolean(false)
- private val closed = new AtomicBoolean(false)
-
- /** Closes the underlying serial connection. Any threads blocking on read or write will return. */
- def close(): Unit = synchronized {
- if (!closed.get()) {
- closed.set(true)
- except(NativeSerial.interrupt(pointer), port)
- if (writing.get()) wait()
- if (reading.get()) wait()
- except(NativeSerial.close(pointer), port)
- }
- }
-
- /**
- * Read data from underlying serial connection.
- * @throws PortInterruptedException if port is closed from another thread
- */
- def read(): Array[Byte] = if (!closed.get) {
- reading.set(true)
- try {
- val buffer = new Array[Byte](100)
- val bytesRead = except(NativeSerial.read(pointer, buffer), port)
- buffer take bytesRead
- } finally {
- synchronized {
- reading.set(false)
- if (closed.get) notify()
- }
- }
- } else {
- throw new PortClosedException(s"port ${port} is already closed")
- }
-
- /**
- * Write data to underlying serial connection.
- * @throws PortInterruptedException if port is closed from another thread
- */
- def write(data: Array[Byte]): Array[Byte] = if (!closed.get) {
- writing.set(true)
- try {
- val bytesWritten = except(NativeSerial.write(pointer, data), port)
- data take bytesWritten
- } finally {
- synchronized {
- writing.set(false)
- if (closed.get) notify()
- }
- }
- } else {
- throw new PortClosedException(s"port ${port} is already closed")
- }
-
-}
-
-object InternalSerial {
- import NativeSerial._
-
- /** Transform error code to exception if necessary. */
- private def except(result: Int, port: String): Int = result match {
- case E_IO => throw new IOException(port)
- case E_ACCESS_DENIED => throw new AccessDeniedException(port)
- case E_BUSY => throw new PortInUseException(port)
- case E_INVALID_SETTINGS => throw new InvalidSettingsException("the provided settings are invalid: be sure to use standard baud rate, character size and parity.")
- case E_INTERRUPT => throw new PortInterruptedException(port)
- case E_NO_PORT => throw new NoSuchPortException(port)
- case error if error < 0 => throw new IOException(s"unknown error code: ${error}")
- case success => success
- }
-
- /** Open a new connection to a serial port. */
- def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): InternalSerial = synchronized {
- val pointer = new Array[Long](1)
- except(NativeSerial.open(port, baud, characterSize, twoStopBits, parity, pointer), port)
- new InternalSerial(port, baud, characterSize, twoStopBits, parity, pointer(0))
- }
-
- /** Set debugging for all serial connections. Debugging results in printing extra messages from the native library in case of errors. */
- def debug(value: Boolean) = NativeSerial.debug(value)
-
-} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
new file mode 100644
index 0000000..7b3f2ef
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
@@ -0,0 +1,40 @@
+package com.github.jodersky.flow.internal
+
+import java.nio.ByteBuffer
+
+import com.github.jodersky.flow.PortInterruptedException
+import com.github.jodersky.flow.Serial.Received
+
+import akka.actor.Actor
+import akka.actor.ActorRef
+import akka.util.ByteString
+
+class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread {
+ def readLoop() = {
+ var stop = false
+ while (!serial.isClosed && !stop) {
+ try {
+ buffer.clear()
+ val length = serial.read(buffer)
+ buffer.limit(length)
+ val data = ByteString.fromByteBuffer(buffer)
+ client.tell(Received(data), operator)
+ } catch {
+
+ //don't do anything if port is interrupted
+ case ex: PortInterruptedException => {}
+
+ //stop and tell operator on other exception
+ case ex: Exception => {
+ stop = true
+ operator.tell(ReaderDied(ex), Actor.noSender)
+ }
+ }
+ }
+ }
+
+ override def run() {
+ this.setName("flow-reader " + serial.port)
+ readLoop()
+ }
+} \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
new file mode 100644
index 0000000..7dd9954
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
@@ -0,0 +1,3 @@
+package com.github.jodersky.flow.internal
+
+case class ReaderDied(reason: Exception) \ No newline at end of file
diff --git a/flow/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
new file mode 100644
index 0000000..446160d
--- /dev/null
+++ b/flow/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
@@ -0,0 +1,157 @@
+package com.github.jodersky.flow.internal
+
+import java.nio.ByteBuffer
+import java.util.concurrent.atomic.AtomicBoolean
+
+import com.github.jodersky.flow.PortClosedException
+import com.github.jodersky.flow.SerialSettings
+
+/**
+ * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast
+ * to the latter, this class encapsulates and secures any pointers used to communicate with the native
+ * backend and is thread-safe.
+ *
+ * The underlying serial port is assumed open when this class is initialized.
+ */
+class SerialConnection private (
+ val port: String,
+ val settings: SerialSettings,
+ private val pointer: Long) {
+
+ import SerialConnection._
+
+ private var reading: Boolean = false
+ private val readLock = new Object
+
+ private var writing: Boolean = false
+ private val writeLock = new Object
+
+ private val closed = new AtomicBoolean(false)
+
+ /**
+ * Checks if this serial port is closed.
+ */
+ def isClosed = closed.get()
+
+ /**
+ * Closes the underlying serial connection. Any callers blocked on read or write will return.
+ * A call of this method has no effect if the serial port is already closed.
+ * @throws IOException on IO error
+ */
+ def close(): Unit = this.synchronized {
+ if (!closed.get) {
+ closed.set(true)
+ NativeSerial.cancelRead(pointer)
+ readLock.synchronized {
+ while (reading) wait()
+ }
+ writeLock.synchronized {
+ while (writing) wait()
+ }
+ NativeSerial.close(pointer)
+ }
+ }
+
+ /**
+ * Reads data from underlying serial connection into a ByteBuffer.
+ * Note that data is read into the buffer's memory, its attributes
+ * such as position and limit are not modified.
+ *
+ * A call to this method is blocking, however it is interrupted
+ * if the connection is closed.
+ *
+ * This method works for direct and indirect buffers but is optimized
+ * for the former.
+ *
+ * @param buffer a ByteBuffer into which data is read
+ * @return the actual number of bytes read
+ * @throws PortInterruptedException if port is closed while reading
+ * @throws IOException on IO error
+ */
+ def read(buffer: ByteBuffer): Int = readLock.synchronized {
+ if (!closed.get) {
+ reading = true
+ try {
+ transfer(
+ b => NativeSerial.readDirect(pointer, b),
+ b => NativeSerial.read(pointer, b.array()))(buffer)
+ } finally {
+ reading = false
+ if (closed.get) notify()
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is closed")
+ }
+ }
+
+ /**
+ * Writes data from a ByteBuffer to underlying serial connection.
+ * Note that data is read from the buffer's memory, its attributes
+ * such as position and limit are not modified.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * This method works for direct and indirect buffers but is optimized
+ * for the former.
+ *
+ * @param buffer a ByteBuffer from which data is taken
+ * @return the actual number of bytes written
+ * @throws IOException on IO error
+ */
+ def write(buffer: ByteBuffer): Int = writeLock.synchronized {
+ if (!closed.get) {
+ writing = true
+ try {
+ transfer(
+ b => NativeSerial.writeDirect(pointer, b, b.remaining()),
+ b => NativeSerial.write(pointer, b.array(), b.remaining()))(buffer)
+ } finally {
+ writing = false
+ if (closed.get) notify()
+ }
+ } else {
+ throw new PortClosedException(s"port ${port} is closed")
+ }
+ }
+
+ private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) {
+ direct(buffer)
+ } else if (buffer.hasArray()) {
+ indirect(buffer)
+ } else {
+ throw new IllegalArgumentException("buffer is not direct and has no array");
+ }
+
+}
+
+object SerialConnection {
+ import NativeSerial._
+
+ /**
+ * Opens a new connection to a serial port.
+ * This method acts as a factory to creating serial connections.
+ *
+ * @param port name of serial port to open
+ * @param settings settings with which to initialize the connection
+ * @return an instance of the open serial connection
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ def open(port: String, settings: SerialSettings): SerialConnection = synchronized {
+ val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id)
+ new SerialConnection(port, settings, pointer)
+ }
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ def debug(value: Boolean) = NativeSerial.debug(value)
+
+} \ No newline at end of file