aboutsummaryrefslogtreecommitdiff
path: root/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-05-16 15:28:59 -0700
committerJakob Odersky <jakob@odersky.com>2016-06-09 03:30:35 -0700
commit92c4b3d41e06ad4b89004212c85248e9e6cd61d7 (patch)
tree69470f7c4ed48edaebea91964d7d552e7eaacf0d /flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
parentf6f26c2c9e3ec9bdd45fb384483b3450bef5984a (diff)
downloadakka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.tar.gz
akka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.tar.bz2
akka-serial-92c4b3d41e06ad4b89004212c85248e9e6cd61d7.zip
Move project to `ch.jodersky` and upgrade sbt-jni
Diffstat (limited to 'flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala')
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala65
1 files changed, 65 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
new file mode 100644
index 0000000..60b7c90
--- /dev/null
+++ b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
@@ -0,0 +1,65 @@
+package ch.jodersky.flow
+package stream
+package impl
+
+import scala.concurrent.Promise
+
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.SourceShape
+import akka.stream.stage.GraphStageLogic
+import ch.jodersky.flow.{Serial => CoreSerial}
+
+private[stream] class WatcherLogic(
+ shape: SourceShape[String],
+ ioManager: ActorRef,
+ ports: Set[String],
+ watchPromise: Promise[Serial.Watch])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+
+ implicit private def self = stageActor.ref
+
+ override def preStart(): Unit = {
+ getStageActor(receive)
+ stageActor watch ioManager
+ for (dir <- WatcherLogic.getDirs(ports)) {
+ ioManager ! CoreSerial.Watch(dir, skipInitial = false)
+ }
+ }
+
+ setHandler(shape.out, IgnoreTerminateOutput)
+
+ private def receive(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`ioManager`) =>
+ val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.")
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.Connected(port) =>
+ if (ports contains port) {
+ if (isAvailable(shape.out)) {
+ push(shape.out, port)
+ }
+ }
+
+ case other =>
+ failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]"))
+
+ }
+ }
+
+}
+
+private[stream] object WatcherLogic {
+ def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/"))
+}