From 7db412df69ded4f035cfa21c5a49451d31877d23 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 25 May 2015 11:41:03 +0200 Subject: add deathwatch to watchers --- .../github/jodersky/flow/internal/Watcher.scala | 38 ++++++++++++++++++---- 1 file changed, 31 insertions(+), 7 deletions(-) diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala index c8ca374..8d89fb4 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala @@ -1,8 +1,8 @@ package com.github.jodersky.flow package internal -import akka.actor.{ Actor, ActorRef, Props } -import java.nio.file.{ ClosedWatchServiceException, Files, FileSystems, Path, Paths, WatchEvent, WatchKey } +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } import java.nio.file.StandardWatchEventKinds._ import scala.collection.JavaConversions._ import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } @@ -18,6 +18,25 @@ class Watcher(from: Option[ActorRef]) extends Actor { //directory -> watchkey private val keys: Map[String, WatchKey] = Map.empty + def subscribe(directory: String, client: ActorRef): WatchKey = { + val normal = Paths.get(directory).toAbsolutePath + val index = normal.toString + val key = keys.getOrElseUpdate(index, watcher.register(normal)) + clients addBinding (index, client) + key + } + + def unsubscribe(directory: String, client: ActorRef): Unit = { + val index = Paths.get(directory).toAbsolutePath.toString + + clients removeBinding (index, sender) + + if (clients.get(index).isEmpty && keys.get(index).isDefined) { + keys(index).cancel() + keys -= index + } + } + def reply(msg: Any, sender: ActorRef) = { val origin = from match { case Some(ref) => ref @@ -39,11 +58,11 @@ class Watcher(from: Option[ActorRef]) extends Actor { val normal = normalPath.toString Try { - keys.getOrElseUpdate(normal, watcher.watch(normalPath)) + subscribe(directory, sender) } match { case Failure(err) => reply(Serial.CommandFailed(w, err), sender) case Success(key) => - clients addBinding (normal, sender) + context watch sender if (!skipInitial) { Files.newDirectoryStream(normalPath) foreach { path => if (!Files.isDirectory(path)) { @@ -63,11 +82,16 @@ class Watcher(from: Option[ActorRef]) extends Actor { keys -= normal } + case Terminated(client) => + for ((directory, c) <- clients if c == client) { + unsubscribe(directory, client) + } + case Watcher.NewFile(directory, file) => val normal = directory.toAbsolutePath val absFile = normal resolve file - clients.getOrElse(normal.toString, Set.empty) foreach { ref => - reply(Serial.Connected(absFile.toString), ref) + clients.getOrElse(normal.toString, Set.empty) foreach { client => + reply(Serial.Connected(absFile.toString), client) } case ThreadDied(`watcher`, err) => throw err //go down with watcher thread @@ -87,7 +111,7 @@ object Watcher { private val service = FileSystems.getDefault().newWatchService() - def watch(directory: Path) = directory.register(service, ENTRY_CREATE) + def register(directory: Path) = directory.register(service, ENTRY_CREATE) override def run(): Unit = { var stop = false -- cgit v1.2.3