path: root/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
diff options
Diffstat (limited to 'flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala')
1 files changed, 171 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
new file mode 100644
index 0000000..35be464
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
@@ -0,0 +1,171 @@
+package com.github.jodersky.flow
+package stream
+package impl
+import akka.actor._
+import akka.stream.stage.GraphStageLogic.StageActorRef
+import akka.util._
+import akka.stream._
+import akka.stream.stage._
+import scala.concurrent._
+import akka.io._
+import com.github.jodersky.flow.{Serial => CoreSerial}
+import akka.stream.impl.ReactiveStreamsCompliance
+ * Graph logic that handles establishing and forwarding serial communication.
+ * The underlying stream is closed when downstream (output) finishes,
+ * conversely upstream (input) closes are ignored.
+ */
+private[stream] class SerialConnectionLogic(
+ shape: FlowShape[ByteString, ByteString],
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int,
+ connectionPromise: Promise[Serial.Connection])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+ import SerialConnectionLogic._
+ /** Receives data and writes it to the serial backend. */
+ private def in: Inlet[ByteString] = shape.in
+ /** Receives data from the serial backend and pushes it downstream. */
+ private def out: Outlet[ByteString] = shape.out
+ /** Implicit alias to stageActor so it will be used in "!" calls, without
+ * explicitly specifying a sender. */
+ implicit private def self = stageActor.ref
+ /** Input handler for an established connection.
+ * @param operator ther operator actor of the established connection
+ */
+ class ConnectedInHandler(operator: ActorRef) extends InHandler {
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ require(elem != null) // reactive streams requirement
+ operator ! CoreSerial.Write(elem, _ => WriteAck)
+ }
+ override def onUpstreamFinish(): Unit = {
+ if (isClosed(out)) { // close serial connection if output is also closed
+ operator ! CoreSerial.Close
+ }
+ }
+ }
+ class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
+ // alias stage actor as implicit to it will be used in "!" calls
+ implicit val self = stageActor.ref
+ override def onPull(): Unit = {
+ // serial connections are at the end of the "backpressure chain",
+ // they do not natively support backpressure (as does TCP for example)
+ // therefore nothing is done here as
+ }
+ override def onDownstreamFinish(): Unit = {
+ // closing downstream also closes the underlying connection
+ operator ! CoreSerial.Close
+ }
+ }
+ override def preStart(): Unit = {
+ setKeepGoing(true) // serial connection operator will manage completing stage
+ getStageActor(connecting)
+ stageActor watch manager
+ manager ! CoreSerial.Open(port, settings, bufferSize)
+ }
+ setHandler(in, IgnoreTerminateInput)
+ setHandler(out, IgnoreTerminateOutput)
+ /** Initial behaviour, before a serial connection is established. */
+ private def connecting(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+ message match {
+ case Terminated(`manager`) =>
+ val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
+ failStage(ex)
+ connectionPromise.failure(ex)
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ connectionPromise.failure(ex)
+ case CoreSerial.Opened(port) =>
+ val operator = sender
+ setHandler(in, new ConnectedInHandler(operator))
+ setHandler(out, new ConnectedOutHandler(operator))
+ stageActor become connected(operator)
+ connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
+ stageActor unwatch manager
+ stageActor watch operator
+ pull(in) // start pulling input
+ case other =>
+ val ex = new StreamSerialException(s"Stage actor received unkown message [$other]")
+ failStage(ex)
+ connectionPromise.failure(ex)
+ }
+ }
+ /** Behaviour once a connection has been established. It is assumed that operator is not null. */
+ private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+ message match {
+ case Terminated(`operator`) =>
+ failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
+ case CoreSerial.Closed =>
+ completeStage()
+ case CoreSerial.Received(data) =>
+ if (isAvailable(out)) {
+ push(out, data)
+ } else if (failOnOverflow) {
+ /* Note that the native backend does not provide any way of informing about
+ * dropped serial data. However, in most cases, a computer capable of running flow
+ * is also capable of processing incoming serial data at typical baud rates.
+ * Hence packets will usually only be dropped if an application that uses flow
+ * backpressures, which can however be detected here. */
+ failStage(new StreamSerialException("Incoming serial data was dropped."))
+ }
+ case WriteAck =>
+ if (!isClosed(in)) {
+ pull(in)
+ }
+ case other =>
+ failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
+ }
+ }
+private[stream] object SerialConnectionLogic {
+ case object WriteAck extends CoreSerial.Event