aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala
blob: afab60e2d94ff9f5b622e2294f00aa0b43ce758c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package akka.serial
package stream
package impl

import scala.concurrent.Promise

import akka.actor.{ActorRef, Terminated}
import akka.stream.SourceShape
import akka.stream.stage.GraphStageLogic
import akka.serial.{Serial => CoreSerial}

private[stream] class WatcherLogic(
  shape: SourceShape[String],
  ioManager: ActorRef,
  ports: Set[String],
  watchPromise: Promise[Serial.Watch])
    extends GraphStageLogic(shape) {
  import GraphStageLogic._

  implicit private def self = stageActor.ref

  override def preStart(): Unit = {
    getStageActor(receive)
    stageActor watch ioManager
    for (dir <- WatcherLogic.getDirs(ports)) {
      ioManager ! CoreSerial.Watch(dir, skipInitial = false)
    }
  }

  setHandler(shape.out, IgnoreTerminateOutput)

  private def receive(event: (ActorRef, Any)): Unit = {
    val sender = event._1
    val message = event._2

    message match {

      case Terminated(`ioManager`) =>
        val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.")
        failStage(ex)
        watchPromise.failure(ex)

      case CoreSerial.CommandFailed(cmd, reason) =>
        val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason)
        failStage(ex)
        watchPromise.failure(ex)

      case CoreSerial.Connected(port) =>
        if (ports contains port) {
          if (isAvailable(shape.out)) {
            push(shape.out, port)
          }
        }

      case other =>
        failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]"))

    }
  }

}

private[stream] object WatcherLogic {
  def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/"))
}