diff options
author | Jakob Odersky <jakob@odersky.com> | 2016-04-18 02:34:57 -0700 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2016-04-18 02:38:54 -0700 |
commit | 3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d (patch) | |
tree | d6fda5890153c17ffaebbb6cc5c7d061be00d70a /flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala | |
parent | 36721ebc573e1bb5f3e3ab74ed2feb0f6bf0b6e7 (diff) | |
download | akka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.tar.gz akka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.tar.bz2 akka-serial-3a1b65b9c4f71d58b9f0e62c0052030aa38fb00d.zip |
Implement stream interface for watchers
Diffstat (limited to 'flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala')
-rw-r--r-- | flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala | 65 |
1 files changed, 65 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala new file mode 100644 index 0000000..11f695b --- /dev/null +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/WatcherLogic.scala @@ -0,0 +1,65 @@ +package com.github.jodersky.flow +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import com.github.jodersky.flow.{Serial => CoreSerial} + +private[stream] class WatcherLogic( + shape: SourceShape[String], + ioManager: ActorRef, + ports: Set[String], + watchPromise: Promise[Serial.Watch]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + + implicit private def self = stageActor.ref + + override def preStart(): Unit = { + getStageActor(receive) + stageActor watch ioManager + for (dir <- WatcherLogic.getDirs(ports)) { + ioManager ! CoreSerial.Watch(dir, skipInitial = false) + } + } + + setHandler(shape.out, IgnoreTerminateOutput) + + private def receive(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`ioManager`) => + val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.Connected(port) => + if (ports contains port) { + if (isAvailable(shape.out)) { + push(shape.out, port) + } + } + + case other => + failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) + + } + } + +} + +private[stream] object WatcherLogic { + def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) +} |