blob: 11f695b278306e1a696851f0ba905d43f29d9a5e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
|
package com.github.jodersky.flow
package stream
package impl
import scala.concurrent.Promise
import akka.actor.{ActorRef, Terminated}
import akka.stream.SourceShape
import akka.stream.stage.GraphStageLogic
import com.github.jodersky.flow.{Serial => CoreSerial}
/**
 * Stage logic of a source that emits the names of serial ports as they get connected.
 *
 * When started, it asks `ioManager` to watch every directory returned by
 * `WatcherLogic.getDirs(ports)` and then reacts to the messages delivered to its
 * stage actor: `CoreSerial.Connected` events for one of the requested ports are
 * pushed downstream, while command failures, termination of the manager and
 * unexpected messages fail the stage.
 *
 * @param shape        shape of the enclosing source; port names are pushed to `shape.out`
 * @param ioManager    actor that receives the `CoreSerial.Watch` commands and is death-watched
 * @param ports        exact port names to report; connection events for other ports are ignored
 * @param watchPromise promise that is failed (if still pending) whenever this logic fails the stage
 */
private[stream] class WatcherLogic(
  shape: SourceShape[String],
  ioManager: ActorRef,
  ports: Set[String],
  watchPromise: Promise[Serial.Watch])
    extends GraphStageLogic(shape) {
  import GraphStageLogic._

  // Implicit sender for the `!` calls below, so that replies from the manager
  // are delivered to this stage's actor.
  implicit private def self: ActorRef = stageActor.ref

  override def preStart(): Unit = {
    getStageActor(receive)
    stageActor.watch(ioManager)
    for (dir <- WatcherLogic.getDirs(ports)) {
      ioManager ! CoreSerial.Watch(dir, skipInitial = false)
    }
  }

  setHandler(shape.out, IgnoreTerminateOutput)

  /**
   * Fails the stage and the watch promise with the same cause.
   *
   * `tryFailure` is used instead of `failure` so that an already completed
   * promise does not raise an `IllegalStateException` from within the stage
   * actor callback.
   */
  private def abort(cause: Throwable): Unit = {
    failStage(cause)
    watchPromise.tryFailure(cause)
  }

  /** Handles a `(sender, message)` pair delivered to the stage actor. */
  private def receive(event: (ActorRef, Any)): Unit = event._2 match {

    case Terminated(`ioManager`) =>
      abort(new StreamWatcherException("The serial IO manager has terminated. Stopping now."))

    case CoreSerial.CommandFailed(cmd, reason) =>
      abort(new StreamWatcherException(s"Serial command [$cmd] failed", reason))

    case CoreSerial.Connected(port) =>
      // Deliberately not a pattern guard: a connection event for a port that is
      // not being watched must be ignored here rather than fall through to the
      // failing catch-all case below.
      // The event is dropped when downstream has not signalled demand.
      if (ports.contains(port) && isAvailable(shape.out)) {
        push(shape.out, port)
      }

    case other =>
      abort(new StreamWatcherException(s"Stage actor received unknown message [$other]"))
  }
}
/** Helpers for the watcher stage logic. */
private[stream] object WatcherLogic {

  /**
   * Computes the parent directories of the given port paths.
   *
   * Each port is split on `/` and its last segment is removed, e.g.
   * `/dev/ttyUSB0` yields `/dev`. A name without any `/` yields the empty
   * string. Ports located directly under the root (`/ttyUSB0`), as well as
   * the root itself, yield `/`.
   *
   * @param ports port paths, using `/` as separator
   * @return the distinct parent directories of `ports`
   */
  def getDirs(ports: Set[String]): Set[String] = ports.map { port =>
    // `split` returns an empty array for inputs made only of separators ("/");
    // `dropRight(1)` tolerates that, whereas `init` would throw.
    val parent = port.split("/").dropRight(1).mkString("/")
    // An absolute path whose parent is the root would otherwise collapse to "".
    if (parent.isEmpty && port.startsWith("/")) "/" else parent
  }
}
|