aboutsummaryrefslogtreecommitdiff
path: root/flow-core/src/main/scala/com/github/jodersky/flow
diff options
context:
space:
mode:
Diffstat (limited to 'flow-core/src/main/scala/com/github/jodersky/flow')
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala9
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala124
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala9
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala48
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala54
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala10
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala19
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala58
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala36
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala158
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala4
-rw-r--r--flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala143
12 files changed, 0 insertions, 672 deletions
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala b/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala
deleted file mode 100644
index 04d64a9..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/Parity.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.github.jodersky.flow
-
-/** Specifies available parities used in serial communication. */
-object Parity extends Enumeration {
- type Parity = Value
- val None = Value(0)
- val Odd = Value(1)
- val Even = Value(2)
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala
deleted file mode 100644
index d3b8873..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ /dev/null
@@ -1,124 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.actor.ExtensionKey
-import akka.util.ByteString
-
-/** Defines messages used by flow's serial IO layer. */
-object Serial extends ExtensionKey[SerialExt] {
-
- /** Base trait for any flow-related messages. */
- sealed trait Message
-
- /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */
- trait Command extends Message
-
- /** A message extending this trait is to be viewed as an event, an in-bound message issued by flow to the client. */
- trait Event extends Message
-
- /** A command has failed. */
- case class CommandFailed(command: Command, reason: Throwable) extends Event
-
- /**
- * Open a new serial port.
- *
- * Send this command to the serial manager to request the opening of a serial port. The manager will
- * attempt to open a serial port with the specified parameters and, if successful, create a `SerialOperator` actor associated to the port.
- * The operator actor acts as an intermediate to the underlying native serial port, dealing with threading issues and dispatching messages.
- *
- * In case the port is successfully opened, the operator will respond with an `Opened` message.
- * In case the port cannot be opened, the manager will respond with a `CommandFailed` message.
- *
- * @param port name of serial port to open
- * @param settings settings of serial port to open
- * @param bufferSize maximum read and write buffer sizes
- */
- case class Open(port: String, settings: SerialSettings, bufferSize: Int = 1024) extends Command
-
- /**
- * A port has been successfully opened.
- *
- * Event sent by a port operator, indicating that a serial port was successfully opened. The sender
- * of this message is the operator associated to the given serial port.
- *
- * @param port name of opened serial port
- */
- case class Opened(port: String) extends Event
-
- /**
- * Data has been received.
- *
- * Event sent by an operator, indicating that data was received on the operator's serial port.
- *
- * @param data data received on the port
- */
- case class Received(data: ByteString) extends Event
-
- /**
- * Write data to a serial port.
- *
- * Send this command to an operator to write the given data to its associated serial port.
- * An acknowledgment may be set, in which case it is sent back to the sender on a successful write.
- * Note that a successful write does not guarantee the actual transmission of data through the serial port,
- * it merely guarantees that the data has been stored in the operating system's kernel buffer, ready to
- * be transmitted.
- *
- * @param data data to be written to port
- * @param ack acknowledgment sent back to sender once data has been enqueued in kernel for sending (the acknowledgment
- * is a function 'number of bytes written => event')
- */
- case class Write(data: ByteString, ack: Int => Event = NoAck) extends Command
-
- /**
- * Special type of acknowledgment that is not sent back.
- */
- case object NoAck extends Function1[Int, Event] {
- def apply(length: Int) = sys.error("cannot apply NoAck")
- }
-
- /**
- * Request closing of port.
- *
- * Send this command to an operator to close its associated port. The operator will respond
- * with a `Closed` message upon closing the serial port.
- */
- case object Close extends Command
-
- /**
- * A port has been closed.
- *
- * Event sent from operator, indicating that its port has been closed.
- */
- case object Closed extends Event
-
- /**
- * Watch a directory for new ports.
- *
- * Send this command to the manager to get notifications when a new port (i.e. file) is created in
- * the given directory.
- * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message.
- *
- * Note: the sender is also notified of currently existing ports.
- *
- * @param directory the directory to watch
- * @param skipInitial don't get notified of already existing ports
- *
- * @see Unwatch
- * @see Connected
- */
- case class Watch(directory: String = "/dev", skipInitial: Boolean = false) extends Command
-
- /**
- * Stop receiving notifications about a previously watched directory.
- *
- * @param directory the directory to unwatch
- */
- case class Unwatch(directory: String = "/dev") extends Command
-
- /**
- * A new port (i.e. file) has been detected.
- *
- * @param port the absolute file name of the connected port
- */
- case class Connected(port: String) extends Event
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala
deleted file mode 100644
index 5c1ddf6..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialExt.scala
+++ /dev/null
@@ -1,9 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.actor.{ ExtendedActorSystem, Props }
-import akka.io.IO
-
-/** Provides the serial IO manager. */
-class SerialExt(system: ExtendedActorSystem) extends IO.Extension {
- lazy val manager = system.systemActorOf(Props(classOf[SerialManager]), name = "IO-SERIAL")
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala
deleted file mode 100644
index d163b0a..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ /dev/null
@@ -1,48 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.actor.{ Actor, ActorLogging, OneForOneStrategy }
-import akka.actor.SupervisorStrategy.{ Escalate, Stop }
-import internal.{ SerialConnection, Watcher }
-import scala.util.{ Failure, Success, Try }
-
-/**
- * Entry point to the serial API. Actor that manages serial port creation. Once opened, a serial port is handed over to
- * a dedicated operator actor that acts as an intermediate between client code and the native system serial port.
- * @see SerialOperator
- */
-class SerialManager extends Actor with ActorLogging {
- import SerialManager._
- import context._
-
- override val supervisorStrategy = OneForOneStrategy() {
- case _: Exception if sender == watcher => Escalate
- case _: Exception => Stop
- }
-
- private val watcher = actorOf(Watcher(self), "watcher")
-
- def receive = {
-
- case open @ Serial.Open(port, settings, bufferSize) => Try {
- SerialConnection.open(port, settings)
- } match {
- case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
- case Failure(err) => sender ! Serial.CommandFailed(open, err)
- }
-
- case w: Serial.Watch => watcher.forward(w)
-
- case u: Serial.Unwatch => watcher.forward(u)
-
- }
-
-}
-
-object SerialManager {
-
- private def escapePortString(port: String) = port collect {
- case '/' => '-'
- case c => c
- }
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
deleted file mode 100644
index ec0ee27..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-package com.github.jodersky.flow
-
-import akka.actor.{ Actor, ActorLogging, ActorRef, Props, Terminated, actorRef2Scala }
-import internal.{ Reader, SerialConnection, ThreadDied }
-import java.nio.ByteBuffer
-
-/**
- * Operator associated to an open serial port. All communication with a port is done via an operator. Operators are created though the serial manager.
- * @see SerialManager
- */
-class SerialOperator(connection: SerialConnection, bufferSize: Int, client: ActorRef) extends Actor with ActorLogging {
- import SerialOperator._
- import context._
-
- val readBuffer = ByteBuffer.allocateDirect(bufferSize)
- val reader = new Reader(connection, readBuffer, self, client)
- val writeBuffer = ByteBuffer.allocateDirect(bufferSize)
-
- context.watch(client)
- client ! Serial.Opened(connection.port)
- reader.start()
-
- override def postStop = {
- connection.close()
- }
-
- def receive: Receive = {
-
- case Serial.Write(data, ack) => {
- writeBuffer.clear()
- data.copyToBuffer(writeBuffer)
- val sent = connection.write(writeBuffer)
- if (ack != Serial.NoAck) sender ! ack(sent)
- }
-
- case Serial.Close => {
- client ! Serial.Closed
- context stop self
- }
-
- case Terminated(`client`) => {
- context stop self
- }
-
- //go down with reader thread
- case ThreadDied(`reader`, ex) => throw ex
-
- }
-
-}
-
-object SerialOperator {
- def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala b/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
deleted file mode 100644
index 087fa6e..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/SerialSettings.scala
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.github.jodersky.flow
-
-/**
- * Groups settings used in communication over a serial port.
- * @param baud baud rate to use with serial port
- * @param characterSize size of a character of the data sent through the serial port
- * @param twoStopBits set to use two stop bits instead of one
- * @param parity type of parity to use with serial port
- */
-case class SerialSettings(baud: Int, characterSize: Int = 8, twoStopBits: Boolean = false, parity: Parity.Parity = Parity.None)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala b/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala
deleted file mode 100644
index ebc0e65..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/exceptions.scala
+++ /dev/null
@@ -1,19 +0,0 @@
-package com.github.jodersky.flow
-
-/** The requested port could not be found. */
-class NoSuchPortException(message: String) extends Exception(message)
-
-/** The requested port is in use by someone else. */
-class PortInUseException(message: String) extends Exception(message)
-
-/** Permissions are not sufficient to open a serial port. */
-class AccessDeniedException(message: String) extends Exception(message)
-
-/** The settings specified are invalid. */
-class InvalidSettingsException(message: String) extends Exception(message)
-
-/** A blocking operation on a port was interrupted, most likely indicating that the port is closing. */
-class PortInterruptedException(message: String) extends Exception(message)
-
-/** The specified port has been closed. */
-class PortClosedException(message: String) extends Exception(message)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
deleted file mode 100644
index a5fdc40..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/NativeLoader.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import java.io.{ File, FileOutputStream, InputStream, OutputStream }
-
-/** Handles loading of the current platform's native library for flow. */
-object NativeLoader {
-
- private final val BufferSize = 4096
-
- private def os = System.getProperty("os.name").toLowerCase.replaceAll("\\s", "")
-
- private def arch = System.getProperty("os.arch").toLowerCase
-
- /** Extract a resource from this class loader to a temporary file. */
- private def extract(path: String, prefix: String): Option[File] = {
- var in: Option[InputStream] = None
- var out: Option[OutputStream] = None
-
- try {
- in = Option(NativeLoader.getClass.getResourceAsStream(path))
- if (in.isEmpty) return None
-
- val file = File.createTempFile(prefix, "")
- out = Some(new FileOutputStream(file))
-
- val buffer = new Array[Byte](BufferSize)
- var length = -1;
- do {
- length = in.get.read(buffer)
- if (length != -1) out.get.write(buffer, 0, length)
- } while (length != -1)
-
- Some(file)
- } finally {
- in.foreach(_.close)
- out.foreach(_.close)
- }
- }
-
- private def loadFromJar(library: String) = {
- val fqlib = System.mapLibraryName(library) //fully qualified library name
- val path = s"/native/${os}-${arch}/${fqlib}"
- extract(path, fqlib) match {
- case Some(file) => System.load(file.getAbsolutePath)
- case None => throw new UnsatisfiedLinkError("Cannot extract flow's native library, " +
- "the native library does not exist for your specific architecture/OS combination." +
- "Could not find " + path + ".")
- }
- }
-
- def load(library: String) = try {
- System.loadLibrary(library)
- } catch {
- case ex: UnsatisfiedLinkError => loadFromJar(library)
- }
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
deleted file mode 100644
index 59ad575..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import akka.actor.{ Actor, ActorRef }
-import akka.util.ByteString
-import java.nio.ByteBuffer
-
-class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, client: ActorRef) extends Thread {
- def readLoop() = {
- var stop = false
- while (!serial.isClosed && !stop) {
- try {
- buffer.clear()
- val length = serial.read(buffer)
- buffer.limit(length)
- val data = ByteString.fromByteBuffer(buffer)
- client.tell(Serial.Received(data), operator)
- } catch {
-
- //don't do anything if port is interrupted
- case ex: PortInterruptedException => {}
-
- //stop and tell operator on other exception
- case ex: Exception => {
- stop = true
- operator.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
- }
-
- override def run() {
- this.setName("flow-reader " + serial.port)
- readLoop()
- }
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
deleted file mode 100644
index 73416e3..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/SerialConnection.scala
+++ /dev/null
@@ -1,158 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import java.nio.ByteBuffer
-import java.util.concurrent.atomic.AtomicBoolean
-
-/**
- * Represents a serial connection in a more secure and object-oriented style than `NativeSerial`. In contrast
- * to the latter, this class encapsulates and secures any pointers used to communicate with the native
- * backend and is thread-safe.
- *
- * The underlying serial port is assumed open when this class is initialized.
- */
-class SerialConnection private (
- val port: String,
- val settings: SerialSettings,
- private val pointer: Long
-) {
-
- import SerialConnection._
-
- private var reading: Boolean = false
- private val readLock = new Object
-
- private var writing: Boolean = false
- private val writeLock = new Object
-
- private val closed = new AtomicBoolean(false)
-
- /**
- * Checks if this serial port is closed.
- */
- def isClosed = closed.get()
-
- /**
- * Closes the underlying serial connection. Any callers blocked on read or write will return.
- * A call of this method has no effect if the serial port is already closed.
- * @throws IOException on IO error
- */
- def close(): Unit = this.synchronized {
- if (!closed.get) {
- closed.set(true)
- NativeSerial.cancelRead(pointer)
- readLock.synchronized {
- while (reading) this.wait()
- }
- writeLock.synchronized {
- while (writing) this.wait()
- }
- NativeSerial.close(pointer)
- }
- }
-
- /**
- * Reads data from underlying serial connection into a ByteBuffer.
- * Note that data is read into the buffer's memory, its attributes
- * such as position and limit are not modified.
- *
- * A call to this method is blocking, however it is interrupted
- * if the connection is closed.
- *
- * This method works for direct and indirect buffers but is optimized
- * for the former.
- *
- * @param buffer a ByteBuffer into which data is read
- * @return the actual number of bytes read
- * @throws PortInterruptedException if port is closed while reading
- * @throws IOException on IO error
- */
- def read(buffer: ByteBuffer): Int = readLock.synchronized {
- if (!closed.get) {
- reading = true
- try {
- transfer(
- b => NativeSerial.readDirect(pointer, b),
- b => NativeSerial.read(pointer, b.array())
- )(buffer)
- } finally {
- reading = false
- if (closed.get) readLock.notify()
- }
- } else {
- throw new PortClosedException(s"port ${port} is closed")
- }
- }
-
- /**
- * Writes data from a ByteBuffer to underlying serial connection.
- * Note that data is read from the buffer's memory, its attributes
- * such as position and limit are not modified.
- *
- * The write is non-blocking, this function returns as soon as the data is copied into the kernel's
- * transmission buffer.
- *
- * This method works for direct and indirect buffers but is optimized
- * for the former.
- *
- * @param buffer a ByteBuffer from which data is taken
- * @return the actual number of bytes written
- * @throws IOException on IO error
- */
- def write(buffer: ByteBuffer): Int = writeLock.synchronized {
- if (!closed.get) {
- writing = true
- try {
- transfer(
- b => NativeSerial.writeDirect(pointer, b, b.position()),
- b => NativeSerial.write(pointer, b.array(), b.position())
- )(buffer)
- } finally {
- writing = false
- if (closed.get) writeLock.notify()
- }
- } else {
- throw new PortClosedException(s"port ${port} is closed")
- }
- }
-
- private def transfer[A](direct: ByteBuffer => A, indirect: ByteBuffer => A)(buffer: ByteBuffer): A = if (buffer.isDirect()) {
- direct(buffer)
- } else if (buffer.hasArray()) {
- indirect(buffer)
- } else {
- throw new IllegalArgumentException("buffer is not direct and has no array");
- }
-
-}
-
-object SerialConnection {
- import NativeSerial._
-
- /**
- * Opens a new connection to a serial port.
- * This method acts as a factory to creating serial connections.
- *
- * @param port name of serial port to open
- * @param settings settings with which to initialize the connection
- * @return an instance of the open serial connection
- * @throws NoSuchPortException if the given port does not exist
- * @throws AccessDeniedException if permissions of the current user are not sufficient to open port
- * @throws PortInUseException if port is already in use
- * @throws InvalidSettingsException if any of the specified settings are invalid
- * @throws IOException on IO error
- */
- def open(port: String, settings: SerialSettings): SerialConnection = synchronized {
- val pointer = NativeSerial.open(port, settings.baud, settings.characterSize, settings.twoStopBits, settings.parity.id)
- new SerialConnection(port, settings, pointer)
- }
-
- /**
- * Sets native debugging mode. If debugging is enabled, detailed error messages
- * are printed (to stderr) from native method calls.
- *
- * @param value set to enable debugging
- */
- def debug(value: Boolean) = NativeSerial.debug(value)
-
-}
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
deleted file mode 100644
index 1470aa5..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-case class ThreadDied(thread: Thread, reason: Exception)
diff --git a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
deleted file mode 100644
index 8d89fb4..0000000
--- a/flow-core/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-package com.github.jodersky.flow
-package internal
-
-import akka.actor.{ Actor, ActorRef, Props, Terminated }
-import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
-import java.nio.file.StandardWatchEventKinds._
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
-import scala.util.{ Failure, Success, Try }
-
-class Watcher(from: Option[ActorRef]) extends Actor {
-
- private val watcher = new Watcher.WatcherThread(self)
-
- //directory -> subscribers
- private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
-
- //directory -> watchkey
- private val keys: Map[String, WatchKey] = Map.empty
-
- def subscribe(directory: String, client: ActorRef): WatchKey = {
- val normal = Paths.get(directory).toAbsolutePath
- val index = normal.toString
- val key = keys.getOrElseUpdate(index, watcher.register(normal))
- clients addBinding (index, client)
- key
- }
-
- def unsubscribe(directory: String, client: ActorRef): Unit = {
- val index = Paths.get(directory).toAbsolutePath.toString
-
- clients removeBinding (index, sender)
-
- if (clients.get(index).isEmpty && keys.get(index).isDefined) {
- keys(index).cancel()
- keys -= index
- }
- }
-
- def reply(msg: Any, sender: ActorRef) = {
- val origin = from match {
- case Some(ref) => ref
- case None => self
- }
- sender.tell(msg, origin)
- }
-
- override def preStart() = {
- watcher.setDaemon(true)
- watcher.setName("flow-watcher")
- watcher.start()
- }
-
- def receive = {
-
- case w @ Serial.Watch(directory, skipInitial) =>
- val normalPath = Paths.get(directory).toAbsolutePath
- val normal = normalPath.toString
-
- Try {
- subscribe(directory, sender)
- } match {
- case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
- case Success(key) =>
- context watch sender
- if (!skipInitial) {
- Files.newDirectoryStream(normalPath) foreach { path =>
- if (!Files.isDirectory(path)) {
- reply(Serial.Connected(path.toString), sender)
- }
- }
- }
- }
-
- case u @ Serial.Unwatch(directory) =>
- val normal = Paths.get(directory).toAbsolutePath.toString
-
- clients.removeBinding(normal, sender)
-
- if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
- keys(normal).cancel()
- keys -= normal
- }
-
- case Terminated(client) =>
- for ((directory, c) <- clients if c == client) {
- unsubscribe(directory, client)
- }
-
- case Watcher.NewFile(directory, file) =>
- val normal = directory.toAbsolutePath
- val absFile = normal resolve file
- clients.getOrElse(normal.toString, Set.empty) foreach { client =>
- reply(Serial.Connected(absFile.toString), client)
- }
-
- case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
-
- }
-
- override def postStop() = {
- watcher.close()
- }
-
-}
-
-object Watcher {
- private case class NewFile(directory: Path, file: Path)
-
- private class WatcherThread(actor: ActorRef) extends Thread {
-
- private val service = FileSystems.getDefault().newWatchService()
-
- def register(directory: Path) = directory.register(service, ENTRY_CREATE)
-
- override def run(): Unit = {
- var stop = false
- while (!stop) {
- try {
- val key = service.take()
- key.pollEvents() foreach { ev =>
- val event = ev.asInstanceOf[WatchEvent[Path]]
- if (event.kind == ENTRY_CREATE) {
- val directory = key.watchable().asInstanceOf[Path]
- val file = event.context()
- actor.tell(NewFile(directory, file), Actor.noSender)
- }
- }
- key.reset()
- } catch {
- case _: InterruptedException => stop = true
- case _: ClosedWatchServiceException => stop = true
- case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
- }
- }
- }
-
- def close() = service.close //causes the service to throw a ClosedWatchServiceException
- }
-
- def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
-
-}