aboutsummaryrefslogtreecommitdiff
path: root/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
diff options
context:
space:
mode:
Diffstat (limited to 'flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala')
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala172
1 files changed, 0 insertions, 172 deletions
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
deleted file mode 100644
index 764b054..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-package ch.jodersky.flow
-package stream
-package impl
-
-import scala.concurrent.Promise
-
-import akka.actor.{ActorRef, Terminated}
-import akka.stream.{FlowShape, Inlet, Outlet}
-import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
-import akka.util.ByteString
-
-import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings}
-
-/**
- * Graph logic that handles establishing and forwarding serial communication.
- * The underlying stream is closed when downstream (output) finishes,
- * upstream (input) closes are ignored.
- */
-private[stream] class SerialConnectionLogic(
- shape: FlowShape[ByteString, ByteString],
- manager: ActorRef,
- port: String,
- settings: SerialSettings,
- failOnOverflow: Boolean,
- bufferSize: Int,
- connectionPromise: Promise[Serial.Connection])
- extends GraphStageLogic(shape) {
- import GraphStageLogic._
- import SerialConnectionLogic._
-
- /** Receives data and writes it to the serial backend. */
- private def in: Inlet[ByteString] = shape.in
-
- /** Receives data from the serial backend and pushes it downstream. */
- private def out: Outlet[ByteString] = shape.out
-
- /** Implicit alias to stageActor so it will be used in "!" calls, without
- * explicitly specifying a sender. */
- implicit private def self = stageActor.ref
-
- /**
- * Input handler for an established connection.
- * @param operator the operator actor of the established connection
- */
- class ConnectedInHandler(operator: ActorRef) extends InHandler {
-
- override def onPush(): Unit = {
- val elem = grab(in)
- require(elem != null) // reactive streams requirement
- operator ! CoreSerial.Write(elem, _ => WriteAck)
- }
-
- override def onUpstreamFinish(): Unit = {
- if (isClosed(out)) { // close serial connection if output is also closed
- operator ! CoreSerial.Close
- }
- }
-
- }
-
- class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
- // implicit alias to stage actor, so it will be used in "!" calls
- implicit val self = stageActor.ref
-
- override def onPull(): Unit = {
- // serial connections are at the end of the "backpressure chain",
- // they do not natively support backpressure (as does TCP for example)
- // therefore nothing is done here
- }
-
- override def onDownstreamFinish(): Unit = {
- // closing downstream also closes the underlying connection
- operator ! CoreSerial.Close
- }
-
- }
-
- override def preStart(): Unit = {
- setKeepGoing(true) // serial connection operator will manage completing stage
- getStageActor(connecting)
- stageActor watch manager
- manager ! CoreSerial.Open(port, settings, bufferSize)
- }
-
- setHandler(in, IgnoreTerminateInput)
- setHandler(out, IgnoreTerminateOutput)
-
- /** Initial behavior, before a serial connection is established. */
- private def connecting(event: (ActorRef, Any)): Unit = {
- val sender = event._1
- val message = event._2
-
- message match {
-
- case Terminated(`manager`) =>
- val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
- failStage(ex)
- connectionPromise.failure(ex)
-
- case CoreSerial.CommandFailed(cmd, reason) =>
- val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
- failStage(ex)
- connectionPromise.failure(ex)
-
- case CoreSerial.Opened(port) =>
- val operator = sender
- setHandler(in, new ConnectedInHandler(operator))
- setHandler(out, new ConnectedOutHandler(operator))
- stageActor become connected(operator)
- connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
- stageActor unwatch manager
- stageActor watch operator
- if (!isClosed(in)) {
- pull(in) // start pulling input
- }
-
- case other =>
- val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
- failStage(ex)
- connectionPromise.failure(ex)
-
- }
-
- }
-
- /** Behaviour once a connection has been established. It is assumed that operator is not null. */
- private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
- val sender = event._1
- val message = event._2
-
- message match {
-
- case Terminated(`operator`) =>
- failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
-
- case CoreSerial.CommandFailed(cmd, reason) =>
- failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
-
- case CoreSerial.Closed =>
- completeStage()
-
- case CoreSerial.Received(data) =>
- if (isAvailable(out)) {
- push(out, data)
- } else if (failOnOverflow) {
- /* Note that the native backend does not provide any way of informing about
- * dropped serial data. However, in most cases, a computer capable of running flow
- * is also capable of processing incoming serial data at typical baud rates.
- * Hence packets will usually only be dropped if an application that uses flow
- * backpressures, which can however be detected here. */
- failStage(new StreamSerialException("Incoming serial data was dropped."))
- }
-
- case WriteAck =>
- if (!isClosed(in)) {
- pull(in)
- }
-
- case other =>
- failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
-
- }
-
- }
-
-}
-
-private[stream] object SerialConnectionLogic {
-
- case object WriteAck extends CoreSerial.Event
-
-}