aboutsummaryrefslogtreecommitdiff
path: root/stream
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-01-08 21:16:25 +0100
committerJakob Odersky <jakob@odersky.com>2017-01-21 17:22:10 -0800
commit23959966760174477a6b0fcbf9dd1e8ef37c643b (patch)
tree9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /stream
parent6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff)
downloadakka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip
Rename project to akka-serial
Diffstat (limited to 'stream')
-rw-r--r--stream/build.sbt5
-rw-r--r--stream/src/main/scala/akka/serial/stream/Serial.scala68
-rw-r--r--stream/src/main/scala/akka/serial/stream/StreamSerialException.scala5
-rw-r--r--stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala4
-rw-r--r--stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala172
-rw-r--r--stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala49
-rw-r--r--stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala65
-rw-r--r--stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala38
-rw-r--r--stream/src/test/resources/application.conf3
-rw-r--r--stream/src/test/scala/akka/serial/stream/SerialSpec.scala51
10 files changed, 460 insertions, 0 deletions
diff --git a/stream/build.sbt b/stream/build.sbt
new file mode 100644
index 0000000..e66f1d5
--- /dev/null
+++ b/stream/build.sbt
@@ -0,0 +1,5 @@
+import akkaserial.Dependencies
+
+libraryDependencies += Dependencies.akkaActor
+libraryDependencies += Dependencies.akkaStream
+libraryDependencies += Dependencies.scalatest % "test"
diff --git a/stream/src/main/scala/akka/serial/stream/Serial.scala b/stream/src/main/scala/akka/serial/stream/Serial.scala
new file mode 100644
index 0000000..785001f
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/Serial.scala
@@ -0,0 +1,68 @@
+package akka.serial
+package stream
+
+import akka.stream.scaladsl.Source
+import scala.concurrent.Future
+
+import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
+import akka.io.IO
+import akka.stream.scaladsl.Flow
+import akka.util.ByteString
+
+import akka.serial.{Serial => CoreSerial}
+import impl._
+
+
+object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
+
+ /**
+ * Represents a prospective serial connection.
+ */
+ case class Connection(port: String, settings: SerialSettings)
+
+ case class Watch(ports: Set[String])
+
+ def apply()(implicit system: ActorSystem): Serial = super.apply(system)
+
+ override def lookup() = Serial
+
+ override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system)
+
+}
+
+/**
+ * Entry point to streaming over serial ports.
+ * The design of this API is inspired by Akka's Tcp streams.
+ */
+class Serial(system: ExtendedActorSystem) extends Extension {
+
+ /**
+ * Creates a Flow that will open a serial port when materialized.
+ * This Flow then represents an open serial connection: data pushed to its
+ * inlet will be written to the underlying serial port, and data received
+ * on the port will be emitted by its outlet.
+ * @param port name of serial port to open
+ * @param settings settings to use with serial port
+ * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped
+ * @param bufferSize maximum read and write buffer sizes
+ * @return a Flow associated to the given serial port
+ */
+ def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024):
+ Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph(
+ new SerialConnectionStage(
+ IO(CoreSerial)(system),
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize
+ )
+ )
+
+ def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph(
+ new WatcherStage(
+ IO(CoreSerial)(system),
+ ports
+ )
+ )
+
+}
diff --git a/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala
new file mode 100644
index 0000000..e2bb519
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/StreamSerialException.scala
@@ -0,0 +1,5 @@
+package akka.serial
+package stream
+
+/** Represents a generic exception occured during streaming of serial data. */
+class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala
new file mode 100644
index 0000000..aafbfd6
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/StreamWatcherException.scala
@@ -0,0 +1,4 @@
+package akka.serial
+package stream
+
+class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala
new file mode 100644
index 0000000..a650c82
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionLogic.scala
@@ -0,0 +1,172 @@
+package akka.serial
+package stream
+package impl
+
+import scala.concurrent.Promise
+
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.{FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
+import akka.util.ByteString
+
+import akka.serial.{Serial => CoreSerial, SerialSettings}
+
+/**
+ * Graph logic that handles establishing and forwarding serial communication.
+ * The underlying stream is closed when downstream (output) finishes,
+ * upstream (input) closes are ignored.
+ */
+private[stream] class SerialConnectionLogic(
+ shape: FlowShape[ByteString, ByteString],
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int,
+ connectionPromise: Promise[Serial.Connection])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+ import SerialConnectionLogic._
+
+ /** Receives data and writes it to the serial backend. */
+ private def in: Inlet[ByteString] = shape.in
+
+ /** Receives data from the serial backend and pushes it downstream. */
+ private def out: Outlet[ByteString] = shape.out
+
+ /** Implicit alias to stageActor so it will be used in "!" calls, without
+ * explicitly specifying a sender. */
+ implicit private def self = stageActor.ref
+
+ /**
+ * Input handler for an established connection.
+ * @param operator the operator actor of the established connection
+ */
+ class ConnectedInHandler(operator: ActorRef) extends InHandler {
+
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ require(elem != null) // reactive streams requirement
+ operator ! CoreSerial.Write(elem, _ => WriteAck)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ if (isClosed(out)) { // close serial connection if output is also closed
+ operator ! CoreSerial.Close
+ }
+ }
+
+ }
+
+ class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
+ // implicit alias to stage actor, so it will be used in "!" calls
+ implicit val self = stageActor.ref
+
+ override def onPull(): Unit = {
+ // serial connections are at the end of the "backpressure chain",
+ // they do not natively support backpressure (as does TCP for example)
+ // therefore nothing is done here
+ }
+
+ override def onDownstreamFinish(): Unit = {
+ // closing downstream also closes the underlying connection
+ operator ! CoreSerial.Close
+ }
+
+ }
+
+ override def preStart(): Unit = {
+ setKeepGoing(true) // serial connection operator will manage completing stage
+ getStageActor(connecting)
+ stageActor watch manager
+ manager ! CoreSerial.Open(port, settings, bufferSize)
+ }
+
+ setHandler(in, IgnoreTerminateInput)
+ setHandler(out, IgnoreTerminateOutput)
+
+ /** Initial behavior, before a serial connection is established. */
+ private def connecting(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`manager`) =>
+ val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.Opened(port) =>
+ val operator = sender
+ setHandler(in, new ConnectedInHandler(operator))
+ setHandler(out, new ConnectedOutHandler(operator))
+ stageActor become connected(operator)
+ connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
+ stageActor unwatch manager
+ stageActor watch operator
+ if (!isClosed(in)) {
+ pull(in) // start pulling input
+ }
+
+ case other =>
+ val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ }
+
+ }
+
+ /** Behaviour once a connection has been established. It is assumed that operator is not null. */
+ private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`operator`) =>
+ failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
+
+ case CoreSerial.Closed =>
+ completeStage()
+
+ case CoreSerial.Received(data) =>
+ if (isAvailable(out)) {
+ push(out, data)
+ } else if (failOnOverflow) {
+ /* Note that the native backend does not provide any way of informing about dropped serial
+ * data. However, in most cases, a computer capable of running akka-serial is also capable
+ * of processing incoming serial data at typical baud rates. Hence packets will usually
+ * only be dropped if an application that uses akka-serial backpressures, which can
+ * however be detected here. */
+ failStage(new StreamSerialException("Incoming serial data was dropped."))
+ }
+
+ case WriteAck =>
+ if (!isClosed(in)) {
+ pull(in)
+ }
+
+ case other =>
+ failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
+
+ }
+
+ }
+
+}
+
+private[stream] object SerialConnectionLogic {
+
+ case object WriteAck extends CoreSerial.Event
+
+}
diff --git a/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala
new file mode 100644
index 0000000..aa67943
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala
@@ -0,0 +1,49 @@
+package akka.serial
+package stream
+package impl
+
+import scala.concurrent.{Future, Promise}
+
+import akka.actor.ActorRef
+import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
+import akka.util.ByteString
+
+/**
+ * Graph stage that establishes and thereby materializes a serial connection.
+ * The actual connection logic is deferred to [[SerialConnectionLogic]].
+ */
+private[stream] class SerialConnectionStage(
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int
+) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] {
+
+ val in: Inlet[ByteString] = Inlet("Serial.in")
+ val out: Outlet[ByteString] = Outlet("Serial.out")
+
+ val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes):
+ (GraphStageLogic, Future[Serial.Connection]) = {
+
+ val connectionPromise = Promise[Serial.Connection]
+
+ val logic = new SerialConnectionLogic(
+ shape,
+ manager,
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize,
+ connectionPromise
+ )
+
+ (logic, connectionPromise.future)
+ }
+
+ override def toString = s"Serial($port)"
+
+}
diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala
new file mode 100644
index 0000000..afab60e
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherLogic.scala
@@ -0,0 +1,65 @@
+package akka.serial
+package stream
+package impl
+
+import scala.concurrent.Promise
+
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.SourceShape
+import akka.stream.stage.GraphStageLogic
+import akka.serial.{Serial => CoreSerial}
+
+private[stream] class WatcherLogic(
+ shape: SourceShape[String],
+ ioManager: ActorRef,
+ ports: Set[String],
+ watchPromise: Promise[Serial.Watch])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+
+ implicit private def self = stageActor.ref
+
+ override def preStart(): Unit = {
+ getStageActor(receive)
+ stageActor watch ioManager
+ for (dir <- WatcherLogic.getDirs(ports)) {
+ ioManager ! CoreSerial.Watch(dir, skipInitial = false)
+ }
+ }
+
+ setHandler(shape.out, IgnoreTerminateOutput)
+
+ private def receive(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`ioManager`) =>
+ val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.")
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ watchPromise.failure(ex)
+
+ case CoreSerial.Connected(port) =>
+ if (ports contains port) {
+ if (isAvailable(shape.out)) {
+ push(shape.out, port)
+ }
+ }
+
+ case other =>
+ failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]"))
+
+ }
+ }
+
+}
+
+private[stream] object WatcherLogic {
+ def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/"))
+}
diff --git a/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala
new file mode 100644
index 0000000..649249f
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala
@@ -0,0 +1,38 @@
+package akka.serial
+package stream
+package impl
+
+import scala.concurrent.{Future, Promise}
+
+import akka.actor.ActorRef
+import akka.stream.{Attributes, Outlet, SourceShape}
+import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic}
+
+
+private[stream] class WatcherStage(
+ ioManager: ActorRef,
+ ports: Set[String]
+) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] {
+
+ val out = Outlet[String]("Watcher.out")
+
+ val shape = new SourceShape(out)
+
+ override def createLogicAndMaterializedValue(attributes: Attributes):
+ (GraphStageLogic, Future[Serial.Watch]) = {
+
+ val promise = Promise[Serial.Watch]
+
+ val logic = new WatcherLogic(
+ shape,
+ ioManager,
+ ports,
+ promise
+ )
+
+ (logic, promise.future)
+ }
+
+ override def toString = s"Watcher($ports)"
+
+}
diff --git a/stream/src/test/resources/application.conf b/stream/src/test/resources/application.conf
new file mode 100644
index 0000000..bdf80e2
--- /dev/null
+++ b/stream/src/test/resources/application.conf
@@ -0,0 +1,3 @@
+# Don't fill test output with log messages
+akka.stdout-loglevel = "OFF"
+akka.loglevel = "OFF" \ No newline at end of file
diff --git a/stream/src/test/scala/akka/serial/stream/SerialSpec.scala b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala
new file mode 100644
index 0000000..7a64383
--- /dev/null
+++ b/stream/src/test/scala/akka/serial/stream/SerialSpec.scala
@@ -0,0 +1,51 @@
+package akka.serial
+package stream
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Keep, Sink, Source}
+import akka.util.ByteString
+import org.scalatest._
+
+class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal {
+
+ implicit val system = ActorSystem("akka-serial-test")
+ implicit val materializer = ActorMaterializer()
+
+ override def afterAll {
+ system.terminate()
+ }
+
+ "Serial stream" should {
+ val data = ByteString(("hello world").getBytes("utf-8"))
+
+ "receive the same data it sends in an echo test" in {
+ withEcho { case (port, settings) =>
+ val graph = Source.single(data)
+ .via(Serial().open(port, settings)) // send to echo pty
+ .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS
+ .dropWhile(_ != data)
+ .toMat(Sink.head)(Keep.right)
+
+ Await.result(graph.run(), 2.seconds)
+ }
+ }
+
+ "fail if the underlying pty fails" in {
+ val result = withEcho { case (port, settings) =>
+ Source.single(data)
+ .via(Serial().open(port, settings))
+ .toMat(Sink.last)(Keep.right)
+ .run()}
+
+ intercept[StreamSerialException] {
+ Await.result(result, 10.seconds)
+ }
+ }
+
+ }
+
+}