aboutsummaryrefslogtreecommitdiff
path: root/flow-core/src/main/scala/com/github/jodersky/flow/internal
diff options
context:
space:
mode:
Diffstat (limited to 'flow-core/src/main/scala/com/github/jodersky/flow/internal')
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala58
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala36
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala158
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala4
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala143
5 files changed, 0 insertions, 399 deletions
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
deleted file mode 100644
index a5fdc40..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import java.io.{ File, FileOutputStream, InputStream, OutputStream }
-
-/** Handles loading of the current platform's native library for flow. */
-object NativeLoader {
-
- private final val BufferSize = 4096
-
- private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "")
-
- private def arch = System.getProperty("os.arch").toLowerCase
-
- /** Extract a resource from this class loader to a temporary file. */
- private def extract(path: String, prefix: String): Option[File] = {
- var in: Option[InputStream] = None
- var out: Option[OutputStream] = None
-
- try {
- in = Option(NativeLoader.getClass.getResourceAsStream(path))
- if (in.isEmpty) return None
-
- val file = File.createTempFile(prefix, "")
- out = Some(new FileOutputStream(file))
-
- val buffer = new Array[Byte](BufferSize)
- var length = -1;
- do {
- length = in.get.read(buffer)
- if (length != -1) out.get.write(buffer, 0, length)
- } while (length != -1)
-
- Some(file)
- } finally {
- in.foreach(_.close)
- out.foreach(_.close)
- }
- }
-
- private def loadFromJar(library: String) = {
- val fqlib = System.mapLibraryName(library) //fully qualified library name
- val path = s"/native/${os}-${arch}/${fqlib}"
- extract(path, fqlib) match {
- case Some(file) => System.load(file.getAbsolutePath)
- case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " +
- "the native library does not exist for your specific architecture/OS combination." +
- "Could not find " + path + ".")
- }
- }
-
- def load(library: String) = try {
- System.loadLibrary(library)
- } catch {
- case ex: UnsatisfiedLinkError => loadFromJar(library)
- }
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
deleted file mode 100644
index 59ad575..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import akka.actor.{ Actor, ActorRef }
-import akka.util.ByteString
-import java.nio.ByteBuffer
-
-class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread {
- def readLoop() = {
- var stop = false
- while (!serial.isClosed && !stop) {
- try {
- buffer.clear()
- val length = serial.read(buffer)
- buffer.limit(length)
- val data = ByteString.fromByteBuffer(buffer)
- client.tell(Serial.Received(data), operator)
- } catch {
-
- //don't do anything if port is interrupted
- case ex: PortInterruptedException => {}
-
- //stop and tell operator on other exception
- case ex: Exception => {
- stop = true
- operator.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
- }
-
- override def run() {
- this.setName("flow-reader " + serial.port)
- readLoop()
- }
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
deleted file mode 100644
index 73416e3..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicBoolean
-
-/**
- * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast
- * to the latter, this class encapsulates and secures any pointers used to communicate with the native
- * backend and is thread-safe.
- *
- * The underlying serial port is assumed open when this class is initialized.
- */
-class SerialConnection private (
- val port: String,
- val settings: SerialSettings,
- private val pointer: Long
-) {
-
- import SerialConnection._
-
- private var reading: Boolean = false
- private val readLock = new Object
-
- private var writing: Boolean = false
- private val writeLock = new Object
-
- private val closed = new AtomicBoolean(false)
-
- /**
- * Checks if this serial port is closed.
- */
- def isClosed = closed.get()
-
- /**
- * Closes the underlying serial connection. Any callers blocked on read or write will return.
- * A call of this method has no effect if the serial port is already closed.
- * @throws IOException on IO error
- */
- def close(): Unit = this.synchronized {
- if (!closed.get) {
- closed.set(true)
- NativeSerial.cancelRead(pointer)
- readLock.synchronized {
- while (reading) this.wait()
- }
- writeLock.synchronized {
- while (writing) this.wait()
- }
- NativeSerial.close(pointer)
- }
- }
-
- /**
- * Reads data from underlying serial connection into a ByteBuffer.
- * Note that data is read into the buffer's memory, its attributes
- * such as position and limit are not modified.
- *
- * A call to this method is blocking, however it is interrupted
- * if the connection is closed.
- *
- * This method works for direct and indirect buffers but is optimized
- * for the former.
- *
- * @param buffer a ByteBuffer into which data is read
- * @return the actual number of bytes read
- * @throws PortInterruptedException if port is closed while reading
- * @throws IOException on IO error
- */
- def read(buffer: ByteBuffer): Int = readLock.synchronized {
- if (!closed.get) {
- reading = true
- try {
- transfer(
- b => NativeSerial.readDirect(pointer, b),
- b => NativeSerial.read(pointer, b.array())
- )(buffer)
- } finally {
- reading = false
- if (closed.get) readLock.notify()
- }
- } else {
- throw new PortClosedException(s"port ${port} is closed")
- }
- }
-
- /**
- * Writes data from a ByteBuffer to underlying serial connection.
- * Note that data is read from the buffer's memory, its attributes
- * such as position and limit are not modified.
- *
- * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
- * transmission buffer.
- *
- * This method works for direct and indirect buffers but is optimized
- * for the former.
- *
- * @param buffer a ByteBuffer from which data is taken
- * @return the actual number of bytes written
- * @throws IOException on IO error
- */
- def write(buffer: ByteBuffer): Int = writeLock.synchronized {
- if (!closed.get) {
- writing = true
- try {
- transfer(
- b => NativeSerial.writeDirect(pointer, b, b.position()),
- b => NativeSerial.write(pointer, b.array(), b.position())
- )(buffer)
- } finally {
- writing = false
- if (closed.get) writeLock.notify()
- }
- } else {
- throw new PortClosedException(s"port ${port} is closed")
- }
- }
-
- private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) {
- direct(buffer)
- } else if (buffer.hasArray()) {
- indirect(buffer)
- } else {
- throw new IllegalArgumentException("buffer is not direct and has no array");
- }
-
-}
-
-object SerialConnection {
- import NativeSerial._
-
- /**
- * Opens a new connection to a serial port.
- * This method acts as a factory to creating serial connections.
- *
- * @param port name of serial port to open
- * @param settings settings with which to initialize the connection
- * @return an instance of the open serial connection
- * @throws NoSuchPortException if the given port does not exist
- * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
- * @throws PortInUseException if port is already in use
- * @throws InvalidSettingsException if any of the specified settings are invalid
- * @throws IOException on IO error
- */
- def open(port: String, settings: SerialSettings): SerialConnection = synchronized {
- val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id)
- new SerialConnection(port, settings, pointer)
- }
-
- /**
- * Sets native debugging mode. If debugging is enabled, detailed error messages
- * are printed (to stderr) from native method calls.
- *
- * @param value set to enable debugging
- */
- def debug(value: Boolean) = NativeSerial.debug(value)
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
deleted file mode 100644
index 1470aa5..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-case class ThreadDied(thread: Thread, reason: Exception)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
deleted file mode 100644
index 8d89fb4..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import akka.actor.{ Actor, ActorRef, Props, Terminated }
-import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
-import java.nio.file.StandardWatchEventKinds._
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
-import scala.util.{ Failure, Success, Try }
-
-class Watcher(from: Option[ActorRef]) extends Actor {
-
- private val watcher = new Watcher.WatcherThread(self)
-
- //directory -> subscribers
- private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
-
- //directory -> watchkey
- private val keys: Map[String, WatchKey] = Map.empty
-
- def subscribe(directory: String, client: ActorRef): WatchKey = {
- val normal = Paths.get(directory).toAbsolutePath
- val index = normal.toString
- val key = keys.getOrElseUpdate(index, watcher.register(normal))
- clients addBinding (index, client)
- key
- }
-
- def unsubscribe(directory: String, client: ActorRef): Unit = {
- val index = Paths.get(directory).toAbsolutePath.toString
-
- clients removeBinding (index, sender)
-
- if (clients.get(index).isEmpty && keys.get(index).isDefined) {
- keys(index).cancel()
- keys -= index
- }
- }
-
- def reply(msg: Any, sender: ActorRef) = {
- val origin = from match {
- case Some(ref) => ref
- case None => self
- }
- sender.tell(msg, origin)
- }
-
- override def preStart() = {
- watcher.setDaemon(true)
- watcher.setName("flow-watcher")
- watcher.start()
- }
-
- def receive = {
-
- case w @ Serial.Watch(directory, skipInitial) =>
- val normalPath = Paths.get(directory).toAbsolutePath
- val normal = normalPath.toString
-
- Try {
- subscribe(directory, sender)
- } match {
- case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
- case Success(key) =>
- context watch sender
- if (!skipInitial) {
- Files.newDirectoryStream(normalPath) foreach { path =>
- if (!Files.isDirectory(path)) {
- reply(Serial.Connected(path.toString), sender)
- }
- }
- }
- }
-
- case u @ Serial.Unwatch(directory) =>
- val normal = Paths.get(directory).toAbsolutePath.toString
-
- clients.removeBinding(normal, sender)
-
- if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
- keys(normal).cancel()
- keys -= normal
- }
-
- case Terminated(client) =>
- for ((directory, c) <- clients if c == client) {
- unsubscribe(directory, client)
- }
-
- case Watcher.NewFile(directory, file) =>
- val normal = directory.toAbsolutePath
- val absFile = normal resolve file
- clients.getOrElse(normal.toString, Set.empty) foreach { client =>
- reply(Serial.Connected(absFile.toString), client)
- }
-
- case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
-
- }
-
- override def postStop() = {
- watcher.close()
- }
-
-}
-
-object Watcher {
- private case class NewFile(directory: Path, file: Path)
-
- private class WatcherThread(actor: ActorRef) extends Thread {
-
- private val service = FileSystems.getDefault().newWatchService()
-
- def register(directory: Path) = directory.register(service, ENTRY_CREATE)
-
- override def run(): Unit = {
- var stop = false
- while (!stop) {
- try {
- val key = service.take()
- key.pollEvents() foreach { ev =>
- val event = ev.asInstanceOf[WatchEvent[Path]]
- if (event.kind == ENTRY_CREATE) {
- val directory = key.watchable().asInstanceOf[Path]
- val file = event.context()
- actor.tell(NewFile(directory, file), Actor.noSender)
- }
- }
- key.reset()
- } catch {
- case _: InterruptedException => stop = true
- case _: ClosedWatchServiceException => stop = true
- case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
-
- def close() = service.close //causes the service to throw a ClosedWatchServiceException
- }
-
- def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
-
-}