aboutsummaryrefslogtreecommitdiff
path: root/core/src/main/scala/akka/serial/Watcher.scala
blob: faaf39ac77ea15be6b9986577e36669e702d0ecd (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package akka.serial

import akka.actor.{ Actor, ActorRef, Props, Terminated }
import java.nio.file.{ ClosedWatchServiceException, FileSystems, Files, Path, Paths, WatchEvent, WatchKey }
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._
import scala.collection.mutable.{ HashMap, Map, MultiMap, Set }
import scala.util.{ Failure, Success, Try }

/**
 * Actor that watches directories for newly created files and notifies subscribers.
 *
 * Protocol, as handled in `receive`:
 *  - `Serial.Watch(directory, skipInitial)` subscribes the sender to `directory`. On failure
 *    the sender receives `Serial.CommandFailed`. On success, unless `skipInitial` is set, the
 *    sender receives one `Serial.Connected` per non-directory entry already in the directory.
 *  - `Serial.Unwatch(directory)` removes the sender's subscription to `directory`.
 *  - `Terminated(client)` removes every subscription held by the terminated client.
 *
 * For every file created in a watched directory, each subscriber of that directory is sent a
 * `Serial.Connected` carrying the absolute path of the new file.
 *
 * @param from actor used as the sender of all messages to clients; when `None`, this actor
 *             itself is used
 */
private[serial] class Watcher(from: Option[ActorRef]) extends Actor {

  /** Sent by the watcher thread to this actor when it hits an unexpected exception. */
  case class WatcherDied(reason: Throwable)

  /**
   * Thread that blocks on the file system watch service and forwards file-creation events
   * to the enclosing actor as `Watcher.NewFile` messages.
   */
  object WatcherThread extends Thread {
    import Watcher.NewFile

    private val service = FileSystems.getDefault().newWatchService()

    /** Registers `directory` with the watch service for `ENTRY_CREATE` events. */
    def register(directory: Path): WatchKey = directory.register(service, ENTRY_CREATE)

    /**
     * Takes keys from the watch service until the thread is interrupted or the service is
     * closed. Any other exception is reported to the actor as `WatcherDied`; the loop itself
     * keeps running until the service is closed.
     */
    override def run(): Unit = {
      this.setName("serial-port-watcher")
      var stop = false
      while (!stop) {
        try {
          val key = service.take()
          key.pollEvents() foreach { ev =>
            val event = ev.asInstanceOf[WatchEvent[Path]]
            if (event.kind == ENTRY_CREATE) {
              val directory = key.watchable().asInstanceOf[Path]
              val file = event.context()
              self.tell(NewFile(directory, file), Actor.noSender)
            }
          }
          // re-arm the key so that further events for this directory are queued
          key.reset()
        } catch {
          case _: InterruptedException => stop = true
          case _: ClosedWatchServiceException => stop = true
          case ex: Exception => self.tell(WatcherDied(ex), Actor.noSender)
        }
      }
    }

    /** Closes the watch service, which makes a pending `take()` throw a ClosedWatchServiceException. */
    def close(): Unit = service.close()
  }

  // directory -> subscribers
  private val clients: MultiMap[String, ActorRef] = new HashMap[String, Set[ActorRef]] with MultiMap[String, ActorRef]

  // directory -> watchkey
  private val keys: Map[String, WatchKey] = Map.empty

  /**
   * Adds `client` to the subscribers of `directory`, registering the directory with the
   * watch service if it is not being watched yet.
   *
   * @param directory directory to watch; it is indexed by its absolute path
   * @param client    actor to notify about new files
   * @return the watch key of the directory
   */
  def subscribe(directory: String, client: ActorRef): WatchKey = {
    val normal = Paths.get(directory).toAbsolutePath
    val index = normal.toString
    // register before adding the binding, so a failed registration leaves no subscriber behind
    val key = keys.getOrElseUpdate(index, WatcherThread.register(normal))
    clients.addBinding(index, client)
    key
  }

  /**
   * Removes `client` from the subscribers of `directory` and cancels the directory's watch
   * key once no subscriber is left.
   *
   * @param directory directory to stop watching; it is indexed by its absolute path
   * @param client    actor whose subscription is removed
   */
  def unsubscribe(directory: String, client: ActorRef): Unit = {
    val index = Paths.get(directory).toAbsolutePath.toString

    // remove the given client, not whoever happens to be the current sender
    clients.removeBinding(index, client)

    // removeBinding drops the map entry together with its last subscriber
    if (!clients.contains(index)) {
      keys.remove(index) foreach (_.cancel())
    }
  }

  /** Sends `msg` to `sender`, using `from` (or this actor if `from` is empty) as the origin. */
  def reply(msg: Any, sender: ActorRef) = sender.tell(msg, from.getOrElse(self))

  /** Starts the watcher thread as a daemon thread. */
  override def preStart(): Unit = {
    WatcherThread.setDaemon(true)
    WatcherThread.start()
  }

  override def receive: Receive = {

    case w @ Serial.Watch(directory, skipInitial) =>
      val client = sender()

      Try(subscribe(directory, client)) match {
        case Failure(err) => reply(Serial.CommandFailed(w, err), client)
        case Success(_) =>
          context watch client
          if (!skipInitial) {
            // a directory stream holds an open handle and has to be closed explicitly
            val entries = Files.newDirectoryStream(Paths.get(directory).toAbsolutePath)
            try {
              entries foreach { path =>
                if (!Files.isDirectory(path)) {
                  reply(Serial.Connected(path.toString), client)
                }
              }
            } finally {
              entries.close()
            }
          }
      }

    case Serial.Unwatch(directory) =>
      unsubscribe(directory, sender())

    case Terminated(client) =>
      // Each value in `clients` is the set of subscribers of a directory, so look the client
      // up inside the set. Collect the directories first: unsubscribe mutates `clients`.
      val subscribed = clients.collect {
        case (directory, subscribers) if subscribers.contains(client) => directory
      }.toList
      subscribed foreach (unsubscribe(_, client))

    case Watcher.NewFile(directory, file) =>
      val normal = directory.toAbsolutePath
      val absFile = normal resolve file
      clients.getOrElse(normal.toString, Set.empty) foreach { client =>
        reply(Serial.Connected(absFile.toString), client)
      }

    case WatcherDied(err) => throw err // go down with watcher thread

  }

  /** Closes the watch service, which in turn terminates the watcher thread's loop. */
  override def postStop(): Unit = {
    WatcherThread.close()
  }

}

/** Companion of the `Watcher` actor: internal message type and `Props` factory. */
private[serial] object Watcher {

  /** Internal notification that the watch service reported `file` as created in `directory`. */
  private case class NewFile(directory: Path, file: Path)

  /**
   * Builds the `Props` for a `Watcher` actor.
   *
   * @param from actor passed to the watcher as the sender of its messages to clients
   */
  def apply(from: ActorRef): Props = {
    val origin = Some(from)
    Props(classOf[Watcher], origin)
  }

}