diff options
Diffstat (limited to 'flow-core')
16 files changed, 282 insertions, 406 deletions
diff --git a/flow-core/build.sbt b/flow-core/build.sbt index 7a29d1e..487afe0 100644 --- a/flow-core/build.sbt +++ b/flow-core/build.sbt @@ -4,9 +4,4 @@ FlowBuild.commonSettings libraryDependencies += Dependencies.akkaActor -//there are also java sources in this project -compileOrder in Compile := CompileOrder.Mixed - -enablePlugins(JniLoading) - -target in javah in Compile := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include" +target in javah := (baseDirectory in ThisBuild).value / "flow-native" / "src" / "include" diff --git a/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java b/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java deleted file mode 100644 index 6fac8da..0000000 --- a/flow-core/src/main/java/com/github/jodersky/flow/internal/NativeSerial.java +++ /dev/null @@ -1,147 +0,0 @@ -package com.github.jodersky.flow.internal; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import ch.jodersky.jni.NativeLoader; - -import com.github.jodersky.flow.AccessDeniedException; -import com.github.jodersky.flow.InvalidSettingsException; -import com.github.jodersky.flow.NoSuchPortException; -import com.github.jodersky.flow.PortInUseException; -import com.github.jodersky.flow.PortInterruptedException; - -/** - * Low-level wrapper on top of native serial backend. - * - * WARNING: Methods in this class allocate native structures and deal with pointers. - * These pointers are handled as longs by java and are NOT checked for correctness, - * therefore passing invalid pointers may have unexpected results, including but not - * limited to crashing the VM. - * - * See SerialConnection for a higher level, more secured wrapper - * of serial communication. - * - * @see com.github.jodersky.flow.internal.SerialConnection - */ -final class NativeSerial { - - static { - NativeLoader.load("/com/github/jodersky/flow", "flow3"); - } - - final static int PARITY_NONE = 0; - final static int PARITY_ODD = 1; - final static int PARITY_EVEN = 2; - - /** - * Opens a serial port. - * - * @param port name of serial port to open - * @param characterSize size of a character of the data sent through the serial port - * @param twoStopBits set to use two stop bits instead of one - * @param parity type of parity to use with serial port - * @return address of natively allocated serial configuration structure - * @throws NoSuchPortException if the given port does not exist - * @throws AccessDeniedException if permissions of the current user are not sufficient to open port - * @throws PortInUseException if port is already in use - * @throws InvalidSettingsException if any of the specified settings are invalid - * @throws IOException on IO error - */ - native static long open(String port, int baud, int characterSize, boolean twoStopBits, int parity) - throws NoSuchPortException, AccessDeniedException, PortInUseException, InvalidSettingsException, IOException; - - /** - * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only read into the - * buffer's allocated memory, its position or limit are not changed. - * - * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer direct ByteBuffer to read into - * @return number of bytes actually read - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws PortInterruptedException if the call to this function was interrupted - * @throws IOException on IO error - */ - native static int readDirect(long serial, ByteBuffer buffer) - throws IllegalArgumentException, PortInterruptedException, IOException; - - /** - * Reads data from a previously opened serial port into an array. - * - * The read is blocking, however it may be interrupted by calling cancelRead() on the given serial port. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer array to read data into - * @return number of bytes actually read - * @throws PortInterruptedException if the call to this function was interrupted - * @throws IOException on IO error - */ - native static int read(long serial, byte[] buffer) - throws PortInterruptedException, IOException; - - /** - * Cancels a read (any caller to read or readDirect will return with a PortInterruptedException). This function may be called from any thread. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - native static void cancelRead(long serial) - throws IOException; - - /** - * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is only taken from - * the buffer's allocated memory, its position or limit are not changed. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer direct ByteBuffer from which data is taken - * @param length actual amount of data that should be taken from the buffer (this is needed since the native - * backend does not provide a way to query the buffer's current limit) - * @return number of bytes actually written - * @throws IllegalArgumentException if the ByteBuffer is not direct - * @throws IOException on IO error - */ - native static int writeDirect(long serial, ByteBuffer buffer, int length) - throws IllegalArgumentException, IOException; - - /** - * Writes data from an array to a previously opened serial port. - * - * The write is non-blocking, this function returns as soon as the data is copied into the kernel's - * transmission buffer. - * - * @param serial address of natively allocated serial configuration structure - * @param buffer array from which data is taken - * @param length actual amount of data that should be taken from the buffer - * @return number of bytes actually written - * @throws IOException on IO error - */ - native static int write(long serial, byte[] buffer, int length) - throws IOException; - - /** - * Closes an previously open serial port. Natively allocated resources are freed and the serial pointer becomes invalid, - * therefore this function should only be called ONCE per open serial port. - * - * A port should not be closed while it is used (by a read or write) as this - * results in undefined behaviour. - * - * @param serial address of natively allocated serial configuration structure - * @throws IOException on IO error - */ - native static void close(long serial) - throws IOException; - - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - native static void debug(boolean value); - -} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala index 04d64a9..30596d2 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/Parity.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package ch.jodersky.flow /** Specifies available parities used in serial communication. */ object Parity extends Enumeration { diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala index d3b8873..43b1d19 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/Serial.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package ch.jodersky.flow import akka.actor.ExtensionKey import akka.util.ByteString @@ -121,4 +121,12 @@ object Serial extends ExtensionKey[SerialExt] { */ case class Connected(port: String) extends Event + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + def debug(value: Boolean) = UnsafeSerial.debug(value) + } diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala index 73416e3..1cd1046 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialConnection.scala @@ -1,24 +1,20 @@ -package com.github.jodersky.flow -package internal +package ch.jodersky.flow import java.nio.ByteBuffer import java.util.concurrent.atomic.AtomicBoolean /** - * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast - * to the latter, this class encapsulates and secures any pointers used to communicate with the native - * backend and is thread-safe. + * Represents a serial connection in a more secure and object-oriented style than `UnsafeSerial`. In + * contrast to the latter, this class encapsulates and secures any pointers used to communicate with + * the native backend and is thread-safe. * * The underlying serial port is assumed open when this class is initialized. */ -class SerialConnection private ( - val port: String, - val settings: SerialSettings, - private val pointer: Long +private[flow] class SerialConnection private ( + unsafe: UnsafeSerial, + val port: String ) { - import SerialConnection._ - private var reading: Boolean = false private val readLock = new Object @@ -40,14 +36,14 @@ class SerialConnection private ( def close(): Unit = this.synchronized { if (!closed.get) { closed.set(true) - NativeSerial.cancelRead(pointer) + unsafe.cancelRead() readLock.synchronized { while (reading) this.wait() } writeLock.synchronized { while (writing) this.wait() } - NativeSerial.close(pointer) + unsafe.close() } } @@ -69,18 +65,15 @@ class SerialConnection private ( */ def read(buffer: ByteBuffer): Int = readLock.synchronized { if (!closed.get) { - reading = true try { - transfer( - b => NativeSerial.readDirect(pointer, b), - b => NativeSerial.read(pointer, b.array()) - )(buffer) + reading = true + unsafe.read(buffer) } finally { reading = false if (closed.get) readLock.notify() } } else { - throw new PortClosedException(s"port ${port} is closed") + throw new PortClosedException(s"${port} is closed") } } @@ -101,33 +94,21 @@ class SerialConnection private ( */ def write(buffer: ByteBuffer): Int = writeLock.synchronized { if (!closed.get) { - writing = true try { - transfer( - b => NativeSerial.writeDirect(pointer, b, b.position()), - b => NativeSerial.write(pointer, b.array(), b.position()) - )(buffer) + writing = true + unsafe.write(buffer, buffer.position) } finally { writing = false if (closed.get) writeLock.notify() } } else { - throw new PortClosedException(s"port ${port} is closed") + throw new PortClosedException(s"${port} is closed") } } - private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) { - direct(buffer) - } else if (buffer.hasArray()) { - indirect(buffer) - } else { - throw new IllegalArgumentException("buffer is not direct and has no array"); - } - } -object SerialConnection { - import NativeSerial._ +private[flow] object SerialConnection { /** * Opens a new connection to a serial port. @@ -142,17 +123,18 @@ object SerialConnection { * @throws InvalidSettingsException if any of the specified settings are invalid * @throws IOException on IO error */ - def open(port: String, settings: SerialSettings): SerialConnection = synchronized { - val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id) - new SerialConnection(port, settings, pointer) + def open( + port: String, + settings: SerialSettings + ): SerialConnection = synchronized { + val pointer = UnsafeSerial.open( + port, + settings.baud, + settings.characterSize, + settings.twoStopBits, + settings.parity.id + ) + new SerialConnection(new UnsafeSerial(pointer), port) } - /** - * Sets native debugging mode. If debugging is enabled, detailed error messages - * are printed (to stderr) from native method calls. - * - * @param value set to enable debugging - */ - def debug(value: Boolean) = NativeSerial.debug(value) - } diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala index 5c1ddf6..4ed3e2e 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialExt.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package ch.jodersky.flow import akka.actor.{ ExtendedActorSystem, Props } import akka.io.IO diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala index d163b0a..7967087 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialManager.scala @@ -1,8 +1,7 @@ -package com.github.jodersky.flow +package ch.jodersky.flow import akka.actor.{ Actor, ActorLogging, OneForOneStrategy } import akka.actor.SupervisorStrategy.{ Escalate, Stop } -import internal.{ SerialConnection, Watcher } import scala.util.{ Failure, Success, Try } /** @@ -10,7 +9,7 @@ import scala.util.{ Failure, Success, Try } * a dedicated operator actor that acts as an intermediate between client code and the native system serial port. * @see SerialOperator */ -class SerialManager extends Actor with ActorLogging { +private[flow] class SerialManager extends Actor { import SerialManager._ import context._ @@ -38,9 +37,9 @@ class SerialManager extends Actor with ActorLogging { } -object SerialManager { +private[flow] object SerialManager { - private def escapePortString(port: String) = port collect { + private def escapePortString(port: String) = port map { case '/' => '-' case c => c } diff --git a/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala new file mode 100644 index 0000000..d5c131c --- /dev/null +++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialOperator.scala @@ -0,0 +1,83 @@ +package ch.jodersky.flow + +import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated } +import akka.util.ByteString +import java.nio.ByteBuffer + +/** + * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. + * @see SerialManager + */ +private[flow] class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor { + import SerialOperator._ + import context._ + + case class ReaderDied(ex: Throwable) + object Reader extends Thread { + val buffer = ByteBuffer.allocateDirect(bufferSize) + + def loop() = { + var stop = false + while (!connection.isClosed && !stop) { + try { + buffer.clear() + val length = connection.read(buffer) + buffer.limit(length) + val data = ByteString.fromByteBuffer(buffer) + client.tell(Serial.Received(data), self) + } catch { + // don't do anything if port is interrupted + case ex: PortInterruptedException => {} + + //stop and tell operator on other exception + case ex: Exception => + stop = true + self.tell(ReaderDied(ex), Actor.noSender) + } + } + } + + override def run() { + this.setName(s"serial-reader(${connection.port})") + loop() + } + + } + + val writeBuffer = ByteBuffer.allocateDirect(bufferSize) + + override def preStart() = { + context watch client + client ! Serial.Opened(connection.port) + Reader.start() + } + + override def receive: Receive = { + + case Serial.Write(data, ack) => + writeBuffer.clear() + data.copyToBuffer(writeBuffer) + val sent = connection.write(writeBuffer) + if (ack != Serial.NoAck) sender ! ack(sent) + + case Serial.Close => + client ! Serial.Closed + context stop self + + case Terminated(`client`) => + context stop self + + // go down with reader thread + case ReaderDied(ex) => throw ex + + } + + override def postStop() = { + connection.close() + } + +} + +private[flow] object SerialOperator { + def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) +} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala index 087fa6e..2d3a6ed 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/SerialSettings.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package ch.jodersky.flow /** * Groups settings used in communication over a serial port. diff --git a/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala new file mode 100644 index 0000000..3126618 --- /dev/null +++ b/flow-core/src/main/scala/ch/jodersky/flow/UnsafeSerial.scala @@ -0,0 +1,108 @@ +package ch.jodersky.flow + +import java.nio.ByteBuffer + +import ch.jodersky.jni.nativeLoader + +/** + * Low-level wrapper of native serial backend. + * + * WARNING: Methods in this class allocate native structures and deal with pointers. These + * pointers are handled as longs by java and are NOT checked for correctness, therefore passing + * invalid pointers may have unexpected results, including but not limited to SEGFAULTing the VM. + * + * See SerialConnection for a higher-level, more secured wrapper + * of serial communication. + * + * @param serialAddr address of natively allocated serial configuration structure + */ +@nativeLoader("flow4") +private[flow] class UnsafeSerial(final val serialAddr: Long) { + + final val ParityNone: Int = 0 + final val ParityOdd: Int = 1 + final val ParityEven: Int = 2 + + /** + * Reads from a previously opened serial port into a direct ByteBuffer. Note that data is only + * read into the buffer's allocated memory, its position or limit are not changed. + * + * The read is blocking, however it may be interrupted by calling cancelRead() on the given + * serial port. + * + * @param buffer direct ByteBuffer to read into + * @return number of bytes actually read + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws PortInterruptedException if the call to this function was interrupted + * @throws IOException on IO error + */ + @native def read(buffer: ByteBuffer): Int + + /** + * Cancels a read (any caller to read or readDirect will return with a + * PortInterruptedException). This function may be called from any thread. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def cancelRead(): Unit + + /** + * Writes data from a direct ByteBuffer to a previously opened serial port. Note that data is + * only taken from the buffer's allocated memory, its position or limit are not changed. + * + * The write is non-blocking, this function returns as soon as the data is copied into the kernel's + * transmission buffer. + * + * @param serial address of natively allocated serial configuration structure + * @param buffer direct ByteBuffer from which data is taken + * @param length actual amount of data that should be taken from the buffer (this is needed since the native + * backend does not provide a way to query the buffer's current limit) + * @return number of bytes actually written + * @throws IllegalArgumentException if the ByteBuffer is not direct + * @throws IOException on IO error + */ + @native def write(buffer: ByteBuffer, length: Int): Int + + /** + * Closes an previously open serial port. Natively allocated resources are freed and the serial + * pointer becomes invalid, therefore this function should only be called ONCE per open serial + * port. + * + * A port should not be closed while it is used (by a read or write) as this + * results in undefined behaviour. + * + * @param serial address of natively allocated serial configuration structure + * @throws IOException on IO error + */ + @native def close(): Unit + +} + +private[flow] object UnsafeSerial { + + /** + * Opens a serial port. + * + * @param port name of serial port to open + * @param characterSize size of a character of the data sent through the serial port + * @param twoStopBits set to use two stop bits instead of one + * @param parity type of parity to use with serial port + * @return address of natively allocated serial configuration structure + * @throws NoSuchPortException if the given port does not exist + * @throws AccessDeniedException if permissions of the current user are not sufficient to open port + * @throws PortInUseException if port is already in use + * @throws InvalidSettingsException if any of the specified settings are invalid + * @throws IOException on IO error + */ + @native def open(port: String, baud: Int, characterSize: Int, twoStopBits: Boolean, parity: Int): Long + + /** + * Sets native debugging mode. If debugging is enabled, detailed error messages + * are printed (to stderr) from native method calls. + * + * @param value set to enable debugging + */ + @native def debug(value: Boolean): Unit + +} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala index 8d89fb4..9fa519b 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/Watcher.scala @@ -1,5 +1,4 @@ -package com.github.jodersky.flow -package internal +package ch.jodersky.flow import akka.actor.{ Actor, ActorRef, Props, Terminated } import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } @@ -8,20 +7,53 @@ import scala.collection.JavaConversions._ import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } import scala.util.{ Failure, Success, Try } -class Watcher(from: Option[ActorRef]) extends Actor { +private[flow] class Watcher(from: Option[ActorRef]) extends Actor { - private val watcher = new Watcher.WatcherThread(self) + case class WatcherDied(reason: Throwable) + object WatcherThread extends Thread { + import Watcher.NewFile - //directory -> subscribers + private val service = FileSystems.getDefault().newWatchService() + + def register(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + this.setName("serial-port-watcher") + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + self.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) + } + } + } + + def close() = service.close // causes the service to throw a ClosedWatchServiceException + } + + + // directory -> subscribers private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] - //directory -> watchkey + // directory -> watchkey private val keys: Map[String, WatchKey] = Map.empty def subscribe(directory: String, client: ActorRef): WatchKey = { val normal = Paths.get(directory).toAbsolutePath val index = normal.toString - val key = keys.getOrElseUpdate(index, watcher.register(normal)) + val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) clients addBinding (index, client) key } @@ -46,12 +78,11 @@ class Watcher(from: Option[ActorRef]) extends Actor { } override def preStart() = { - watcher.setDaemon(true) - watcher.setName("flow-watcher") - watcher.start() + WatcherThread.setDaemon(true) + WatcherThread.start() } - def receive = { + override def receive = { case w @ Serial.Watch(directory, skipInitial) => val normalPath = Paths.get(directory).toAbsolutePath @@ -94,50 +125,19 @@ class Watcher(from: Option[ActorRef]) extends Actor { reply(Serial.Connected(absFile.toString), client) } - case ThreadDied(`watcher`, err) => throw err //go down with watcher thread + case WatcherDied(err) => throw err // go down with watcher thread } override def postStop() = { - watcher.close() + WatcherThread.close() } } -object Watcher { +private[flow] object Watcher { private case class NewFile(directory: Path, file: Path) - private class WatcherThread(actor: ActorRef) extends Thread { - - private val service = FileSystems.getDefault().newWatchService() - - def register(directory: Path) = directory.register(service, ENTRY_CREATE) - - override def run(): Unit = { - var stop = false - while (!stop) { - try { - val key = service.take() - key.pollEvents() foreach { ev => - val event = ev.asInstanceOf[WatchEvent[Path]] - if (event.kind == ENTRY_CREATE) { - val directory = key.watchable().asInstanceOf[Path] - val file = event.context() - actor.tell(NewFile(directory, file), Actor.noSender) - } - } - key.reset() - } catch { - case _: InterruptedException => stop = true - case _: ClosedWatchServiceException => stop = true - case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender) - } - } - } - - def close() = service.close //causes the service to throw a ClosedWatchServiceException - } - def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) } diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala index ebc0e65..ee087a8 100644 --- a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala +++ b/flow-core/src/main/scala/ch/jodersky/flow/exceptions.scala @@ -1,4 +1,4 @@ -package com.github.jodersky.flow +package ch.jodersky.flow /** The requested port could not be found. */ class NoSuchPortException(message: String) extends Exception(message) diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala deleted file mode 100644 index ec0ee27..0000000 --- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ /dev/null @@ -1,54 +0,0 @@ -package com.github.jodersky.flow - -import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala } -import internal.{ Reader, SerialConnection, ThreadDied } -import java.nio.ByteBuffer - -/** - * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager. - * @see SerialManager - */ -class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging { - import SerialOperator._ - import context._ - - val readBuffer = ByteBuffer.allocateDirect(bufferSize) - val reader = new Reader(connection, readBuffer, self, client) - val writeBuffer = ByteBuffer.allocateDirect(bufferSize) - - context.watch(client) - client ! Serial.Opened(connection.port) - reader.start() - - override def postStop = { - connection.close() - } - - def receive: Receive = { - - case Serial.Write(data, ack) => { - writeBuffer.clear() - data.copyToBuffer(writeBuffer) - val sent = connection.write(writeBuffer) - if (ack != Serial.NoAck) sender ! ack(sent) - } - - case Serial.Close => { - client ! Serial.Closed - context stop self - } - - case Terminated(`client`) => { - context stop self - } - - //go down with reader thread - case ThreadDied(`reader`, ex) => throw ex - - } - -} - -object SerialOperator { - def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) -} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala deleted file mode 100644 index a5fdc40..0000000 --- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala +++ /dev/null @@ -1,58 +0,0 @@ -package com.github.jodersky.flow -package internal - -import java.io.{ File, FileOutputStream, InputStream, OutputStream } - -/** Handles loading of the current platform's native library for flow. */ -object NativeLoader { - - private final val BufferSize = 4096 - - private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "") - - private def arch = System.getProperty("os.arch").toLowerCase - - /** Extract a resource from this class loader to a temporary file. */ - private def extract(path: String, prefix: String): Option[File] = { - var in: Option[InputStream] = None - var out: Option[OutputStream] = None - - try { - in = Option(NativeLoader.getClass.getResourceAsStream(path)) - if (in.isEmpty) return None - - val file = File.createTempFile(prefix, "") - out = Some(new FileOutputStream(file)) - - val buffer = new Array[Byte](BufferSize) - var length = -1; - do { - length = in.get.read(buffer) - if (length != -1) out.get.write(buffer, 0, length) - } while (length != -1) - - Some(file) - } finally { - in.foreach(_.close) - out.foreach(_.close) - } - } - - private def loadFromJar(library: String) = { - val fqlib = System.mapLibraryName(library) //fully qualified library name - val path = s"/native/${os}-${arch}/${fqlib}" - extract(path, fqlib) match { - case Some(file) => System.load(file.getAbsolutePath) - case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " + - "the native library does not exist for your specific architecture/OS combination." + - "Could not find " + path + ".") - } - } - - def load(library: String) = try { - System.loadLibrary(library) - } catch { - case ex: UnsatisfiedLinkError => loadFromJar(library) - } - -} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala deleted file mode 100644 index 59ad575..0000000 --- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala +++ /dev/null @@ -1,36 +0,0 @@ -package com.github.jodersky.flow -package internal - -import akka.actor.{ Actor, ActorRef } -import akka.util.ByteString -import java.nio.ByteBuffer - -class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread { - def readLoop() = { - var stop = false - while (!serial.isClosed && !stop) { - try { - buffer.clear() - val length = serial.read(buffer) - buffer.limit(length) - val data = ByteString.fromByteBuffer(buffer) - client.tell(Serial.Received(data), operator) - } catch { - - //don't do anything if port is interrupted - case ex: PortInterruptedException => {} - - //stop and tell operator on other exception - case ex: Exception => { - stop = true - operator.tell(ThreadDied(this, ex), Actor.noSender) - } - } - } - } - - override def run() { - this.setName("flow-reader " + serial.port) - readLoop() - } -} diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala deleted file mode 100644 index 1470aa5..0000000 --- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala +++ /dev/null @@ -1,4 +0,0 @@ -package com.github.jodersky.flow -package internal - -case class ThreadDied(thread: Thread, reason: Exception) |