aboutsummaryrefslogtreecommitdiff
path: root/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
blob: 764b054c4587db6230838e404cd3ca492700d3a6 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
package ch.jodersky.flow
package stream
package impl

import scala.concurrent.Promise

import akka.actor.{ActorRef, Terminated}
import akka.stream.{FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
import akka.util.ByteString

import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings}

/**
  * Graph logic that handles establishing and forwarding serial communication.
  *
  * On start, the stage asks `manager` to open `port`. Once the port is opened,
  * elements arriving on the inlet are written to the serial backend (one at a
  * time, the next element is only pulled after the previous write has been
  * acknowledged) and data received from the backend is pushed to the outlet.
  *
  * The underlying connection is closed when downstream (output) finishes;
  * upstream (input) closes are ignored. The stage itself completes once the
  * backend confirms that the connection has been closed.
  *
  * @param shape the flow shape whose inlet/outlet are handled by this logic
  * @param manager the serial manager actor to which the open command is sent
  * @param port name of the serial port to open
  * @param settings settings of the serial port to open
  * @param failOnOverflow if true, the stage fails when serial data is received
  *                       while downstream is not ready to accept it; otherwise
  *                       such data is silently dropped
  * @param bufferSize forwarded as-is in the open command sent to `manager`
  * @param connectionPromise completed with the connection once the port has been
  *                          opened, or failed if the connection could not be
  *                          established
  */
private[stream] class SerialConnectionLogic(
  shape: FlowShape[ByteString, ByteString],
  manager: ActorRef,
  port: String,
  settings: SerialSettings,
  failOnOverflow: Boolean,
  bufferSize: Int,
  connectionPromise: Promise[Serial.Connection])
    extends GraphStageLogic(shape) {
  import GraphStageLogic._
  import SerialConnectionLogic._

  /** Receives data and writes it to the serial backend. */
  private def in: Inlet[ByteString] = shape.in

  /** Receives data from the serial backend and pushes it downstream. */
  private def out: Outlet[ByteString] = shape.out

  /** Implicit alias to stageActor so it will be used in "!" calls, without
    * explicitly specifying a sender. Only valid once the stage actor has been
    * initialized in `preStart`. */
  implicit private def self: ActorRef = stageActor.ref

  /**
    * Input handler for an established connection.
    * @param operator the operator actor of the established connection
    */
  class ConnectedInHandler(operator: ActorRef) extends InHandler {

    override def onPush(): Unit = {
      val elem = grab(in)
      require(elem != null) // reactive streams requirement
      // the next element is pulled only after the operator replies with WriteAck
      operator ! CoreSerial.Write(elem, _ => WriteAck)
    }

    override def onUpstreamFinish(): Unit = {
      // upstream completion alone does not complete the stage
      if (isClosed(out)) { // close serial connection if output is also closed
        operator ! CoreSerial.Close
      }
    }

  }

  /**
    * Output handler for an established connection.
    * @param operator the operator actor of the established connection
    */
  class ConnectedOutHandler(operator: ActorRef) extends OutHandler {

    override def onPull(): Unit = {
      // serial connections are at the end of the "backpressure chain",
      // they do not natively support backpressure (as does TCP for example)
      // therefore nothing is done here
    }

    override def onDownstreamFinish(): Unit = {
      // closing downstream also closes the underlying connection
      operator ! CoreSerial.Close
    }

  }

  override def preStart(): Unit = {
    setKeepGoing(true) // serial connection operator will manage completing stage
    getStageActor(connecting)
    stageActor.watch(manager)
    manager ! CoreSerial.Open(port, settings, bufferSize)
  }

  /** Ensures that the materialized connection future never stays pending if the
    * stage is stopped before a connection has been established. */
  override def postStop(): Unit = {
    if (!connectionPromise.isCompleted) {
      connectionPromise.tryFailure(
        new StreamSerialException("Serial connection stage stopped before a connection was established."))
    }
    ()
  }

  // port closes are ignored until a connection is established
  setHandler(in, IgnoreTerminateInput)
  setHandler(out, IgnoreTerminateOutput)

  /** Fails both the materialized connection and the stage while connecting. */
  private def failConnecting(ex: Throwable): Unit = {
    // complete the promise first so that the specific cause is always reported
    connectionPromise.tryFailure(ex)
    failStage(ex)
  }

  /** Initial behavior, before a serial connection is established. */
  private def connecting(event: (ActorRef, Any)): Unit = {
    val (sender, message) = event

    message match {

      case Terminated(`manager`) =>
        failConnecting(
          new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now."))

      case CoreSerial.CommandFailed(cmd, reason) =>
        failConnecting(new StreamSerialException(s"Serial command [$cmd] failed", reason))

      case CoreSerial.Opened(openedPort) =>
        val operator = sender
        setHandler(in, new ConnectedInHandler(operator))
        setHandler(out, new ConnectedOutHandler(operator))
        stageActor.become(connected(operator))
        connectionPromise.success(Serial.Connection(openedPort, settings)) // complete materialized value
        stageActor.unwatch(manager)
        stageActor.watch(operator)
        if (isClosed(out)) {
          // Downstream finished while the connection was still being established.
          // That event was ignored at the time and will not be signalled again to
          // the newly installed handler, hence the connection is closed right away.
          operator ! CoreSerial.Close
        } else if (!isClosed(in)) {
          pull(in) // start pulling input
        }

      case other =>
        failConnecting(new StreamSerialException(s"Stage actor received unknown message [$other]"))

    }

  }

  /** Behaviour once a connection has been established. It is assumed that operator is not null. */
  private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
    event._2 match {

      case Terminated(`operator`) =>
        failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))

      case CoreSerial.CommandFailed(cmd, reason) =>
        failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))

      case CoreSerial.Closed =>
        completeStage()

      case CoreSerial.Received(data) =>
        if (isAvailable(out)) {
          push(out, data)
        } else if (failOnOverflow) {
          /* Note that the native backend does not provide any way of informing about
           * dropped serial data. However, in most cases, a computer capable of running flow
           * is also capable of processing incoming serial data at typical baud rates.
           * Hence packets will usually only be dropped if an application that uses flow
           * backpressures, which can however be detected here. */
          failStage(new StreamSerialException("Incoming serial data was dropped."))
        }
        // otherwise the data is dropped silently

      case WriteAck =>
        // previous write has been processed, request the next element
        if (!isClosed(in)) {
          pull(in)
        }

      case other =>
        failStage(new StreamSerialException(s"Stage actor received unknown message [$other]"))

    }

  }

}

/** Companion holding messages used internally by the serial connection stage. */
private[stream] object SerialConnectionLogic {

  /** Acknowledgement event requested for every write sent to the serial operator.
    * On receiving it, the connected stage pulls the next element from its inlet
    * (unless the inlet is already closed). */
  case object WriteAck extends CoreSerial.Event

}