diff options
Diffstat (limited to 'stream')
10 files changed, 460 insertions, 0 deletions
diff --git a/stream/build.sbt b/stream/build.sbt new file mode 100644 index 0000000..e66f1d5 --- /dev/null +++ b/stream/build.sbt @@ -0,0 +1,5 @@ +import akkaserial.Dependencies + +libraryDependencies += Dependencies.akkaActor +libraryDependencies += Dependencies.akkaStream +libraryDependencies += Dependencies.scalatest % "test" diff --git a/stream/src/main/scala/akka/serial/stream/Serial.scala b/stream/src/main/scala/akka/serial/stream/Serial.scala new file mode 100644 index 0000000..785001f --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/Serial.scala @@ -0,0 +1,68 @@ +package akka.serial +package stream + +import akka.stream.scaladsl.Source +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +import akka.serial.{Serial => CoreSerial} +import impl._ + + +object Serial extends ExtensionId[Serial] with ExtensionIdProvider { + + /** + * Represents a prospective serial connection. + */ + case class Connection(port: String, settings: SerialSettings) + + case class Watch(ports: Set[String]) + + def apply()(implicit system: ActorSystem): Serial = super.apply(system) + + override def lookup() = Serial + + override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) + +} + +/** + * Entry point to streaming over serial ports. + * The design of this API is inspired by Akka's Tcp streams. + */ +class Serial(system: ExtendedActorSystem) extends Extension { + + /** + * Creates a Flow that will open a serial port when materialized. + * This Flow then represents an open serial connection: data pushed to its + * inlet will be written to the underlying serial port, and data received + * on the port will be emitted by its outlet. + * @param port name of serial port to open + * @param settings settings to use with serial port + * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped + * @param bufferSize maximum read and write buffer sizes + * @return a Flow associated to the given serial port + */ + def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): + Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( + new SerialConnectionStage( + IO(CoreSerial)(system), + port, + settings, + failOnOverflow, + bufferSize + ) + ) + + def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( + new WatcherStage( + IO(CoreSerial)(system), + ports + ) + ) + +} diff --git a/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala new file mode 100644 index 0000000..e2bb519 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala @@ -0,0 +1,5 @@ +package akka.serial +package stream + +/** Represents a generic exception occured during streaming of serial data. */ +class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala new file mode 100644 index 0000000..aafbfd6 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala @@ -0,0 +1,4 @@ +package akka.serial +package stream + +class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala new file mode 100644 index 0000000..a650c82 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala @@ -0,0 +1,172 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.{FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString + +import akka.serial.{Serial => CoreSerial, SerialSettings} + +/** + * Graph logic that handles establishing and forwarding serial communication. + * The underlying stream is closed when downstream (output) finishes, + * upstream (input) closes are ignored. + */ +private[stream] class SerialConnectionLogic( + shape: FlowShape[ByteString, ByteString], + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int, + connectionPromise: Promise[Serial.Connection]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + import SerialConnectionLogic._ + + /** Receives data and writes it to the serial backend. */ + private def in: Inlet[ByteString] = shape.in + + /** Receives data from the serial backend and pushes it downstream. */ + private def out: Outlet[ByteString] = shape.out + + /** Implicit alias to stageActor so it will be used in "!" calls, without + * explicitly specifying a sender. */ + implicit private def self = stageActor.ref + + /** + * Input handler for an established connection. + * @param operator the operator actor of the established connection + */ + class ConnectedInHandler(operator: ActorRef) extends InHandler { + + override def onPush(): Unit = { + val elem = grab(in) + require(elem != null) // reactive streams requirement + operator ! CoreSerial.Write(elem, _ => WriteAck) + } + + override def onUpstreamFinish(): Unit = { + if (isClosed(out)) { // close serial connection if output is also closed + operator ! CoreSerial.Close + } + } + + } + + class ConnectedOutHandler(operator: ActorRef) extends OutHandler { + // implicit alias to stage actor, so it will be used in "!" calls + implicit val self = stageActor.ref + + override def onPull(): Unit = { + // serial connections are at the end of the "backpressure chain", + // they do not natively support backpressure (as does TCP for example) + // therefore nothing is done here + } + + override def onDownstreamFinish(): Unit = { + // closing downstream also closes the underlying connection + operator ! CoreSerial.Close + } + + } + + override def preStart(): Unit = { + setKeepGoing(true) // serial connection operator will manage completing stage + getStageActor(connecting) + stageActor watch manager + manager ! CoreSerial.Open(port, settings, bufferSize) + } + + setHandler(in, IgnoreTerminateInput) + setHandler(out, IgnoreTerminateOutput) + + /** Initial behavior, before a serial connection is established. */ + private def connecting(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`manager`) => + val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) + failStage(ex) + connectionPromise.failure(ex) + + case CoreSerial.Opened(port) => + val operator = sender + setHandler(in, new ConnectedInHandler(operator)) + setHandler(out, new ConnectedOutHandler(operator)) + stageActor become connected(operator) + connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value + stageActor unwatch manager + stageActor watch operator + if (!isClosed(in)) { + pull(in) // start pulling input + } + + case other => + val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") + failStage(ex) + connectionPromise.failure(ex) + + } + + } + + /** Behaviour once a connection has been established. It is assumed that operator is not null. */ + private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`operator`) => + failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) + + case CoreSerial.CommandFailed(cmd, reason) => + failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) + + case CoreSerial.Closed => + completeStage() + + case CoreSerial.Received(data) => + if (isAvailable(out)) { + push(out, data) + } else if (failOnOverflow) { + /* Note that the native backend does not provide any way of informing about dropped serial + * data. However, in most cases, a computer capable of running akka-serial is also capable + * of processing incoming serial data at typical baud rates. Hence packets will usually + * only be dropped if an application that uses akka-serial backpressures, which can + * however be detected here. */ + failStage(new StreamSerialException("Incoming serial data was dropped.")) + } + + case WriteAck => + if (!isClosed(in)) { + pull(in) + } + + case other => + failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) + + } + + } + +} + +private[stream] object SerialConnectionLogic { + + case object WriteAck extends CoreSerial.Event + +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala new file mode 100644 index 0000000..aa67943 --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala @@ -0,0 +1,49 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString + +/** + * Graph stage that establishes and thereby materializes a serial connection. + * The actual connection logic is deferred to [[SerialConnectionLogic]]. + */ +private[stream] class SerialConnectionStage( + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int +) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { + + val in: Inlet[ByteString] = Inlet("Serial.in") + val out: Outlet[ByteString] = Outlet("Serial.out") + + val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): + (GraphStageLogic, Future[Serial.Connection]) = { + + val connectionPromise = Promise[Serial.Connection] + + val logic = new SerialConnectionLogic( + shape, + manager, + port, + settings, + failOnOverflow, + bufferSize, + connectionPromise + ) + + (logic, connectionPromise.future) + } + + override def toString = s"Serial($port)" + +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala new file mode 100644 index 0000000..afab60e --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala @@ -0,0 +1,65 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.Promise + +import akka.actor.{ActorRef, Terminated} +import akka.stream.SourceShape +import akka.stream.stage.GraphStageLogic +import akka.serial.{Serial => CoreSerial} + +private[stream] class WatcherLogic( + shape: SourceShape[String], + ioManager: ActorRef, + ports: Set[String], + watchPromise: Promise[Serial.Watch]) + extends GraphStageLogic(shape) { + import GraphStageLogic._ + + implicit private def self = stageActor.ref + + override def preStart(): Unit = { + getStageActor(receive) + stageActor watch ioManager + for (dir <- WatcherLogic.getDirs(ports)) { + ioManager ! CoreSerial.Watch(dir, skipInitial = false) + } + } + + setHandler(shape.out, IgnoreTerminateOutput) + + private def receive(event: (ActorRef, Any)): Unit = { + val sender = event._1 + val message = event._2 + + message match { + + case Terminated(`ioManager`) => + val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.CommandFailed(cmd, reason) => + val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) + failStage(ex) + watchPromise.failure(ex) + + case CoreSerial.Connected(port) => + if (ports contains port) { + if (isAvailable(shape.out)) { + push(shape.out, port) + } + } + + case other => + failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) + + } + } + +} + +private[stream] object WatcherLogic { + def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) +} diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala new file mode 100644 index 0000000..649249f --- /dev/null +++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala @@ -0,0 +1,38 @@ +package akka.serial +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, Outlet, SourceShape} +import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} + + +private[stream] class WatcherStage( + ioManager: ActorRef, + ports: Set[String] +) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { + + val out = Outlet[String]("Watcher.out") + + val shape = new SourceShape(out) + + override def createLogicAndMaterializedValue(attributes: Attributes): + (GraphStageLogic, Future[Serial.Watch]) = { + + val promise = Promise[Serial.Watch] + + val logic = new WatcherLogic( + shape, + ioManager, + ports, + promise + ) + + (logic, promise.future) + } + + override def toString = s"Watcher($ports)" + +} diff --git a/stream/src/test/resources/application.conf b/stream/src/test/resources/application.conf new file mode 100644 index 0000000..bdf80e2 --- /dev/null +++ b/stream/src/test/resources/application.conf @@ -0,0 +1,3 @@ +# Don't fill test output with log messages +akka.stdout-loglevel = "OFF" +akka.loglevel = "OFF"
\ No newline at end of file diff --git a/stream/src/test/scala/akka/serial/stream/SerialSpec.scala b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala new file mode 100644 index 0000000..7a64383 --- /dev/null +++ b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala @@ -0,0 +1,51 @@ +package akka.serial +package stream + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Keep, Sink, Source} +import akka.util.ByteString +import org.scalatest._ + +class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { + + implicit val system = ActorSystem("akka-serial-test") + implicit val materializer = ActorMaterializer() + + override def afterAll { + system.terminate() + } + + "Serial stream" should { + val data = ByteString(("hello world").getBytes("utf-8")) + + "receive the same data it sends in an echo test" in { + withEcho { case (port, settings) => + val graph = Source.single(data) + .via(Serial().open(port, settings)) // send to echo pty + .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS + .dropWhile(_ != data) + .toMat(Sink.head)(Keep.right) + + Await.result(graph.run(), 2.seconds) + } + } + + "fail if the underlying pty fails" in { + val result = withEcho { case (port, settings) => + Source.single(data) + .via(Serial().open(port, settings)) + .toMat(Sink.last)(Keep.right) + .run()} + + intercept[StreamSerialException] { + Await.result(result, 10.seconds) + } + } + + } + +} |