Reproduction of [this Java Advent article](http://www.javaadvent.com/2015/12/reactive-file-system-monitoring-using-akka-actors.html)

-----

In this article, we will discuss:

0. File system monitoring using [Java NIO.2][nio2]
1. Common pitfalls of the default Java library
2. Design a simple thread-based file system monitor
3. Use the above to design a reactive file system monitor using the [actor][akka] [model][actorModel]

**Note**: Although all the code samples here are in Scala, they can be rewritten in plain Java too. To quickly familiarize yourself with Scala syntax, [here is a very short and nice Scala cheatsheet][cheatsheet]. For a more comprehensive guide to Scala for Java programmers, [consult this][cheatsheet2] (not needed to follow this article).

For the absolute shortest cheatsheet, the following Java code:

```java
public void foo(int x, int y) {
  int z = x + y;
  if (z == 1) {
    System.out.println(x);
  } else {
    System.out.println(y);
  }
}
```

is equivalent to the following Scala code:

```scala
def foo(x: Int, y: Int): Unit = {
  val z: Int = x + y
  z match {
   case 1 => println(x)
   case _ => println(y)
  }
}
```


All the code presented here is available under the MIT license as part of the [better-files][better-files-watcher] library on [GitHub][better-files].

-----------

Let's say you are tasked with building a cross-platform desktop file-search engine. You quickly realize that after the initial indexing of all the files, you also need to promptly reindex any files (or directories) that get created or updated. A naive way would be to simply rescan the entire file system every few minutes, but that would be incredibly inefficient, since most operating systems expose file system notification APIs that allow the application programmer to register callbacks for changes, e.g. [inotify][ionotify-wiki] on Linux, [FSEvents][fsevents-wiki] on macOS and [FindFirstChangeNotification][FindFirstChangeNotification] on Windows.

But now you are stuck dealing with OS-specific APIs! Thankfully, beginning with Java SE 7, we have a platform-independent abstraction for watching file system changes via the [WatchService API][javadoc-watchservice]. The WatchService API was developed as part of [Java NIO.2][nio2-wiki], under [JSR-203](https://www.jcp.org/en/jsr/detail?id=203) (the successor to the original NIO specification, [JSR-51][jsr-51]), and here is a "hello world" example of using it to watch a given [Path][javadoc-path]:

```scala
import java.nio.file._
import java.nio.file.StandardWatchEventKinds._
import scala.collection.JavaConversions._

def watch(directory: Path): Unit = {
  // First create the service
  val service: WatchService = directory.getFileSystem.newWatchService()

  // Register the service to the path and also specify which events we want to be notified about
  directory.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)

  while (true) {
    val key: WatchKey = service.take()  // Wait for this key to be signalled
    for {event <- key.pollEvents()} {
      // event.context() is the path of the changed entry, relative to the watched directory
      event.kind() match {
        case ENTRY_CREATE => println(s"${event.context()} got created")
        case ENTRY_MODIFY => println(s"${event.context()} got modified")
        case ENTRY_DELETE => println(s"${event.context()} got deleted")        
        case _ => 
          // This can happen when OS discards or loses an event. 
          // See: http://docs.oracle.com/javase/8/docs/api/java/nio/file/StandardWatchEventKinds.html#OVERFLOW
          println(s"Unknown event $event happened at ${event.context()}")
      }
    }
    key.reset()  // Do not forget to do this!! See: http://stackoverflow.com/questions/20180547/
  }
}
```
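
The catch-all branch above is also where `OVERFLOW` ends up, and for that kind the context is implementation specific and may be `null`. As a minimal sketch, assuming a hypothetical helper named `describe` (not part of the article's code), the kinds can be told apart explicitly so that lost events get their own message:

```scala
import java.nio.file.WatchEvent
import java.nio.file.StandardWatchEventKinds._

object EventDescriptions {
  // Hypothetical helper: one message per event kind, with OVERFLOW handled on its own.
  def describe(kind: WatchEvent.Kind[_], context: Any): String = kind match {
    case ENTRY_CREATE => s"$context got created"
    case ENTRY_MODIFY => s"$context got modified"
    case ENTRY_DELETE => s"$context got deleted"
    case OVERFLOW     => "some events were lost or discarded; consider rescanning the directory"
    case other        => s"unexpected event kind ${other.name()}"
  }
}
```

Inside the loop this would be called as `println(EventDescriptions.describe(event.kind(), event.context()))`. Whether a rescan is the right reaction to `OVERFLOW` depends on the application; the message here is only a placeholder.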

Although the above is a good first attempt, it lacks in several aspects:

0. **Bad Design**: The above code looks unnatural and you probably had to [look it up on StackOverflow][so-down] to get it right. Can we do better?
1. **Bad Design**: The code does not handle errors well. What happens when we encounter a file we could not open?
2. **Gotcha**: The Java API only allows us to watch the directory for changes to its direct children; it [does not recursively watch a directory][so-recursive-watching] for you.
3. **Gotcha**: The Java API [does not allow us to watch a single file][so-only-watch-dirs], only a directory.
4. **Gotcha**: Even if we resolve the aforementioned issues, the Java API [does not automatically start watching a new child file][so-autowatch] or directory created under the root.
5. **Bad Design**: The code as implemented above exposes a blocking/polling, thread-based model. Can we use a better concurrency abstraction?
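
The "only a directory" gotcha is easy to check for yourself. The sketch below assumes a hypothetical helper `canRegister` (my name, not from the article) that tries to register a path and reports whether the provider accepted it. On the default providers I am aware of, registering a regular file fails with `NotDirectoryException`, which is a subclass of `IOException`:

```scala
import java.io.IOException
import java.nio.file.{Path, WatchService}
import java.nio.file.StandardWatchEventKinds._

object Registration {
  // Hypothetical helper: true if `path` can be registered with `service`.
  def canRegister(service: WatchService, path: Path): Boolean =
    try {
      // cancel() immediately, since we only want to know whether registration works
      path.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY).cancel()
      true
    } catch {
      case _: IOException => false // includes NotDirectoryException
    }
}
```

Calling it with a temporary directory and then with a file inside that directory should show the difference: the directory registers, the file does not.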

-----------


Let's address each of the above concerns in turn.

* **A better interface**: Here is what *my ideal* interface would look like:

```scala
abstract class FileMonitor(root: Path) {
  def start(): Unit
  def onCreate(path: Path): Unit
  def onModify(path: Path): Unit
  def onDelete(path: Path): Unit  
  def stop(): Unit
}
```

That way, I can simply write the example code as:

```scala
val watcher = new FileMonitor(myFile) {
  override def onCreate(path: Path) = println(s"$path got created")  
  override def onModify(path: Path) = println(s"$path got modified")    
  override def onDelete(path: Path) = println(s"$path got deleted")  
}
watcher.start()
```

Ok, let's try to adapt the first example using a Java `Thread` so that we can expose "my ideal interface":

```scala
trait FileMonitor {                               // My ideal interface
  val root: Path                                  // starting file  
  def start(): Unit                               // start the monitor 
  def onCreate(path: Path) = {}                   // on-create callback 
  def onModify(path: Path) = {}                   // on-modify callback 
  def onDelete(path: Path) = {}                   // on-delete callback 
  def onUnknownEvent(event: WatchEvent[_]) = {}   // handle lost/discarded events
  def onException(e: Throwable) = {}              // handle errors e.g. a read error
  def stop(): Unit                                // stop the monitor
}
```

And here is a very basic thread-based implementation:

```scala
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true)        // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)    
  })

  val service = root.getFileSystem.newWatchService()

  override def run() = Iterator.continually(service.take()).foreach(process)

  override def interrupt() = {
    service.close()
    super.interrupt()
  }

  override def start() = {
    watch(root)
    super.start()
  }

  protected[this] def watch(file: Path): Unit = {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
  }

  protected[this] def process(key: WatchKey) = {
    key.pollEvents() foreach {
      case event: WatchEvent[Path] => dispatch(event.kind(), event.context())      
      case event => onUnknownEvent(event)
    }
    key.reset()
  }

  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}
```

The above looks much cleaner! Now we can watch files to our heart's content without poring over the details of JavaDocs by simply implementing the `onCreate(path)`, `onModify(path)`, `onDelete(path)` etc.

* **Exception handling**: This is already done above. `onException` gets called whenever we encounter an exception and the invoker can decide what to do next by implementing it.
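
One consequence worth knowing: `interrupt()` closes the `WatchService`, and a thread blocked in `take()` then receives a `ClosedWatchServiceException`. In `ThreadFileMonitor` that exception would leave `run()` and reach the uncaught-exception handler, so `onException` is likely to fire on a normal stop as well. A minimal sketch of the underlying behaviour, using a hypothetical `CloseDemo` object and a bare thread instead of the monitor class:

```scala
import java.nio.file.FileSystems
import java.util.concurrent.atomic.AtomicReference

object CloseDemo {
  // Returns the exception that ended the blocked take(), or null if none was seen.
  def exceptionAfterClose(): Throwable = {
    val service = FileSystems.getDefault.newWatchService()
    val failure = new AtomicReference[Throwable]()
    val worker = new Thread(new Runnable {
      override def run(): Unit =
        try { service.take(); () }        // blocks: nothing is registered
        catch { case e: Throwable => failure.set(e) }
    })
    worker.setDaemon(true)
    worker.start()
    Thread.sleep(200)   // give the worker a moment to block in take()
    service.close()     // the first thing ThreadFileMonitor.interrupt() does
    worker.join(5000)   // wait for the worker to finish
    failure.get()
  }
}
```

An implementation of `onException` may therefore want to treat `ClosedWatchServiceException` as a shutdown signal rather than as an error.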

* **Recursive watching**: The Java API **does not allow recursive watching of directories**. We need to modify the `watch(file)` to recursively attach the watcher:

```scala
def watch(file: Path, recursive: Boolean = true): Unit = {
  if (Files.isDirectory(file)) {
    file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
    // recursively call watch on children of this file
    if (recursive) {
      Files.list(file).iterator() foreach {f => watch(f, recursive)}
    }
  }
}
```
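
`Files.list` returns a stream backed by an open directory handle, and its Javadoc recommends closing it after use. As an alternative sketch (not the library's implementation), the directories to register can be collected up front with `Files.walk` and the stream closed in a `finally` block. The helper name `directoriesUnder` is hypothetical, and the example assumes Scala 2.13 or later for `scala.jdk.CollectionConverters`:

```scala
import java.nio.file.{Files, Path}
import scala.jdk.CollectionConverters._

object DirectoryWalk {
  // Hypothetical helper: every directory at or below `root` (root included),
  // with the underlying stream closed once the list has been built.
  def directoriesUnder(root: Path): List[Path] = {
    val stream = Files.walk(root)
    try stream.iterator().asScala.filter(p => Files.isDirectory(p)).toList
    finally stream.close()
  }
}
```

Each returned path could then be registered with `register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)`, replacing the recursive call.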

* **Watching regular files**: As mentioned before, the Java API **can only watch directories**. One hack for watching a single file is to set a watcher on its parent directory and only react if the event was triggered on the file itself.

```scala
override def start() = {
  if (Files.isDirectory(root)) {
    watch(root, recursive = true) 
  } else {
    watch(root.getParent, recursive = false)
  }
  super.start()
}
```

Now, in `process(key)`, we make sure we react only when the root is a directory or the event is for that one file:

```scala
def reactTo(target: Path) = Files.isDirectory(root) || (root == target)
```

We now check this before calling `dispatch`:

```scala
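case event: WatchEvent[Path] =>
  // event.context() is relative to the directory whose key fired, so resolve it first
  val target = key.watchable().asInstanceOf[Path].resolve(event.context())
  if (reactTo(target)) {
    dispatch(event.kind(), target)
  }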
```
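
One edge case in the `start()` override above: `Path.getParent` returns `null` when the path has no parent, which is the case for a single-segment relative path such as `Paths.get("notes.txt")`. A minimal sketch of a more defensive choice, assuming a hypothetical helper `registrationTarget` that makes the path absolute before asking for its parent:

```scala
import java.nio.file.{Files, Path}

object WatchTarget {
  // Hypothetical helper: which directory to register, and whether to recurse into it.
  def registrationTarget(root: Path): (Path, Boolean) =
    if (Files.isDirectory(root)) (root, true)
    else (root.toAbsolutePath.getParent, false) // absolute first, so a parent exists
}
```

If you adopt this, the comparison in `reactTo` should use the same absolute form of `root`, otherwise the two paths will not compare equal.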

* **Auto-watching new items**: The Java API **does not automatically watch newly created subdirectories**. We can address this by attaching the watcher ourselves in `process(key)` when an `ENTRY_CREATE` event is fired:

```scala
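if (reactTo(target)) {
  if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
    // resolve against the directory whose key fired; it may be a subdirectory of root
    watch(key.watchable().asInstanceOf[Path].resolve(event.context()))
  }
  dispatch(event.kind(), target)
}
```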
```
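
Because `event.context()` is a path relative to the directory whose key fired, the directory used for resolving matters once subdirectories are registered too. `WatchKey.watchable()` returns the object the key was created for, which for these registrations is the directory `Path`. The sketch below uses made-up paths and a hypothetical helper `resolveEvent` to show how resolving against the root loses the nested segment:

```scala
import java.nio.file.{Path, Paths}

object EventPaths {
  // Hypothetical helper: full path of an event, given the directory whose key reported it.
  def resolveEvent(watchedDir: Path, context: Path): Path = watchedDir.resolve(context)

  val root: Path    = Paths.get("/home/me/project")  // illustrative root
  val nested: Path  = root.resolve("src")            // a subdirectory registered recursively
  val context: Path = Paths.get("Main.scala")        // what event.context() would carry

  val viaRoot: Path = root.resolve(context)          // drops the "src" segment
  val viaKey: Path  = resolveEvent(nested, context)  // keeps it
}
```

In `process(key)`, `watchedDir` would be `key.watchable().asInstanceOf[Path]`.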

Putting it all together, we have our final [`FileMonitor.scala`][FileMonitor.scala]:

```scala
class ThreadFileMonitor(val root: Path) extends Thread with FileMonitor {
  setDaemon(true) // daemonize this thread
  setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable) = onException(exception)    
  })

  val service = root.getFileSystem.newWatchService()

  override def run() = Iterator.continually(service.take()).foreach(process)

  override def interrupt() = {
    service.close()
    super.interrupt()
  }

  override def start() = {
    if (Files.isDirectory(root)) {
      watch(root, recursive = true) 
    } else {
      watch(root.getParent, recursive = false)
    }
    super.start()
  }

  protected[this] def watch(file: Path, recursive: Boolean = true): Unit = {
    if (Files.isDirectory(file)) {
      file.register(service, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY)
      if (recursive) {
        Files.list(file).iterator() foreach {f => watch(f, recursive)}
      }  
    }
  }

  private[this] def reactTo(target: Path) = Files.isDirectory(root) || (root == target)

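  protected[this] def process(key: WatchKey) = {
    // Events carry paths relative to the directory this key was registered on
    val directory = key.watchable().asInstanceOf[Path]
    key.pollEvents() foreach {
      case event: WatchEvent[Path] =>
        val target = directory.resolve(event.context())
        if (reactTo(target)) {
          if (Files.isDirectory(root) && event.kind() == ENTRY_CREATE) {
            watch(target)  // no-op for regular files, recursive for new directories
          }
          dispatch(event.kind(), target)
        }
      case event => onUnknownEvent(event)
    }
    key.reset()
  }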

  def dispatch(eventType: WatchEvent.Kind[Path], file: Path): Unit = {
    eventType match {
      case ENTRY_CREATE => onCreate(file)
      case ENTRY_MODIFY => onModify(file)
      case ENTRY_DELETE => onDelete(file)
    }
  }
}
```

-----
Now that we have addressed all the gotchas and distanced ourselves from the intricacies of the WatchService API, we are still tightly coupled to the thread-based API.
We will use the above class to expose a different concurrency model, namely the [actor model][actorModel2], and design a reactive, dynamic and resilient file-system watcher using [Akka][akka-docs]. Although the [construction of Akka actors][akka-actors] is beyond the scope of this article, we will present a very simple actor that uses the `ThreadFileMonitor`:

```scala
import java.nio.file.{Path, WatchEvent}

import akka.actor._

class FileWatcher(file: Path) extends ThreadFileMonitor(file) with Actor {
  import FileWatcher._

  // MultiMap from Events to registered callbacks
  // (newMultiMap is a helper defined in better-files, not shown here)
  protected[this] val callbacks = newMultiMap[Event, Callback]

  // Override the dispatcher from ThreadFileMonitor to inform the actor of a new event
  override def dispatch(event: Event, file: Path) = self ! Message.NewEvent(event, file)  

  // Override the onException from the ThreadFileMonitor
  override def onException(exception: Throwable) = self ! Status.Failure(exception)

  // when actor starts, start the ThreadFileMonitor
  override def preStart() = super.start()   
  
  // before actor stops, stop the ThreadFileMonitor
  override def postStop() = super.interrupt()

  override def receive = {
    case Message.NewEvent(event, target) if callbacks contains event => 
       callbacks(event) foreach {f => f(event -> target)}

    case Message.RegisterCallback(events, callback) => 
       events foreach {event => callbacks.addBinding(event, callback)}

    case Message.RemoveCallback(event, callback) => 
       callbacks.removeBinding(event, callback)
  }
}

object FileWatcher {
  type Event = WatchEvent.Kind[Path]
  type Callback = PartialFunction[(Event, Path), Unit]

  sealed trait Message
  object Message {
    case class NewEvent(event: Event, file: Path) extends Message
    case class RegisterCallback(events: Seq[Event], callback: Callback) extends Message
    case class RemoveCallback(event: Event, callback: Callback) extends Message
  }
}
```

This allows us to dynamically register and remove callbacks to react to file system events:

```scala
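import java.nio.file.Paths
import java.nio.file.StandardWatchEventKinds._
import akka.actor.{ActorRef, ActorSystem, Props}
import FileWatcher._   // Event, Callback and Message

// initialize the actor instance
val system = ActorSystem("mySystem")
val watcher: ActorRef = system.actorOf(Props(new FileWatcher(Paths.get("/home/pathikrit"))))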

// util to create a RegisterCallback message for the actor
def when(events: Event*)(callback: Callback): Message = {
  Message.RegisterCallback(events.distinct, callback)
}

// send the register callback message for create/modify events
watcher ! when(events = ENTRY_CREATE, ENTRY_MODIFY) {   
  case (ENTRY_CREATE, file) => println(s"$file got created")
  case (ENTRY_MODIFY, file) => println(s"$file got modified")
}
```

Full source: [`FileWatcher.scala`][FileWatcher.scala]
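
To see the callback bookkeeping without pulling in Akka, here is an actor-free sketch of the same register/remove/fire logic. It is illustrative only and not the library's implementation: the `Callbacks.Registry` class and its method names are hypothetical, it uses a plain `mutable.Map` of immutable sets instead of a `MultiMap`, and it is not thread-safe (in the actor version, the actor's mailbox serializes access). Unlike the `receive` block above, it checks `isDefinedAt` before invoking a callback, which is my own design choice to avoid a `MatchError` when a callback is registered for an event it does not handle.

```scala
import java.nio.file.{Path, WatchEvent}
import scala.collection.mutable

object Callbacks {
  type Event = WatchEvent.Kind[Path]
  type Callback = PartialFunction[(Event, Path), Unit]

  // Actor-free stand-in for the bookkeeping done in FileWatcher.receive.
  final class Registry {
    private val callbacks = mutable.Map.empty[Event, Set[Callback]]

    // Bind one callback to each of the given event kinds
    def register(events: Seq[Event], callback: Callback): Unit =
      events.foreach { e =>
        callbacks(e) = callbacks.getOrElse(e, Set.empty[Callback]) + callback
      }

    // Unbind a callback from one event kind, dropping the key when nothing is left
    def remove(event: Event, callback: Callback): Unit =
      callbacks.get(event).foreach { registered =>
        val remaining = registered - callback
        if (remaining.isEmpty) { callbacks -= event } else { callbacks(event) = remaining }
      }

    // Run each callback registered for `event` that is defined at (event, path);
    // returns how many callbacks ran.
    def fire(event: Event, path: Path): Int = {
      val matching = callbacks.getOrElse(event, Set.empty[Callback]).filter(_.isDefinedAt((event, path)))
      matching.foreach(f => f((event, path)))
      matching.size
    }
  }
}
```

Wrapping `fire`, `register` and `remove` in message handlers gives back the shape of the actor shown earlier.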

-----

[actorModel]: https://en.wikipedia.org/wiki/Actor_model
[actorModel2]: http://berb.github.io/diploma-thesis/original/054_actors.html
[akka]: http://akka.io
[akka-actors]: http://doc.akka.io/docs/akka/snapshot/scala/actors.html
[akka-docs]: http://doc.akka.io/docs/akka/2.4.1/java.html
[better-files]: https://github.com/pathikrit/better-files
[better-files-watcher]: https://github.com/pathikrit/better-files#akka-file-watcher
[cheatsheet]: http://learnxinyminutes.com/docs/scala/
[cheatsheet2]: http://techblog.realestate.com.au/java-to-scala-cheatsheet/
[FileWatcher.scala]: https://github.com/pathikrit/better-files/blob/2ea6bb694551f1fe6e9ce58dbd1b814391a02e5a/akka/src/main/scala/better/files/FileWatcher.scala
[FileMonitor.scala]: https://github.com/pathikrit/better-files/blob/2ea6bb694551f1fe6e9ce58dbd1b814391a02e5a/core/src/main/scala/better/files/FileMonitor.scala
[FindFirstChangeNotification]: https://msdn.microsoft.com/en-us/library/aa364417(VS.85).aspx
[fsevents-wiki]: https://en.wikipedia.org/wiki/FSEvents
[ionotify-wiki]: https://en.wikipedia.org/wiki/Inotify
[nio2]: https://docs.oracle.com/javase/tutorial/essential/io/fileio.html
[nio2-wiki]: https://en.wikipedia.org/wiki/Non-blocking_I/O_(Java)
[jsr-51]: https://www.jcp.org/en/jsr/detail?id=51
[javadoc-path]: https://docs.oracle.com/javase/8/docs/api/java/nio/file/Path.html
[javadoc-watchservice]: https://docs.oracle.com/javase/8/docs/api/java/nio/file/WatchService.html
[so-autowatch]: https://github.com/lloydmeta/schwatcher/issues/44
[so-down]: http://memecrunch.com/meme/YBHZ/stackoverflow-is-down/image.jpg
[so-recursive-watching]: http://stackoverflow.com/questions/18701242/how-to-watch-a-folder-and-subfolders-for-changes
[so-only-watch-dirs]: http://stackoverflow.com/questions/16251273/can-i-watch-for-single-file-change-with-watchservice-not-the-whole-directory