1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
|
package com.github.jodersky.flow
package internal
import akka.actor.{ Actor, ActorRef, Props }
import java.nio.file.{ ClosedWatchServiceException, Files, FileSystems, Path, Paths, WatchEvent, WatchKey }
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
import scala.util.{ Failure, Success, Try }
/**
 * Actor that watches directories for newly created files and notifies the
 * actors that subscribed to them.
 *
 * Handled messages:
 *  - `Serial.Watch(directory, skipInitial)`: subscribes the sender to `directory`.
 *    Unless `skipInitial` is set, the sender first receives one `Serial.Connected`
 *    per non-directory entry currently present. If registering the directory
 *    fails, the sender receives `Serial.CommandFailed` instead.
 *  - `Serial.Unwatch(directory)`: unsubscribes the sender; the underlying watch
 *    key is cancelled once the directory has no subscribers left.
 *  - `Watcher.NewFile` (sent by the watcher thread): forwarded to every
 *    subscriber of the directory as `Serial.Connected`.
 *  - `ThreadDied` for this actor's watcher thread: the reported error is rethrown.
 *
 * @param from actor reported as the sender of all replies; when `None`, this
 *             actor itself is used
 */
class Watcher(from: Option[ActorRef]) extends Actor {

  private val watcher = new Watcher.WatcherThread(self)

  // directory -> subscribers
  private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]

  // directory -> watchkey
  private val keys: Map[String, WatchKey] = Map.empty

  /**
   * Sends `msg` to `sender`, using `from` (or this actor when `from` is empty)
   * as the apparent origin of the message.
   */
  def reply(msg: Any, sender: ActorRef): Unit = sender.tell(msg, from.getOrElse(self))

  override def preStart(): Unit = {
    watcher.setDaemon(true)
    watcher.setName("flow-watcher")
    watcher.start()
  }

  def receive = {

    case w @ Serial.Watch(directory, skipInitial) =>
      val client = sender
      val normalPath = Paths.get(directory).toAbsolutePath
      val normal = normalPath.toString
      Try {
        // Reuse a cached key only while it is still valid. A key becomes invalid
        // when it is cancelled or its directory becomes inaccessible; reusing it
        // would subscribe the client to a key that never fires again.
        keys.get(normal).filter(_.isValid).getOrElse {
          val fresh = watcher.watch(normalPath)
          keys(normal) = fresh
          fresh
        }
      } match {
        case Failure(err) => reply(Serial.CommandFailed(w, err), client)
        case Success(_) =>
          clients.addBinding(normal, client)
          if (!skipInitial) {
            // A DirectoryStream holds an open directory handle and must be closed.
            val listing = Files.newDirectoryStream(normalPath)
            try {
              listing foreach { path =>
                if (!Files.isDirectory(path)) {
                  reply(Serial.Connected(path.toString), client)
                }
              }
            } finally {
              listing.close()
            }
          }
      }

    case Serial.Unwatch(directory) =>
      val normal = Paths.get(directory).toAbsolutePath.toString
      clients.removeBinding(normal, sender)
      // removeBinding drops the map entry once its last subscriber is gone
      if (!clients.contains(normal)) {
        keys.remove(normal).foreach(_.cancel())
      }

    case Watcher.NewFile(directory, file) =>
      val normal = directory.toAbsolutePath
      val connected = Serial.Connected((normal resolve file).toString)
      clients.getOrElse(normal.toString, Set.empty) foreach { ref =>
        reply(connected, ref)
      }

    case ThreadDied(`watcher`, err) => throw err //go down with watcher thread
  }

  override def postStop(): Unit = {
    watcher.close()
  }
}
/** Companion of the [[Watcher]] actor: internal messages, the polling thread and the `Props` factory. */
object Watcher {

  /** Internal notification that `file` (relative to `directory`) was created inside a watched `directory`. */
  private case class NewFile(directory: Path, file: Path)

  /**
   * Thread that blocks on a `WatchService` and forwards every `ENTRY_CREATE`
   * event to `actor` as a [[NewFile]] message.
   *
   * The loop ends when the thread is interrupted, when the service is closed
   * (see `close()`), or after an unexpected exception, which is reported to
   * `actor` as `ThreadDied`.
   */
  private class WatcherThread(actor: ActorRef) extends Thread {

    private val service = FileSystems.getDefault().newWatchService()

    /** Registers `directory` for creation events and returns the resulting key. */
    def watch(directory: Path): WatchKey = directory.register(service, ENTRY_CREATE)

    override def run(): Unit = {
      var stop = false
      while (!stop) {
        try {
          val key = service.take()
          key.pollEvents() foreach { ev =>
            val event = ev.asInstanceOf[WatchEvent[Path]]
            if (event.kind == ENTRY_CREATE) {
              val directory = key.watchable().asInstanceOf[Path]
              val file = event.context()
              actor.tell(NewFile(directory, file), Actor.noSender)
            }
          }
          // re-arm the key so that further events on it are queued
          key.reset()
        } catch {
          case _: InterruptedException => stop = true
          case _: ClosedWatchServiceException => stop = true
          case ex: Exception =>
            actor.tell(ThreadDied(this, ex), Actor.noSender)
            // Having announced its death, the thread must really terminate;
            // otherwise a persistent failure would flood the actor with messages.
            stop = true
        }
      }
    }

    /** Closes the watch service, making a pending `take()` throw `ClosedWatchServiceException` and end `run()`. */
    def close(): Unit = service.close()
  }

  /** Creates `Props` for a [[Watcher]] whose replies appear to originate from `from`. */
  def apply(from: ActorRef): Props = Props(classOf[Watcher], Some(from))
}
|