aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala')
-rw-r--r--stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala172
1 files changed, 172 insertions, 0 deletions
diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala
new file mode 100644
index 0000000..a650c82
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala
@@ -0,0 +1,172 @@
+package akka.serial
+package stream
+package impl
+
+import scala.concurrent.Promise
+
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.{FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
+import akka.util.ByteString
+
+import akka.serial.{Serial => CoreSerial, SerialSettings}
+
+/**
+ * Graph logic that handles establishing and forwarding serial communication.
+ * The underlying stream is closed when downstream (output) finishes,
+ * upstream (input) closes are ignored.
+ */
+private[stream] class SerialConnectionLogic(
+ shape: FlowShape[ByteString, ByteString],
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int,
+ connectionPromise: Promise[Serial.Connection])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+ import SerialConnectionLogic._
+
+ /** Receives data and writes it to the serial backend. */
+ private def in: Inlet[ByteString] = shape.in
+
+ /** Receives data from the serial backend and pushes it downstream. */
+ private def out: Outlet[ByteString] = shape.out
+
+ /** Implicit alias to stageActor so it will be used in "!" calls, without
+ * explicitly specifying a sender. */
+ implicit private def self = stageActor.ref
+
+ /**
+ * Input handler for an established connection.
+ * @param operator the operator actor of the established connection
+ */
+ class ConnectedInHandler(operator: ActorRef) extends InHandler {
+
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ require(elem != null) // reactive streams requirement
+ operator ! CoreSerial.Write(elem, _ => WriteAck)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ if (isClosed(out)) { // close serial connection if output is also closed
+ operator ! CoreSerial.Close
+ }
+ }
+
+ }
+
+ class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
+ // implicit alias to stage actor, so it will be used in "!" calls
+ implicit val self = stageActor.ref
+
+ override def onPull(): Unit = {
+ // serial connections are at the end of the "backpressure chain",
+ // they do not natively support backpressure (as does TCP for example)
+ // therefore nothing is done here
+ }
+
+ override def onDownstreamFinish(): Unit = {
+ // closing downstream also closes the underlying connection
+ operator ! CoreSerial.Close
+ }
+
+ }
+
+ override def preStart(): Unit = {
+ setKeepGoing(true) // serial connection operator will manage completing stage
+ getStageActor(connecting)
+ stageActor watch manager
+ manager ! CoreSerial.Open(port, settings, bufferSize)
+ }
+
+ setHandler(in, IgnoreTerminateInput)
+ setHandler(out, IgnoreTerminateOutput)
+
+ /** Initial behavior, before a serial connection is established. */
+ private def connecting(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`manager`) =>
+ val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.Opened(port) =>
+ val operator = sender
+ setHandler(in, new ConnectedInHandler(operator))
+ setHandler(out, new ConnectedOutHandler(operator))
+ stageActor become connected(operator)
+ connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
+ stageActor unwatch manager
+ stageActor watch operator
+ if (!isClosed(in)) {
+ pull(in) // start pulling input
+ }
+
+ case other =>
+ val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ }
+
+ }
+
+ /** Behaviour once a connection has been established. It is assumed that operator is not null. */
+ private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`operator`) =>
+ failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
+
+ case CoreSerial.Closed =>
+ completeStage()
+
+ case CoreSerial.Received(data) =>
+ if (isAvailable(out)) {
+ push(out, data)
+ } else if (failOnOverflow) {
+ /* Note that the native backend does not provide any way of informing about dropped serial
+ * data. However, in most cases, a computer capable of running akka-serial is also capable
+ * of processing incoming serial data at typical baud rates. Hence packets will usually
+ * only be dropped if an application that uses akka-serial backpressures, which can
+ * however be detected here. */
+ failStage(new StreamSerialException("Incoming serial data was dropped."))
+ }
+
+ case WriteAck =>
+ if (!isClosed(in)) {
+ pull(in)
+ }
+
+ case other =>
+ failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
+
+ }
+
+ }
+
+}
+
+private[stream] object SerialConnectionLogic {
+
+ case object WriteAck extends CoreSerial.Event
+
+}