diff options
Diffstat (limited to 'flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala')
-rw-r--r-- | flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala new file mode 100644 index 0000000..11f695b --- /dev/null +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala @@ -0,0 +1,65 @@ +package com.github.jodersky.flow +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import com.github.jodersky.flow.{Serial => CoreSerial} + +private[stream] class WatcherLogic( + shape: SourceShape[String], + ioManager: ActorRef, + ports: Set[String], + watchPromise: Promise[Serial.Watch]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + + implicit private def self = stageActor.ref + + override def preStart(): Unit = { + getStageActor(receive) + stageActor watch ioManager + for (dir <- WatcherLogic.getDirs(ports)) { + ioManager ! CoreSerial.Watch(dir, skipInitial = false) + } + } + + setHandler(shape.out, IgnoreTerminateOutput) + + private def receive(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`ioManager`) => + val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.Connected(port) => + if (ports contains port) { + if (isAvailable(shape.out)) { + push(shape.out, port) + } + } + + case other => + failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) + + } + } + +} + +private[stream] object WatcherLogic { + def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) +} |