aboutsummaryrefslogblamecommitdiff
path: root/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
blob: 35be4641aa5632ec732ea471f65a54aa52879609 (plain) (tree)










































































































































































                                                                                                         
package com.github.jodersky.flow
package stream
package impl

import akka.actor._
import akka.stream.stage.GraphStageLogic.StageActorRef
import akka.util._
import akka.stream._
import akka.stream.stage._
import scala.concurrent._
import akka.io._
import com.github.jodersky.flow.{Serial => CoreSerial}
import akka.stream.impl.ReactiveStreamsCompliance


/**
  * Graph logic that handles establishing and forwarding serial communication.
  *
  * On start, the stage asks `manager` to open `port`. Once the port has been
  * opened, elements arriving on the inlet are written to the serial operator
  * actor (one element per acknowledged write), and data received from the
  * operator is pushed to the outlet.
  *
  * The underlying stream is closed when downstream (output) finishes,
  * conversely upstream (input) closes are ignored, unless the output has
  * already been closed.
  *
  * @param shape the flow shape whose inlet feeds the serial port and whose
  *   outlet emits data read from it
  * @param manager the actor that is sent the `CoreSerial.Open` command
  * @param port name of the serial port to open
  * @param settings settings passed along with the open command
  * @param failOnOverflow if true, the stage fails when serial data is received
  *   while downstream is still open but has not signalled demand; if false,
  *   such data is silently dropped
  * @param bufferSize buffer size passed along with the open command
  * @param connectionPromise promise of the materialized connection; it is
  *   completed once the port is opened, or failed if that never happens
  */
private[stream] class SerialConnectionLogic(
  shape: FlowShape[ByteString, ByteString],
  manager: ActorRef,
  port: String,
  settings: SerialSettings,
  failOnOverflow: Boolean,
  bufferSize: Int,
  connectionPromise: Promise[Serial.Connection])
    extends GraphStageLogic(shape) {
  import GraphStageLogic._
  import SerialConnectionLogic._

  /** Receives data and writes it to the serial backend. */
  private def in: Inlet[ByteString] = shape.in

  /** Receives data from the serial backend and pushes it downstream. */
  private def out: Outlet[ByteString] = shape.out

  /** Implicit alias to stageActor so it will be used in "!" calls, without
    * explicitly specifying a sender. */
  implicit private def self: ActorRef = stageActor.ref

  /** Input handler for an established connection.
    * @param operator the operator actor of the established connection
    */
  class ConnectedInHandler(operator: ActorRef) extends InHandler {

    /** Forwards the pushed element to the operator. The next element is only
      * pulled once the operator acknowledges the write with `WriteAck`. */
    override def onPush(): Unit = {
      val elem = grab(in)
      require(elem != null) // reactive streams requirement
      operator ! CoreSerial.Write(elem, _ => WriteAck)
    }

    /** A finished upstream alone does not close the connection; it is only
      * closed here if the output side has been closed as well. */
    override def onUpstreamFinish(): Unit = {
      if (isClosed(out)) { // close serial connection if output is also closed
        operator ! CoreSerial.Close
      }
    }

  }

  /** Output handler for an established connection.
    * @param operator the operator actor of the established connection
    */
  class ConnectedOutHandler(operator: ActorRef) extends OutHandler {

    override def onPull(): Unit = {
      // serial connections are at the end of the "backpressure chain",
      // they do not natively support backpressure (as does TCP for example),
      // therefore nothing is done here: data is pushed as it is received
    }

    override def onDownstreamFinish(): Unit = {
      // closing downstream also closes the underlying connection
      operator ! CoreSerial.Close
    }

  }

  /** Registers the stage actor and requests the serial port to be opened. */
  override def preStart(): Unit = {
    setKeepGoing(true) // serial connection operator will manage completing stage
    getStageActor(connecting)
    stageActor watch manager
    manager ! CoreSerial.Open(port, settings, bufferSize)
  }

  /** Makes sure the materialized connection is never left pending: if the
    * stage stops before the promise was completed, it is failed here.
    * This is a no-op whenever the promise has already been completed. */
  override def postStop(): Unit = {
    connectionPromise.tryFailure(
      new StreamSerialException("Stage stopped before a serial connection was established."))
  }

  // until a connection is established, termination of either side is ignored
  setHandler(in, IgnoreTerminateInput)
  setHandler(out, IgnoreTerminateOutput)

  /** Fails both the materialized connection and the stage with the given cause.
    * The promise is completed first so that it always carries this cause. */
  private def abortConnecting(cause: Throwable): Unit = {
    connectionPromise.tryFailure(cause)
    failStage(cause)
  }

  /** Initial behaviour, before a serial connection is established. */
  private def connecting(event: (ActorRef, Any)): Unit = {
    val (sender, message) = event

    message match {

      case Terminated(`manager`) =>
        abortConnecting(
          new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now."))

      case CoreSerial.CommandFailed(cmd, reason) =>
        abortConnecting(new StreamSerialException(s"Serial command [$cmd] failed", reason))

      case CoreSerial.Opened(openedPort) =>
        val operator = sender
        setHandler(in, new ConnectedInHandler(operator))
        setHandler(out, new ConnectedOutHandler(operator))
        stageActor become connected(operator)
        connectionPromise.trySuccess(Serial.Connection(openedPort, settings)) // complete materialized value
        stageActor unwatch manager
        stageActor watch operator
        // Upstream may already have finished while the connection was being
        // established (that event is ignored by the initial handler); pulling
        // a closed inlet would throw and fail the stage.
        if (!isClosed(in)) {
          pull(in) // start pulling input
        }

      case other =>
        abortConnecting(new StreamSerialException(s"Stage actor received unkown message [$other]"))

    }

  }

  /** Behaviour once a connection has been established. It is assumed that operator is not null. */
  private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
    event._2 match {

      case Terminated(`operator`) =>
        failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))

      case CoreSerial.CommandFailed(cmd, reason) =>
        failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))

      case CoreSerial.Closed =>
        completeStage()

      case CoreSerial.Received(data) =>
        if (isAvailable(out)) {
          push(out, data)
        } else if (failOnOverflow && !isClosed(out)) {
          /* Note that the native backend does not provide any way of informing about
           * dropped serial data. However, in most cases, a computer capable of running flow
           * is also capable of processing incoming serial data at typical baud rates.
           * Hence packets will usually only be dropped if an application that uses flow
           * backpressures, which can however be detected here.
           * Data arriving after downstream has finished is not an overflow: nobody
           * is left to consume it, so it is discarded without failing the stage. */
          failStage(new StreamSerialException("Incoming serial data was dropped."))
        }

      case WriteAck =>
        // the previous write has been handled, request the next element (if any)
        if (!isClosed(in)) {
          pull(in)
        }

      case other =>
        failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))

    }

  }

}

/** Companion holding messages used internally by the serial connection stage logic. */
private[stream] object SerialConnectionLogic {

  /** Acknowledgement requested with every `CoreSerial.Write` issued by the
    * connected input handler. When the stage actor receives it, the next
    * input element is pulled (provided the inlet is not closed). */
  case object WriteAck extends CoreSerial.Event

}