From de9db5ec62a649fa36963d9fe01543bd612c0105 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sun, 24 May 2015 13:35:03 +0200 Subject: implement file watcher actor --- .../scala/com/github/jodersky/flow/Serial.scala | 32 +++++- .../com/github/jodersky/flow/SerialManager.scala | 17 +++- .../com/github/jodersky/flow/SerialOperator.scala | 6 +- .../com/github/jodersky/flow/internal/Reader.scala | 4 +- .../github/jodersky/flow/internal/ReaderDied.scala | 3 - .../github/jodersky/flow/internal/ThreadDied.scala | 3 + .../github/jodersky/flow/internal/Watcher.scala | 111 +++++++++++++++++++++ 7 files changed, 162 insertions(+), 14 deletions(-) delete mode 100644 flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala create mode 100644 flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala create mode 100644 flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala index b6e9d62..2abe242 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/Serial.scala @@ -8,7 +8,7 @@ object Serial extends ExtensionKey[SerialExt] { /** Base trait for any flow-related messages. */ sealed trait Message - + /** A message extending this trait is to be viewed as a command, an out-bound message issued by the client to flow's API. */ trait Command extends Message @@ -90,4 +90,34 @@ object Serial extends ExtensionKey[SerialExt] { */ case object Closed extends Event + /** + * Watch a directory for new ports. + * + * Send this command to the manager to get notifications when a new port (i.e. file) is created in + * the given directory. + * In case the given directory cannot be watched, the manager responds with a `CommandFailed` message. + * + * Note: the directory must exist when this message is sent. + * + * @param directory the directory to watch + * + * @see Unwatch + * @see Connected + */ + case class Watch(directory: String = "/dev") extends Command + + /** + * Stop receiving notifications about a previously watched directory. + * + * @param directory the directory to unwatch + */ + case class Unwatch(directory: String = "/dev") extends Command + + /** + * A new port (i.e. file) has been detected. + * + * @param port the absolute file name of the connected port + */ + case class Connected(port: String) extends Event + } diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala index eb8c44e..40739ca 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialManager.scala @@ -5,9 +5,8 @@ import scala.util.Success import scala.util.Try import com.github.jodersky.flow.internal.SerialConnection +import com.github.jodersky.flow.internal.Watcher -import Serial.CommandFailed -import Serial.Open import akka.actor.Actor import akka.actor.ActorLogging import akka.actor.OneForOneStrategy @@ -27,13 +26,21 @@ class SerialManager extends Actor with ActorLogging { case _: Exception => Stop } + private val watcher = actorOf(Watcher(self), "watcher") + def receive = { - case open @ Open(port, settings, bufferSize) => Try { + + case open @ Serial.Open(port, settings, bufferSize) => Try { SerialConnection.open(port, settings) } match { case Success(connection) => context.actorOf(SerialOperator(connection, bufferSize, sender), name = escapePortString(connection.port)) - case Failure(err) => sender ! CommandFailed(open, err) + case Failure(err) => sender ! Serial.CommandFailed(open, err) } + + case w: Serial.Watch => watcher.forward(w) + + case u: Serial.Unwatch => watcher.forward(u) + } } @@ -45,4 +52,4 @@ object SerialManager { case c => c } -} \ No newline at end of file +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala index 5524125..f40f4de 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/SerialOperator.scala @@ -3,7 +3,7 @@ package com.github.jodersky.flow import java.nio.ByteBuffer import com.github.jodersky.flow.internal.Reader -import com.github.jodersky.flow.internal.ReaderDied +import com.github.jodersky.flow.internal.ThreadDied import com.github.jodersky.flow.internal.SerialConnection import Serial.Close @@ -57,7 +57,7 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto } //go down with reader thread - case ReaderDied(ex) => throw ex + case ThreadDied(`reader`, ex) => throw ex } @@ -65,4 +65,4 @@ class SerialOperator(connection: SerialConnection, bufferSize: Int, client: Acto object SerialOperator { def apply(connection: SerialConnection, bufferSize: Int, client: ActorRef) = Props(classOf[SerialOperator], connection, bufferSize, client) -} \ No newline at end of file +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala index 7b3f2ef..42400c8 100644 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Reader.scala @@ -27,7 +27,7 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c //stop and tell operator on other exception case ex: Exception => { stop = true - operator.tell(ReaderDied(ex), Actor.noSender) + operator.tell(ThreadDied(this, ex), Actor.noSender) } } } @@ -37,4 +37,4 @@ class Reader(serial: SerialConnection, buffer: ByteBuffer, operator: ActorRef, c this.setName("flow-reader " + serial.port) readLoop() } -} \ No newline at end of file +} diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala deleted file mode 100644 index 7dd9954..0000000 --- a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ReaderDied.scala +++ /dev/null @@ -1,3 +0,0 @@ -package com.github.jodersky.flow.internal - -case class ReaderDied(reason: Exception) \ No newline at end of file diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala new file mode 100644 index 0000000..0674354 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/ThreadDied.scala @@ -0,0 +1,3 @@ +package com.github.jodersky.flow.internal + +case class ThreadDied(thread: Thread, reason: Exception) diff --git a/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala new file mode 100644 index 0000000..4548aa0 --- /dev/null +++ b/flow-main/src/main/scala/com/github/jodersky/flow/internal/Watcher.scala @@ -0,0 +1,111 @@ +package com.github.jodersky.flow.internal + +import akka.actor.{ Actor, ActorRef, Props } +import com.github.jodersky.flow.Serial +import java.nio.file.{ ClosedWatchServiceException, FileSystems, Path, Paths, WatchEvent, WatchKey } +import java.nio.file.StandardWatchEventKinds._ +import scala.collection.JavaConversions._ +import scala.collection.mutable.{ HashMap, Map, MultiMap, Set } +import scala.util.{ Failure, Success, Try } + +class Watcher(from: Option[ActorRef]) extends Actor { + + private val watcher = new Watcher.WatcherThread(self) + + //directory -> subscribers + private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef] + + //directory -> watchkey + private val keys: Map[String, WatchKey] = Map.empty + + def reply(msg: Any, sender: ActorRef) = { + val origin = from match { + case Some(ref) => ref + case None => self + } + sender.tell(msg, origin) + } + + override def preStart() = { + watcher.setDaemon(true) + watcher.setName("flow-watcher") + watcher.start() + } + + def receive = { + + case w @ Serial.Watch(directory) => + val normalPath = Paths.get(directory).toAbsolutePath + val normal = normalPath.toString + + Try { + keys.getOrElseUpdate(normal, watcher.watch(normalPath)) + } match { + case Failure(err) => reply(Serial.CommandFailed(w, err), sender) + case Success(key) => clients addBinding (normal, sender) + } + + case u @ Serial.Unwatch(directory) => + val normal = Paths.get(directory).toAbsolutePath.toString + + clients.removeBinding(normal, sender) + + if (clients.get(normal).isEmpty && keys.get(normal).isDefined) { + keys(normal).cancel() + keys -= normal + } + + case Watcher.NewFile(directory, file) => + val normal = directory.toAbsolutePath + val absFile = normal resolve file + clients.getOrElse(normal.toString, Set.empty) foreach { ref => + reply(Serial.Connected(absFile.toString), ref) + } + + case ThreadDied(`watcher`, err) => throw err //go down with watcher thread + + } + + override def postStop() = { + watcher.close() + } + +} + +object Watcher { + private case class NewFile(directory: Path, file: Path) + + private class WatcherThread(actor: ActorRef) extends Thread { + + private val service = FileSystems.getDefault().newWatchService() + + def watch(directory: Path) = directory.register(service, ENTRY_CREATE) + + override def run(): Unit = { + var stop = false + while (!stop) { + try { + val key = service.take() + key.pollEvents() foreach { ev => + val event = ev.asInstanceOf[WatchEvent[Path]] + if (event.kind == ENTRY_CREATE) { + val directory = key.watchable().asInstanceOf[Path] + val file = event.context() + actor.tell(NewFile(directory, file), Actor.noSender) + } + } + key.reset() + } catch { + case _: InterruptedException => stop = true + case _: ClosedWatchServiceException => stop = true + case ex: Exception => actor.tell(ThreadDied(this, ex), Actor.noSender) + } + } + } + + def close() = service.close //causes the service to throw a ClosedWatchServiceException + } + + def apply(from: ActorRef) = Props(classOf[Watcher], Some(from)) + +} -- cgit v1.2.3