aboutsummaryrefslogtreecommitdiff
path: root/flow-core
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-05-16 15:28:59 -0700
committerJakob Odersky <jakob@odersky.com>2016-06-09 03:30:35 -0700
commit92c4b3d41e06ad4b89004212c85248e9e6cd61d7 (patch)
tree69470f7c4ed48edaebea91964d7d552e7eaacf0d /flow-core
parentf6f26c2c9e3ec9bdd45fb384483b3450bef5984a (diff)
downloadakka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.tar.gz
akka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.tar.bz2
akka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.zip
Move project to `ch.jodersky` and upgrade sbt-jni
Diffstat (limited to 'flow-core')
-rw-r--r--flow-core/build.sbt7
-rw-r--r--flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java147
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/Parity.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala)2
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/Serial.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala)10
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala)74
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala)2
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala)9
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala83
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala)2
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala108
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala)90
-rw-r--r--flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala (renamed from flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala)2
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala54
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala58
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala36
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala4
16 files changed, 282 insertions, 406 deletions
diff --git a/flow-core/build.sbt b/flow-core/build.sbt
index 7a29d1e..487afe0 100644
--- a/flow-core/build.sbt
+++ b/flow-core/build.sbt
@@ -4,9 +4,4 @@ FlowBuild.commonSettings
libraryDependencies += Dependencies.akkaActor
-//there are also java sources in this project
-compileOrder in Compile := CompileOrder.Mixed
-
-enablePlugins(JniLoading)
-
-target in javah in Compile := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include"
+target in javah := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include"
diff --git a/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
deleted file mode 100644
index 6fac8da..0000000
--- a/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java
+++ /dev/null
@@ -1,147 +0,0 @@
-package com.github.jodersky.flow.internal;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import ch.jodersky.jni.NativeLoader;
-
-import com.github.jodersky.flow.AccessDeniedException;
-import com.github.jodersky.flow.InvalidSettingsException;
-import com.github.jodersky.flow.NoSuchPortException;
-import com.github.jodersky.flow.PortInUseException;
-import com.github.jodersky.flow.PortInterruptedException;
-
-/**
- * Low-level wrapper on top of native serial backend.
- *
- * WARNING: Methods in this class allocate native structures and deal with pointers.
- * These pointers are handled as longs by java and are NOT checked for correctness,
- * therefore passing invalid pointers may have unexpected results, including but not
- * limited to crashing the VM.
- *
- * See SerialConnection for a higher level, more secured wrapper
- * of serial communication.
- *
- * @see com.github.jodersky.flow.internal.SerialConnection
- */
-final class NativeSerial {
-
- static {
- NativeLoader.load("/com/github/jodersky/flow", "flow3");
- }
-
- final static int PARITY_NONE = 0;
- final static int PARITY_ODD = 1;
- final static int PARITY_EVEN = 2;
-
- /**
- * Opens a serial port.
- *
- * @param port name of serial port to open
- * @param characterSize size of a character of the data sent through the serial port
- * @param twoStopBits set to use two stop bits instead of one
- * @param parity type of parity to use with serial port
- * @return address of natively allocated serial configuration structure
- * @throws NoSuchPortException if the given port does not exist
- * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
- * @throws PortInUseException if port is already in use
- * @throws InvalidSettingsException if any of the specified settings are invalid
- * @throws IOException on IO error
- */
- native static long open(String port, int baud, int characterSize, boolean twoStopBits, int parity)
- throws NoSuchPortException, AccessDeniedException, PortInUseException, InvalidSettingsException, IOException;
-
- /**
- * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only read into the
- * buffer's allocated memory, its position or limit are not changed.
- *
- * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
- *
- * @param serial address of natively allocated serial configuration structure
- * @param buffer direct ByteBuffer to read into
- * @return number of bytes actually read
- * @throws IllegalArgumentException if the ByteBuffer is not direct
- * @throws PortInterruptedException if the call to this function was interrupted
- * @throws IOException on IO error
- */
- native static int readDirect(long serial, ByteBuffer buffer)
- throws IllegalArgumentException, PortInterruptedException, IOException;
-
- /**
- * Reads data from a previously opened serial port into an array.
- *
- * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port.
- *
- * @param serial address of natively allocated serial configuration structure
- * @param buffer array to read data into
- * @return number of bytes actually read
- * @throws PortInterruptedException if the call to this function was interrupted
- * @throws IOException on IO error
- */
- native static int read(long serial, byte[] buffer)
- throws PortInterruptedException, IOException;
-
- /**
- * Cancels a read (any caller to read or readDirect will return with a PortInterruptedException). This function may be called from any thread.
- *
- * @param serial address of natively allocated serial configuration structure
- * @throws IOException on IO error
- */
- native static void cancelRead(long serial)
- throws IOException;
-
- /**
- * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is only taken from
- * the buffer's allocated memory, its position or limit are not changed.
- *
- * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
- * transmission buffer.
- *
- * @param serial address of natively allocated serial configuration structure
- * @param buffer direct ByteBuffer from which data is taken
- * @param length actual amount of data that should be taken from the buffer (this is needed since the native
- * backend does not provide a way to query the buffer's current limit)
- * @return number of bytes actually written
- * @throws IllegalArgumentException if the ByteBuffer is not direct
- * @throws IOException on IO error
- */
- native static int writeDirect(long serial, ByteBuffer buffer, int length)
- throws IllegalArgumentException, IOException;
-
- /**
- * Writes data from an array to a previously opened serial port.
- *
- * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
- * transmission buffer.
- *
- * @param serial address of natively allocated serial configuration structure
- * @param buffer array from which data is taken
- * @param length actual amount of data that should be taken from the buffer
- * @return number of bytes actually written
- * @throws IOException on IO error
- */
- native static int write(long serial, byte[] buffer, int length)
- throws IOException;
-
- /**
- * Closes an previously open serial port. Natively allocated resources are freed and the serial pointer becomes invalid,
- * therefore this function should only be called ONCE per open serial port.
- *
- * A port should not be closed while it is used (by a read or write) as this
- * results in undefined behaviour.
- *
- * @param serial address of natively allocated serial configuration structure
- * @throws IOException on IO error
- */
- native static void close(long serial)
- throws IOException;
-
- /**
- * Sets native debugging mode. If debugging is enabled, detailed error messages
- * are printed (to stderr) from native method calls.
- *
- * @param value set to enable debugging
- */
- native static void debug(boolean value);
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala
index 04d64a9..30596d2 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
/** Specifies available parities used in serial communication. */
object Parity extends Enumeration {
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala
index d3b8873..43b1d19 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
import akka.actor.ExtensionKey
import akka.util.ByteString
@@ -121,4 +121,12 @@ object Serial extends ExtensionKey[SerialExt] {
*/
case class Connected(port: String) extends Event
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ def debug(value: Boolean) = UnsafeSerial.debug(value)
+
}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala
index 73416e3..1cd1046 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala
@@ -1,24 +1,20 @@
-package com.github.jodersky.flow
-package internal
+package ch.jodersky.flow
import java.nio.ByteBuffer
import java.util.concurrent.atomic.AtomicBoolean
/**
- * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast
- * to the latter, this class encapsulates and secures any pointers used to communicate with the native
- * backend and is thread-safe.
+ * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In
+ * contrast to the latter, this class encapsulates and secures any pointers used to communicate with
+ * the native backend and is thread-safe.
*
* The underlying serial port is assumed open when this class is initialized.
*/
-class SerialConnection private (
- val port: String,
- val settings: SerialSettings,
- private val pointer: Long
+private[flow] class SerialConnection private (
+ unsafe: UnsafeSerial,
+ val port: String
) {
- import SerialConnection._
-
private var reading: Boolean = false
private val readLock = new Object
@@ -40,14 +36,14 @@ class SerialConnection private (
def close(): Unit = this.synchronized {
if (!closed.get) {
closed.set(true)
- NativeSerial.cancelRead(pointer)
+ unsafe.cancelRead()
readLock.synchronized {
while (reading) this.wait()
}
writeLock.synchronized {
while (writing) this.wait()
}
- NativeSerial.close(pointer)
+ unsafe.close()
}
}
@@ -69,18 +65,15 @@ class SerialConnection private (
*/
def read(buffer: ByteBuffer): Int = readLock.synchronized {
if (!closed.get) {
- reading = true
try {
- transfer(
- b => NativeSerial.readDirect(pointer, b),
- b => NativeSerial.read(pointer, b.array())
- )(buffer)
+ reading = true
+ unsafe.read(buffer)
} finally {
reading = false
if (closed.get) readLock.notify()
}
} else {
- throw new PortClosedException(s"port ${port} is closed")
+ throw new PortClosedException(s"${port} is closed")
}
}
@@ -101,33 +94,21 @@ class SerialConnection private (
*/
def write(buffer: ByteBuffer): Int = writeLock.synchronized {
if (!closed.get) {
- writing = true
try {
- transfer(
- b => NativeSerial.writeDirect(pointer, b, b.position()),
- b => NativeSerial.write(pointer, b.array(), b.position())
- )(buffer)
+ writing = true
+ unsafe.write(buffer, buffer.position)
} finally {
writing = false
if (closed.get) writeLock.notify()
}
} else {
- throw new PortClosedException(s"port ${port} is closed")
+ throw new PortClosedException(s"${port} is closed")
}
}
- private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) {
- direct(buffer)
- } else if (buffer.hasArray()) {
- indirect(buffer)
- } else {
- throw new IllegalArgumentException("buffer is not direct and has no array");
- }
-
}
-object SerialConnection {
- import NativeSerial._
+private[flow] object SerialConnection {
/**
* Opens a new connection to a serial port.
@@ -142,17 +123,18 @@ object SerialConnection {
* @throws InvalidSettingsException if any of the specified settings are invalid
* @throws IOException on IO error
*/
- def open(port: String, settings: SerialSettings): SerialConnection = synchronized {
- val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id)
- new SerialConnection(port, settings, pointer)
+ def open(
+ port: String,
+ settings: SerialSettings
+ ): SerialConnection = synchronized {
+ val pointer = UnsafeSerial.open(
+ port,
+ settings.baud,
+ settings.characterSize,
+ settings.twoStopBits,
+ settings.parity.id
+ )
+ new SerialConnection(new UnsafeSerial(pointer), port)
}
- /**
- * Sets native debugging mode. If debugging is enabled, detailed error messages
- * are printed (to stderr) from native method calls.
- *
- * @param value set to enable debugging
- */
- def debug(value: Boolean) = NativeSerial.debug(value)
-
}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala
index 5c1ddf6..4ed3e2e 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
import akka.actor.{ ExtendedActorSystem, Props }
import akka.io.IO
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala
index d163b0a..7967087 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala
@@ -1,8 +1,7 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
import akka.actor.{ Actor, ActorLogging, OneForOneStrategy }
import akka.actor.SupervisorStrategy.{ Escalate, Stop }
-import internal.{ SerialConnection, Watcher }
import scala.util.{ Failure, Success, Try }
/**
@@ -10,7 +9,7 @@ import scala.util.{ Failure, Success, Try }
* a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
* @see SerialOperator
*/
-class SerialManager extends Actor with ActorLogging {
+private[flow] class SerialManager extends Actor {
import SerialManager._
import context._
@@ -38,9 +37,9 @@ class SerialManager extends Actor with ActorLogging {
}
-object SerialManager {
+private[flow] object SerialManager {
- private def escapePortString(port: String) = port collect {
+ private def escapePortString(port: String) = port map {
case '/' => '-'
case c => c
}
diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala
new file mode 100644
index 0000000..d5c131c
--- /dev/null
+++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala
@@ -0,0 +1,83 @@
+package ch.jodersky.flow
+
+import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated }
+import akka.util.ByteString
+import java.nio.ByteBuffer
+
+/**
+ * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
+ * @see SerialManager
+ */
+private[flow] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor {
+ import SerialOperator._
+ import context._
+
+ case class ReaderDied(ex: Throwable)
+ object Reader extends Thread {
+ val buffer = ByteBuffer.allocateDirect(bufferSize)
+
+ def loop() = {
+ var stop = false
+ while (!connection.isClosed && !stop) {
+ try {
+ buffer.clear()
+ val length = connection.read(buffer)
+ buffer.limit(length)
+ val data = ByteString.fromByteBuffer(buffer)
+ client.tell(Serial.Received(data), self)
+ } catch {
+ // don't do anything if port is interrupted
+ case ex: PortInterruptedException => {}
+
+ //stop and tell operator on other exception
+ case ex: Exception =>
+ stop = true
+ self.tell(ReaderDied(ex), Actor.noSender)
+ }
+ }
+ }
+
+ override def run() {
+ this.setName(s"serial-reader(${connection.port})")
+ loop()
+ }
+
+ }
+
+ val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
+
+ override def preStart() = {
+ context watch client
+ client ! Serial.Opened(connection.port)
+ Reader.start()
+ }
+
+ override def receive: Receive = {
+
+ case Serial.Write(data, ack) =>
+ writeBuffer.clear()
+ data.copyToBuffer(writeBuffer)
+ val sent = connection.write(writeBuffer)
+ if (ack != Serial.NoAck) sender ! ack(sent)
+
+ case Serial.Close =>
+ client ! Serial.Closed
+ context stop self
+
+ case Terminated(`client`) =>
+ context stop self
+
+ // go down with reader thread
+ case ReaderDied(ex) => throw ex
+
+ }
+
+ override def postStop() = {
+ connection.close()
+ }
+
+}
+
+private[flow] object SerialOperator {
+ def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala
index 087fa6e..2d3a6ed 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
/**
* Groups settings used in communication over a serial port.
diff --git a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala
new file mode 100644
index 0000000..3126618
--- /dev/null
+++ b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala
@@ -0,0 +1,108 @@
+package ch.jodersky.flow
+
+import java.nio.ByteBuffer
+
+import ch.jodersky.jni.nativeLoader
+
+/**
+ * Low-level wrapper of native serial backend.
+ *
+ * WARNING: Methods in this class allocate native structures and deal with pointers. These
+ * pointers are handled as longs by java and are NOT checked for correctness, therefore passing
+ * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM.
+ *
+ * See SerialConnection for a higher-level, more secured wrapper
+ * of serial communication.
+ *
+ * @param serialAddr address of natively allocated serial configuration structure
+ */
+@nativeLoader("flow4")
+private[flow] class UnsafeSerial(final val serialAddr: Long) {
+
+ final val ParityNone: Int = 0
+ final val ParityOdd: Int = 1
+ final val ParityEven: Int = 2
+
+ /**
+ * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only
+ * read into the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The read is blocking, however it may be interrupted by calling cancelRead() on the given
+ * serial port.
+ *
+ * @param buffer direct ByteBuffer to read into
+ * @return number of bytes actually read
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws PortInterruptedException if the call to this function was interrupted
+ * @throws IOException on IO error
+ */
+ @native def read(buffer: ByteBuffer): Int
+
+ /**
+ * Cancels a read (any caller to read or readDirect will return with a
+ * PortInterruptedException). This function may be called from any thread.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ @native def cancelRead(): Unit
+
+ /**
+ * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is
+ * only taken from the buffer's allocated memory, its position or limit are not changed.
+ *
+ * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
+ * transmission buffer.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @param buffer direct ByteBuffer from which data is taken
+ * @param length actual amount of data that should be taken from the buffer (this is needed since the native
+ * backend does not provide a way to query the buffer's current limit)
+ * @return number of bytes actually written
+ * @throws IllegalArgumentException if the ByteBuffer is not direct
+ * @throws IOException on IO error
+ */
+ @native def write(buffer: ByteBuffer, length: Int): Int
+
+ /**
+ * Closes an previously open serial port. Natively allocated resources are freed and the serial
+ * pointer becomes invalid, therefore this function should only be called ONCE per open serial
+ * port.
+ *
+ * A port should not be closed while it is used (by a read or write) as this
+ * results in undefined behaviour.
+ *
+ * @param serial address of natively allocated serial configuration structure
+ * @throws IOException on IO error
+ */
+ @native def close(): Unit
+
+}
+
+private[flow] object UnsafeSerial {
+
+ /**
+ * Opens a serial port.
+ *
+ * @param port name of serial port to open
+ * @param characterSize size of a character of the data sent through the serial port
+ * @param twoStopBits set to use two stop bits instead of one
+ * @param parity type of parity to use with serial port
+ * @return address of natively allocated serial configuration structure
+ * @throws NoSuchPortException if the given port does not exist
+ * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
+ * @throws PortInUseException if port is already in use
+ * @throws InvalidSettingsException if any of the specified settings are invalid
+ * @throws IOException on IO error
+ */
+ @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long
+
+ /**
+ * Sets native debugging mode. If debugging is enabled, detailed error messages
+ * are printed (to stderr) from native method calls.
+ *
+ * @param value set to enable debugging
+ */
+ @native def debug(value: Boolean): Unit
+
+}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala
index 8d89fb4..9fa519b 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala
@@ -1,5 +1,4 @@
-package com.github.jodersky.flow
-package internal
+package ch.jodersky.flow
import akka.actor.{ Actor, ActorRef, Props, Terminated }
import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
@@ -8,20 +7,53 @@ import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
import scala.util.{ Failure, Success, Try }
-class Watcher(from: Option[ActorRef]) extends Actor {
+private[flow] class Watcher(from: Option[ActorRef]) extends Actor {
- private val watcher = new Watcher.WatcherThread(self)
+ case class WatcherDied(reason: Throwable)
+ object WatcherThread extends Thread {
+ import Watcher.NewFile
- //directory -> subscribers
+ private val service = FileSystems.getDefault().newWatchService()
+
+ def register(directory: Path) = directory.register(service, ENTRY_CREATE)
+
+ override def run(): Unit = {
+ this.setName("serial-port-watcher")
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ self.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender)
+ }
+ }
+ }
+
+ def close() = service.close // causes the service to throw a ClosedWatchServiceException
+ }
+
+
+ // directory -> subscribers
private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
- //directory -> watchkey
+ // directory -> watchkey
private val keys: Map[String, WatchKey] = Map.empty
def subscribe(directory: String, client: ActorRef): WatchKey = {
val normal = Paths.get(directory).toAbsolutePath
val index = normal.toString
- val key = keys.getOrElseUpdate(index, watcher.register(normal))
+ val key = keys.getOrElseUpdate(index, WatcherThread.register(normal))
clients addBinding (index, client)
key
}
@@ -46,12 +78,11 @@ class Watcher(from: Option[ActorRef]) extends Actor {
}
override def preStart() = {
- watcher.setDaemon(true)
- watcher.setName("flow-watcher")
- watcher.start()
+ WatcherThread.setDaemon(true)
+ WatcherThread.start()
}
- def receive = {
+ override def receive = {
case w @ Serial.Watch(directory, skipInitial) =>
val normalPath = Paths.get(directory).toAbsolutePath
@@ -94,50 +125,19 @@ class Watcher(from: Option[ActorRef]) extends Actor {
reply(Serial.Connected(absFile.toString), client)
}
- case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
+ case WatcherDied(err) => throw err // go down with watcher thread
}
override def postStop() = {
- watcher.close()
+ WatcherThread.close()
}
}
-object Watcher {
+private[flow] object Watcher {
private case class NewFile(directory: Path, file: Path)
- private class WatcherThread(actor: ActorRef) extends Thread {
-
- private val service = FileSystems.getDefault().newWatchService()
-
- def register(directory: Path) = directory.register(service, ENTRY_CREATE)
-
- override def run(): Unit = {
- var stop = false
- while (!stop) {
- try {
- val key = service.take()
- key.pollEvents() foreach { ev =>
- val event = ev.asInstanceOf[WatchEvent[Path]]
- if (event.kind == ENTRY_CREATE) {
- val directory = key.watchable().asInstanceOf[Path]
- val file = event.context()
- actor.tell(NewFile(directory, file), Actor.noSender)
- }
- }
- key.reset()
- } catch {
- case _: InterruptedException => stop = true
- case _: ClosedWatchServiceException => stop = true
- case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
-
- def close() = service.close //causes the service to throw a ClosedWatchServiceException
- }
-
def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala
index ebc0e65..ee087a8 100644
--- a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala
+++ b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala
@@ -1,4 +1,4 @@
-package com.github.jodersky.flow
+package ch.jodersky.flow
/** The requested port could not be found. */
class NoSuchPortException(message: String) extends Exception(message)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
deleted file mode 100644
index ec0ee27..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala }
-import internal.{ Reader, SerialConnection, ThreadDied }
-import java.nio.ByteBuffer
-
-/**
- * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
- * @see SerialManager
- */
-class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging {
- import SerialOperator._
- import context._
-
- val readBuffer = ByteBuffer.allocateDirect(bufferSize)
- val reader = new Reader(connection, readBuffer, self, client)
- val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
-
- context.watch(client)
- client ! Serial.Opened(connection.port)
- reader.start()
-
- override def postStop = {
- connection.close()
- }
-
- def receive: Receive = {
-
- case Serial.Write(data, ack) => {
- writeBuffer.clear()
- data.copyToBuffer(writeBuffer)
- val sent = connection.write(writeBuffer)
- if (ack != Serial.NoAck) sender ! ack(sent)
- }
-
- case Serial.Close => {
- client ! Serial.Closed
- context stop self
- }
-
- case Terminated(`client`) => {
- context stop self
- }
-
- //go down with reader thread
- case ThreadDied(`reader`, ex) => throw ex
-
- }
-
-}
-
-object SerialOperator {
- def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
deleted file mode 100644
index a5fdc40..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import java.io.{ File, FileOutputStream, InputStream, OutputStream }
-
-/** Handles loading of the current platform's native library for flow. */
-object NativeLoader {
-
- private final val BufferSize = 4096
-
- private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "")
-
- private def arch = System.getProperty("os.arch").toLowerCase
-
- /** Extract a resource from this class loader to a temporary file. */
- private def extract(path: String, prefix: String): Option[File] = {
- var in: Option[InputStream] = None
- var out: Option[OutputStream] = None
-
- try {
- in = Option(NativeLoader.getClass.getResourceAsStream(path))
- if (in.isEmpty) return None
-
- val file = File.createTempFile(prefix, "")
- out = Some(new FileOutputStream(file))
-
- val buffer = new Array[Byte](BufferSize)
- var length = -1;
- do {
- length = in.get.read(buffer)
- if (length != -1) out.get.write(buffer, 0, length)
- } while (length != -1)
-
- Some(file)
- } finally {
- in.foreach(_.close)
- out.foreach(_.close)
- }
- }
-
- private def loadFromJar(library: String) = {
- val fqlib = System.mapLibraryName(library) //fully qualified library name
- val path = s"/native/${os}-${arch}/${fqlib}"
- extract(path, fqlib) match {
- case Some(file) => System.load(file.getAbsolutePath)
- case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " +
- "the native library does not exist for your specific architecture/OS combination." +
- "Could not find " + path + ".")
- }
- }
-
- def load(library: String) = try {
- System.loadLibrary(library)
- } catch {
- case ex: UnsatisfiedLinkError => loadFromJar(library)
- }
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
deleted file mode 100644
index 59ad575..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import akka.actor.{ Actor, ActorRef }
-import akka.util.ByteString
-import java.nio.ByteBuffer
-
-class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread {
- def readLoop() = {
- var stop = false
- while (!serial.isClosed && !stop) {
- try {
- buffer.clear()
- val length = serial.read(buffer)
- buffer.limit(length)
- val data = ByteString.fromByteBuffer(buffer)
- client.tell(Serial.Received(data), operator)
- } catch {
-
- //don't do anything if port is interrupted
- case ex: PortInterruptedException => {}
-
- //stop and tell operator on other exception
- case ex: Exception => {
- stop = true
- operator.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
- }
-
- override def run() {
- this.setName("flow-reader " + serial.port)
- readLoop()
- }
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
deleted file mode 100644
index 1470aa5..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-case class ThreadDied(thread: Thread, reason: Exception)