aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jodersky@gmail.com>2015-05-25 11:41:03 +0200
committerJakob Odersky <jodersky@gmail.com>2015-05-25 11:41:03 +0200
commit7db412df69ded4f035cfa21c5a49451d31877d23 (patch)
treea2f02e1f80d71f7116689c061a021c3e30a52a5e
parentb31d86a44b0e3b7804877c2b938dc3cefcf1ab74 (diff)
downloadakka-serial-7db412df69ded4f035cfa21c5a49451d31877d23.tar.gz
akka-serial-7db412df69ded4f035cfa21c5a49451d31877d23.tar.bz2
akka-serial-7db412df69ded4f035cfa21c5a49451d31877d23.zip
add deathwatch to watchers
-rw-r--r--flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala38
1 files changed, 31 insertions, 7 deletions
diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
index c8ca374..8d89fb4 100644
--- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
+++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala
@@ -1,8 +1,8 @@
package com.github.jodersky.flow
package internal
-import akka.actor.{ Actor, ActorRef, Props }
-import java.nio.file.{ ClosedWatchServiceException, Files, FileSystems, Path, Paths, WatchEvent, WatchKey }
+import akka.actor.{ Actor, ActorRef, Props, Terminated }
+import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
@@ -18,6 +18,25 @@ class Watcher(from: Option[ActorRef]) extends Actor {
//directory -> watchkey
private val keys: Map[String, WatchKey] = Map.empty
+ def subscribe(directory: String, client: ActorRef): WatchKey = {
+ val normal = Paths.get(directory).toAbsolutePath
+ val index = normal.toString
+ val key = keys.getOrElseUpdate(index, watcher.register(normal))
+ clients addBinding (index, client)
+ key
+ }
+
+ def unsubscribe(directory: String, client: ActorRef): Unit = {
+ val index = Paths.get(directory).toAbsolutePath.toString
+
+ clients removeBinding (index, sender)
+
+ if (clients.get(index).isEmpty && keys.get(index).isDefined) {
+ keys(index).cancel()
+ keys -= index
+ }
+ }
+
def reply(msg: Any, sender: ActorRef) = {
val origin = from match {
case Some(ref) => ref
@@ -39,11 +58,11 @@ class Watcher(from: Option[ActorRef]) extends Actor {
val normal = normalPath.toString
Try {
- keys.getOrElseUpdate(normal, watcher.watch(normalPath))
+ subscribe(directory, sender)
} match {
case Failure(err) => reply(Serial.CommandFailed(w, err), sender)
case Success(key) =>
- clients addBinding (normal, sender)
+ context watch sender
if (!skipInitial) {
Files.newDirectoryStream(normalPath) foreach { path =>
if (!Files.isDirectory(path)) {
@@ -63,11 +82,16 @@ class Watcher(from: Option[ActorRef]) extends Actor {
keys -= normal
}
+ case Terminated(client) =>
+ for ((directory, c) <- clients if c == client) {
+ unsubscribe(directory, client)
+ }
+
case Watcher.NewFile(directory, file) =>
val normal = directory.toAbsolutePath
val absFile = normal resolve file
- clients.getOrElse(normal.toString, Set.empty) foreach { ref =>
- reply(Serial.Connected(absFile.toString), ref)
+ clients.getOrElse(normal.toString, Set.empty) foreach { client =>
+ reply(Serial.Connected(absFile.toString), client)
}
case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
@@ -87,7 +111,7 @@ object Watcher {
private val service = FileSystems.getDefault().newWatchService()
- def watch(directory: Path) = directory.register(service, ENTRY_CREATE)
+ def register(directory: Path) = directory.register(service, ENTRY_CREATE)
override def run(): Unit = {
var stop = false