From 23959966760174477a6b0fcbf9dd1e8ef37c643b Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 8 Jan 2017 21:16:25 +0100 Subject: Rename project to akka-serial --- core/src/main/scala/akka/serial/Watcher.scala | 143 ++++++++++++++++++++++++++ 1 file changed, 143 insertions(+) create mode 100644 core/src/main/scala/akka/serial/Watcher.scala (limited to 'core/src/main/scala/akka/serial/Watcher.scala') diff --git a/core/src/main/scala/akka/serial/Watcher.scala b/core/src/main/scala/akka/serial/Watcher.scala new file mode 100644 index 0000000..faaf39a --- /dev/null +++ b/core/src/main/scala/akka/serial/Watcher.scala @@ -0,0 +1,143 @@ +package akka.serial + +import akka.actor.{ Actor, ActorRef, Props, Terminated } +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } +import scala.util.{ Failure, Success, Try } + +private[serial] class Watcher(from: Option[ActorRef]) extends Actor { + + case class WatcherDied(reason: Throwable) + object WatcherThread extends Thread { + import Watcher.NewFile + + private val service = FileSystems.getDefault().newWatchService() + + def register(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + this.setName("serial-port-watcher") + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + self.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender) + } + } + } + + def close() = service.close // causes the service to throw a ClosedWatchServiceException + } + + + // directory -> subscribers + private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] + + // directory -> watchkey + private val keys: Map[String, WatchKey] = Map.empty + + def subscribe(directory: String, client: ActorRef): WatchKey = { + val normal = Paths.get(directory).toAbsolutePath + val index = normal.toString + val key = keys.getOrElseUpdate(index, WatcherThread.register(normal)) + clients addBinding (index, client) + key + } + + def unsubscribe(directory: String, client: ActorRef): Unit = { + val index = Paths.get(directory).toAbsolutePath.toString + + clients removeBinding (index, sender) + + if (clients.get(index).isEmpty && keys.get(index).isDefined) { + keys(index).cancel() + keys -= index + } + } + + def reply(msg: Any, sender: ActorRef) = { + val origin = from match { + case Some(ref) => ref + case None => self + } + sender.tell(msg, origin) + } + + override def preStart() = { + WatcherThread.setDaemon(true) + WatcherThread.start() + } + + override def receive = { + + case w @ Serial.Watch(directory, skipInitial) => + val normalPath = Paths.get(directory).toAbsolutePath + val normal = normalPath.toString + + Try { + subscribe(directory, sender) + } match { + case Failure(err) => reply(Serial.CommandFailed(w, err), sender) + case Success(key) => + context watch sender + if (!skipInitial) { + Files.newDirectoryStream(normalPath) foreach { path => + if (!Files.isDirectory(path)) { + reply(Serial.Connected(path.toString), sender) + } + } + } + } + + case u @ Serial.Unwatch(directory) => + val normal = Paths.get(directory).toAbsolutePath.toString + + clients.removeBinding(normal, sender) + + if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { + keys(normal).cancel() + keys -= normal + } + + case Terminated(client) => + for ((directory, c) <- clients if c == client) { + unsubscribe(directory, client) + } + + case Watcher.NewFile(directory, file) => + val normal = directory.toAbsolutePath + val absFile = normal resolve file + clients.getOrElse(normal.toString, Set.empty) foreach { client => + reply(Serial.Connected(absFile.toString), client) + } + + case WatcherDied(err) => throw err // go down with watcher thread + + } + + override def postStop() = { + WatcherThread.close() + } + +} + +private[serial] object Watcher { + private case class NewFile(directory: Path, file: Path) + + def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) + +} -- cgit v1.2.3