aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/akka/serial/Watcher.scala
diff options
context:
space:
mode:
Diffstat (limited to 'core/src/main/scala/akka/serial/Watcher.scala')
-rw-r--r--core/src/main/scala/akka/serial/Watcher.scala143
1 files changed, 143 insertions, 0 deletions
diff --git a/core/src/main/scala/akka/serial/Watcher.scala b/core/src/main/scala/akka/serial/Watcher.scala
new file mode 100644
index 0000000..faaf39a
--- /dev/null
+++ b/core/src/main/scala/akka/serial/Watcher.scala
@@ -0,0 +1,143 @@
+package akka.serial
+
+import akka.actor.{ Actor, ActorRef, Props, Terminated }
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
+import java.nio.file.StandardWatchEventKinds._
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
+import scala.util.{ Failure, Success, Try }
+
+private[serial] class Watcher(from: Option[ActorRef]) extends Actor {
+
+ case class WatcherDied(reason: Throwable)
+ object WatcherThread extends Thread {
+ import Watcher.NewFile
+
+ private val service = FileSystems.getDefault().newWatchService()
+
+ def register(directory: Path) = directory.register(service, ENTRY_CREATE)
+
+ override def run(): Unit = {
+ this.setName("serial-port-watcher")
+ var stop = false
+ while (!stop) {
+ try {
+ val key = service.take()
+ key.pollEvents() foreach { ev =>
+ val event = ev.asInstanceOf[WatchEvent[Path]]
+ if (event.kind == ENTRY_CREATE) {
+ val directory = key.watchable().asInstanceOf[Path]
+ val file = event.context()
+ self.tell(NewFile(directory, file), Actor.noSender)
+ }
+ }
+ key.reset()
+ } catch {
+ case _: InterruptedException => stop = true
+ case _: ClosedWatchServiceException => stop = true
+ case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender)
+ }
+ }
+ }
+
+ def close() = service.close // causes the service to throw a ClosedWatchServiceException
+ }
+
+
+ // directory -> subscribers
+ private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]
+
+ // directory -> watchkey
+ private val keys: Map[String, WatchKey] = Map.empty
+
+ def subscribe(directory: String, client: ActorRef): WatchKey = {
+ val normal = Paths.get(directory).toAbsolutePath
+ val index = normal.toString
+ val key = keys.getOrElseUpdate(index, WatcherThread.register(normal))
+ clients addBinding (index, client)
+ key
+ }
+
+ def unsubscribe(directory: String, client: ActorRef): Unit = {
+ val index = Paths.get(directory).toAbsolutePath.toString
+
+ clients removeBinding (index, sender)
+
+ if (clients.get(index).isEmpty && keys.get(index).isDefined) {
+ keys(index).cancel()
+ keys -= index
+ }
+ }
+
+ def reply(msg: Any, sender: ActorRef) = {
+ val origin = from match {
+ case Some(ref) => ref
+ case None => self
+ }
+ sender.tell(msg, origin)
+ }
+
+ override def preStart() = {
+ WatcherThread.setDaemon(true)
+ WatcherThread.start()
+ }
+
+ override def receive = {
+
+ case w @ Serial.Watch(directory, skipInitial) =>
+ val normalPath = Paths.get(directory).toAbsolutePath
+ val normal = normalPath.toString
+
+ Try {
+ subscribe(directory, sender)
+ } match {
+ case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
+ case Success(key) =>
+ context watch sender
+ if (!skipInitial) {
+ Files.newDirectoryStream(normalPath) foreach { path =>
+ if (!Files.isDirectory(path)) {
+ reply(Serial.Connected(path.toString), sender)
+ }
+ }
+ }
+ }
+
+ case u @ Serial.Unwatch(directory) =>
+ val normal = Paths.get(directory).toAbsolutePath.toString
+
+ clients.removeBinding(normal, sender)
+
+ if (clients.get(normal).isEmpty && keys.get(normal).isDefined) {
+ keys(normal).cancel()
+ keys -= normal
+ }
+
+ case Terminated(client) =>
+ for ((directory, c) <- clients if c == client) {
+ unsubscribe(directory, client)
+ }
+
+ case Watcher.NewFile(directory, file) =>
+ val normal = directory.toAbsolutePath
+ val absFile = normal resolve file
+ clients.getOrElse(normal.toString, Set.empty) foreach { client =>
+ reply(Serial.Connected(absFile.toString), client)
+ }
+
+ case WatcherDied(err) => throw err // go down with watcher thread
+
+ }
+
+ override def postStop() = {
+ WatcherThread.close()
+ }
+
+}
+
+private[serial] object Watcher {
+ private case class NewFile(directory: Path, file: Path)
+
+ def apply(from: ActorRef) = Props(classOf[Watcher], Some(from))
+
+}