aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2015-05-24 13:35:03 +0200
committerJakob Odersky <jodersky@gmail.com>2015-05-24 15:53:33 +0200
commitde9db5ec62a649fa36963d9fe01543bd612c0105 (patch)
tree099c4d1ad4cb51addbf222426b5b38dcbee243ed
parenta6a4f44866115e0d9cf12e28877019fbf0a237f5 (diff)
downloadakka-serial-de9db5ec62a649fa36963d9fe01543bd612c0105.tar.gz
akka-serial-de9db5ec62a649fa36963d9fe01543bd612c0105.tar.bz2
akka-serial-de9db5ec62a649fa36963d9fe01543bd612c0105.zip
implement file watcher actor
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala32
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala17
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala6
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala4
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala3
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala3
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala111
7 files changed, 162 insertions, 14 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
index b6e9d62..2abe242 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala
@@ -8,7 +8,7 @@ object Serial extends ExtensionKey[SerialExt] {
/** Base trait for any flow-related messages. */
sealed trait Message
-
+
/** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */
trait Command extends Message
@@ -90,4 +90,34 @@ object Serial extends ExtensionKey[SerialExt] {
*/
case object Closed extends Event
+ /**
+ * Watch a directory for new ports.
+ *
+ * Send this command to the manager to get notifications when a new port (i.e. file) is created in
+ * the given directory.
+ * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message.
+ *
+ * Note: the directory must exist when this message is sent.
+ *
+ * @param directory the directory to watch
+ *
+ * @see Unwatch
+ * @see Connected
+ */
+ case class Watch(directory: String = "/dev") extends Command
+
+ /**
+ * Stop receiving notifications about a previously watched directory.
+ *
+ * @param directory the directory to unwatch
+ */
+ case class Unwatch(directory: String = "/dev") extends Command
+
+ /**
+ * A new port (i.e. file) has been detected.
+ *
+ * @param port the absolute file name of the connected port
+ */
+ case class Connected(port: String) extends Event
+
}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
index eb8c44e..40739ca 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala
@@ -5,9 +5,8 @@ import scala.util.Success
import scala.util.Try
import com.github.jodersky.flow.internal.SerialConnection
+import com.github.jodersky.flow.internal.Watcher
-import Serial.CommandFailed
-import Serial.Open
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.OneForOneStrategy
@@ -27,13 +26,21 @@ class SerialManager extends Actor with ActorLogging {
case _: Exception => Stop
}
+ private val watcher = actorOf(Watcher(self), "watcher")
+
def receive = {
- case open @ Open(port, settings, bufferSize) => Try {
+
+ case open @ Serial.Open(port, settings, bufferSize) => Try {
SerialConnection.open(port, settings)
} match {
case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port))
- case Failure(err) => sender ! CommandFailed(open, err)
+ case Failure(err) => sender ! Serial.CommandFailed(open, err)
}
+
+ case w: Serial.Watch => watcher.forward(w)
+
+ case u: Serial.Unwatch => watcher.forward(u)
+
}
}
@@ -45,4 +52,4 @@ object SerialManager {
case c => c
}
-} \ No newline at end of file
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
index 5524125..f40f4de 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala
@@ -3,7 +3,7 @@ package com.github.jodersky.flow
import java.nio.ByteBuffer
import com.github.jodersky.flow.internal.Reader
-import com.github.jodersky.flow.internal.ReaderDied
+import com.github.jodersky.flow.internal.ThreadDied
import com.github.jodersky.flow.internal.SerialConnection
import Serial.Close
@@ -57,7 +57,7 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto
}
//go down with reader thread
- case ReaderDied(ex) => throw ex
+ case ThreadDied(`reader`, ex) => throw ex
}
@@ -65,4 +65,4 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto
object SerialOperator {
def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client)
-} \ No newline at end of file
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
index 7b3f2ef..42400c8 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala
@@ -27,7 +27,7 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c
//stop and tell operator on other exception
case ex: Exception => {
stop = true
- operator.tell(ReaderDied(ex), Actor.noSender)
+ operator.tell(ThreadDied(this, ex), Actor.noSender)
}
}
}
@@ -37,4 +37,4 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c
this.setName("flow-reader " + serial.port)
readLoop()
}
-} \ No newline at end of file
+}
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
deleted file mode 100644
index 7dd9954..0000000
--- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala
+++ /dev/null
@@ -1,3 +0,0 @@
-package com.github.jodersky.flow.internal
-
-case class ReaderDied(reason: Exception) \ No newline at end of file
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
new file mode 100644
index 0000000..0674354
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala
@@ -0,0 +1,3 @@
+package com.github.jodersky.flow.internal
+
+case class ThreadDied(thread: Thread, reason: Exception)
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
new file mode 100644
index 0000000..4548aa0
--- /dev/null
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
@@ -0,0 +1,111 @@
+package com.github.jodersky.flow.internal
+
+import akka.actor.{ Actor, ActorRef, Props }
+import com.github.jodersky.flow.Serial
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Path, Paths, WatchEvent, WatchKey }
+import java.nio.file.StandardWatchEventKinds._
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
+import scala.util.{ Failure, Success, Try }
+
+class Watcher(from: Option[ActorRef]) extends Actor {
+
+ private val watcher = new Watcher.WatcherThread(self)
+
+ //directory -> subscribers
+ private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
+
+ //directory -> watchkey
+ private val keys: Map[String, WatchKey] = Map.empty
+
+ def reply(msg: Any, sender: ActorRef) = {
+ val origin = from match {
+ case Some(ref) => ref
+ case None => self
+ }
+ sender.tell(msg, origin)
+ }
+
+ override def preStart() = {
+ watcher.setDaemon(true)
+ watcher.setName("flow-watcher")
+ watcher.start()
+ }
+
+ def receive = {
+
+ case w @ Serial.Watch(directory) =>
+ val normalPath = Paths.get(directory).toAbsolutePath
+ val normal = normalPath.toString
+
+ Try {
+ keys.getOrElseUpdate(normal, watcher.watch(normalPath))
+ } match {
+ case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
+ case Success(key) => clients addBinding (normal, sender)
+ }
+
+ case u @ Serial.Unwatch(directory) =>
+ val normal = Paths.get(directory).toAbsolutePath.toString
+
+ clients.removeBinding(normal, sender)
+
+ if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
+ keys(normal).cancel()
+ keys -= normal
+ }
+
+ case Watcher.NewFile(directory, file) =>
+ val normal = directory.toAbsolutePath
+ val absFile = normal resolve file
+ clients.getOrElse(normal.toString, Set.empty) foreach { ref =>
+ reply(Serial.Connected(absFile.toString), ref)
+ }
+
+ case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
+
+ }
+
+ override def postStop() = {
+ watcher.close()
+ }
+
+}
+
+object Watcher {
+ private case class NewFile(directory: Path, file: Path)
+
+ private class WatcherThread(actor: ActorRef) extends Thread {
+
+ private val service = FileSystems.getDefault().newWatchService()
+
+ def watch(directory: Path) = directory.register(service, ENTRY_CREATE)
+
+ override def run(): Unit = {
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ actor.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender)
+ }
+ }
+ }
+
+ def close() = service.close //causes the service to throw a ClosedWatchServiceException
+ }
+
+ def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
+
+}