path: root/flow-main
diff options
Diffstat (limited to 'flow-main')
7 files changed, 162 insertions, 14 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
index b6e9d62..2abe242 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -8,7 +8,7 @@ object Serial extends ExtensionKey[SerialExt] {
/** Base trait for any flow-related messages. */
sealed trait Message
/** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */
trait Command extends Message
@@ -90,4 +90,34 @@ object Serial extends ExtensionKey[SerialExt] {
case object Closed extends Event
+ /**
+ * Watch a directory for new ports.
+ *
+ * Send this command to the manager to get notifications when a new port (i.e. file) is created in
+ * the given directory.
+ * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message.
+ *
+ * Note: the directory must exist when this message is sent.
+ *
+ * @param directory the directory to watch
+ *
+ * @see Unwatch
+ * @see Connected
+ */
+ case class Watch(directory: String = "/dev") extends Command
+ /**
+ * Stop receiving notifications about a previously watched directory.
+ *
+ * @param directory the directory to unwatch
+ */
+ case class Unwatch(directory: String = "/dev") extends Command
+ /**
+ * A new port (i.e. file) has been detected.
+ *
+ * @param port the absolute file name of the connected port
+ */
+ case class Connected(port: String) extends Event
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
index eb8c44e..40739ca 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -5,9 +5,8 @@ import scala.util.Success
import scala.util.Try
import com.github.jodersky.flow.internal.SerialConnection
+import com.github.jodersky.flow.internal.Watcher
-import Serial.CommandFailed
-import Serial.Open
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
@@ -27,13 +26,21 @@ class SerialManager extends Actor with ActorLogging {
case _: Exception => Stop
+ private val watcher = actorOf(Watcher(self), "watcher")
def receive = {
- case open @ Open(port, settings, bufferSize) => Try {
+ case open @ Serial.Open(port, settings, bufferSize) => Try {
SerialConnection.open(port, settings)
} match {
case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
- case Failure(err) => sender ! CommandFailed(open, err)
+ case Failure(err) => sender ! Serial.CommandFailed(open, err)
+ case w: Serial.Watch => watcher.forward(w)
+ case u: Serial.Unwatch => watcher.forward(u)
@@ -45,4 +52,4 @@ object SerialManager {
case c => c
-} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 5524125..f40f4de 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -3,7 +3,7 @@ package com.github.jodersky.flow
import java.nio.ByteBuffer
import com.github.jodersky.flow.internal.Reader
-import com.github.jodersky.flow.internal.ReaderDied
+import com.github.jodersky.flow.internal.ThreadDied
import com.github.jodersky.flow.internal.SerialConnection
import Serial.Close
@@ -57,7 +57,7 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto
//go down with reader thread
- case ReaderDied(ex) => throw ex
+ case ThreadDied(`reader`, ex) => throw ex
@@ -65,4 +65,4 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto
object SerialOperator {
def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
-} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
index 7b3f2ef..42400c8 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
@@ -27,7 +27,7 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c
//stop and tell operator on other exception
case ex: Exception => {
stop = true
- operator.tell(ReaderDied(ex), Actor.noSender)
+ operator.tell(ThreadDied(this, ex), Actor.noSender)
@@ -37,4 +37,4 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c
this.setName("flow-reader " + serial.port)
-} \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
deleted file mode 100644
index 7dd9954..0000000
--- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.flow.internal
-case class ReaderDied(reason: Exception) \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
new file mode 100644
index 0000000..0674354
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
@@ -0,0 +1,3 @@
+package com.github.jodersky.flow.internal
+case class ThreadDied(thread: Thread, reason: Exception)
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
new file mode 100644
index 0000000..4548aa0
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
@@ -0,0 +1,111 @@
+package com.github.jodersky.flow.internal
+import akka.actor.{ Actor, ActorRef, Props }
+import com.github.jodersky.flow.Serial
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Path, Paths, WatchEvent, WatchKey }
+import java.nio.file.StandardWatchEventKinds._
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
+import scala.util.{ Failure, Success, Try }
+class Watcher(from: Option[ActorRef]) extends Actor {
+ private val watcher = new Watcher.WatcherThread(self)
+ //directory -> subscribers
+ private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
+ //directory -> watchkey
+ private val keys: Map[String, WatchKey] = Map.empty
+ def reply(msg: Any, sender: ActorRef) = {
+ val origin = from match {
+ case Some(ref) => ref
+ case None => self
+ }
+ sender.tell(msg, origin)
+ }
+ override def preStart() = {
+ watcher.setDaemon(true)
+ watcher.setName("flow-watcher")
+ watcher.start()
+ }
+ def receive = {
+ case w @ Serial.Watch(directory) =>
+ val normalPath = Paths.get(directory).toAbsolutePath
+ val normal = normalPath.toString
+ Try {
+ keys.getOrElseUpdate(normal, watcher.watch(normalPath))
+ } match {
+ case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
+ case Success(key) => clients addBinding (normal, sender)
+ }
+ case u @ Serial.Unwatch(directory) =>
+ val normal = Paths.get(directory).toAbsolutePath.toString
+ clients.removeBinding(normal, sender)
+ if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
+ keys(normal).cancel()
+ keys -= normal
+ }
+ case Watcher.NewFile(directory, file) =>
+ val normal = directory.toAbsolutePath
+ val absFile = normal resolve file
+ clients.getOrElse(normal.toString, Set.empty) foreach { ref =>
+ reply(Serial.Connected(absFile.toString), ref)
+ }
+ case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
+ }
+ override def postStop() = {
+ watcher.close()
+ }
+object Watcher {
+ private case class NewFile(directory: Path, file: Path)
+ private class WatcherThread(actor: ActorRef) extends Thread {
+ private val service = FileSystems.getDefault().newWatchService()
+ def watch(directory: Path) = directory.register(service, ENTRY_CREATE)
+ override def run(): Unit = {
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ actor.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
+ }
+ }
+ }
+ def close() = service.close //causes the service to throw a ClosedWatchServiceException
+ }
+ def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))