aboutsummaryrefslogtreecommitdiff
path: root/flow-stream
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-01-24 20:21:17 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-03 20:46:28 -0800
commitf865a76c2f441f619b069505b73fcbd1cba1a67c (patch)
tree3f53c519f4575037bdebf8c8399ca25d50649543 /flow-stream
parent46c30908f827e27b58166f56efa4f15917c1af4f (diff)
downloadakka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.gz
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.bz2
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.zip
Add support for Akka streams
Diffstat (limited to 'flow-stream')
-rw-r--r--flow-stream/build.sbt7
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala59
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamSerialException.scala5
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala171
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala48
5 files changed, 290 insertions, 0 deletions
diff --git a/flow-stream/build.sbt b/flow-stream/build.sbt
new file mode 100644
index 0000000..183c20f
--- /dev/null
+++ b/flow-stream/build.sbt
@@ -0,0 +1,7 @@
+import flow.{FlowBuild, Dependencies}
+
+FlowBuild.commonSettings
+
+libraryDependencies += Dependencies.akkaActor
+libraryDependencies += Dependencies.akkaStream
+
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
new file mode 100644
index 0000000..26efcc9
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
@@ -0,0 +1,59 @@
+package com.github.jodersky.flow
+package stream
+
+import akka.actor._
+import akka.stream.scaladsl.Flow
+import akka.stream._
+import akka.stream.stage._
+import akka.dispatch.ExecutionContexts
+import akka.util.ByteString
+import com.github.jodersky.flow.{Serial => CoreSerial, _}
+import scala.concurrent._
+import akka.io._
+
+import impl._
+
+object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
+
+ /**
+ * Represents a prospective serial connection.
+ */
+ case class Connection(port: String, settings: SerialSettings)
+
+ def apply()(implicit system: ActorSystem): Serial = super.apply(system)
+
+ override def lookup() = Serial
+
+ override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system)
+
+}
+
+/**
+ * Entry point to streaming over serial ports.
+ * The design of this API is inspired by Akka's Tcp streams.
+ */
+class Serial(system: ExtendedActorSystem) extends Extension {
+
+ /**
+ * Creates a Flow that will open a serial port when materialized.
+ * This Flow then represents an open serial connection: data pushed to its
+ * inlet will be written to the underlying serial port, and data received
+ * on the port will be emitted by its outlet.
+ * @param port name of serial port to open
+ * @param settings settings to use with serial port
+ * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped
+ * @param bufferSize maximum read and write buffer sizes
+ * @return a Flow associated to the given serial port
+ */
+ def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024):
+ Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph(
+ new SerialConnectionStage(
+ IO(CoreSerial)(system),
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize
+ )
+ )
+
+}
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamSerialException.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamSerialException.scala
new file mode 100644
index 0000000..fad92fc
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/StreamSerialException.scala
@@ -0,0 +1,5 @@
+package com.github.jodersky.flow
+package stream
+
+/** Represents a generic exception occured during streaming of serial data. */
+class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
new file mode 100644
index 0000000..35be464
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
@@ -0,0 +1,171 @@
+package com.github.jodersky.flow
+package stream
+package impl
+
+import akka.actor._
+import akka.stream.stage.GraphStageLogic.StageActorRef
+import akka.util._
+import akka.stream._
+import akka.stream.stage._
+import scala.concurrent._
+import akka.io._
+import com.github.jodersky.flow.{Serial => CoreSerial}
+import akka.stream.impl.ReactiveStreamsCompliance
+
+
+/**
+ * Graph logic that handles establishing and forwarding serial communication.
+ * The underlying stream is closed when downstream (output) finishes,
+ * conversely upstream (input) closes are ignored.
+ */
+private[stream] class SerialConnectionLogic(
+ shape: FlowShape[ByteString, ByteString],
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int,
+ connectionPromise: Promise[Serial.Connection])
+ extends GraphStageLogic(shape) {
+ import GraphStageLogic._
+ import SerialConnectionLogic._
+
+ /** Receives data and writes it to the serial backend. */
+ private def in: Inlet[ByteString] = shape.in
+
+ /** Receives data from the serial backend and pushes it downstream. */
+ private def out: Outlet[ByteString] = shape.out
+
+ /** Implicit alias to stageActor so it will be used in "!" calls, without
+ * explicitly specifying a sender. */
+ implicit private def self = stageActor.ref
+
+ /** Input handler for an established connection.
+ * @param operator ther operator actor of the established connection
+ */
+ class ConnectedInHandler(operator: ActorRef) extends InHandler {
+
+ override def onPush(): Unit = {
+ val elem = grab(in)
+ require(elem != null) // reactive streams requirement
+ operator ! CoreSerial.Write(elem, _ => WriteAck)
+ }
+
+ override def onUpstreamFinish(): Unit = {
+ if (isClosed(out)) { // close serial connection if output is also closed
+ operator ! CoreSerial.Close
+ }
+ }
+
+ }
+
+ class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
+ // alias stage actor as implicit to it will be used in "!" calls
+ implicit val self = stageActor.ref
+
+ override def onPull(): Unit = {
+ // serial connections are at the end of the "backpressure chain",
+ // they do not natively support backpressure (as does TCP for example)
+ // therefore nothing is done here as
+ }
+
+ override def onDownstreamFinish(): Unit = {
+ // closing downstream also closes the underlying connection
+ operator ! CoreSerial.Close
+ }
+
+ }
+
+ override def preStart(): Unit = {
+ setKeepGoing(true) // serial connection operator will manage completing stage
+ getStageActor(connecting)
+ stageActor watch manager
+ manager ! CoreSerial.Open(port, settings, bufferSize)
+ }
+
+ setHandler(in, IgnoreTerminateInput)
+ setHandler(out, IgnoreTerminateOutput)
+
+ /** Initial behaviour, before a serial connection is established. */
+ private def connecting(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`manager`) =>
+ val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ case CoreSerial.Opened(port) =>
+ val operator = sender
+ setHandler(in, new ConnectedInHandler(operator))
+ setHandler(out, new ConnectedOutHandler(operator))
+ stageActor become connected(operator)
+ connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
+ stageActor unwatch manager
+ stageActor watch operator
+ pull(in) // start pulling input
+
+ case other =>
+ val ex = new StreamSerialException(s"Stage actor received unkown message [$other]")
+ failStage(ex)
+ connectionPromise.failure(ex)
+
+ }
+
+ }
+
+ /** Behaviour once a connection has been established. It is assumed that operator is not null. */
+ private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
+ val sender = event._1
+ val message = event._2
+
+ message match {
+
+ case Terminated(`operator`) =>
+ failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
+
+ case CoreSerial.CommandFailed(cmd, reason) =>
+ failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
+
+ case CoreSerial.Closed =>
+ completeStage()
+
+ case CoreSerial.Received(data) =>
+ if (isAvailable(out)) {
+ push(out, data)
+ } else if (failOnOverflow) {
+ /* Note that the native backend does not provide any way of informing about
+ * dropped serial data. However, in most cases, a computer capable of running flow
+ * is also capable of processing incoming serial data at typical baud rates.
+ * Hence packets will usually only be dropped if an application that uses flow
+ * backpressures, which can however be detected here. */
+ failStage(new StreamSerialException("Incoming serial data was dropped."))
+ }
+
+ case WriteAck =>
+ if (!isClosed(in)) {
+ pull(in)
+ }
+
+ case other =>
+ failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
+
+ }
+
+ }
+
+}
+
+private[stream] object SerialConnectionLogic {
+
+ case object WriteAck extends CoreSerial.Event
+
+}
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
new file mode 100644
index 0000000..2efe3e9
--- /dev/null
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
@@ -0,0 +1,48 @@
+package com.github.jodersky.flow
+package stream
+package impl
+
+import akka.actor._
+import akka.util._
+import akka.stream._
+import akka.stream.stage._
+import scala.concurrent._
+
+/**
+ * Graph stage that establishes and thereby materializes a serial connection.
+ * The actual connection logic is deferred to [[SerialConnectionLogic]].
+ */
+private[stream] class SerialConnectionStage(
+ manager: ActorRef,
+ port: String,
+ settings: SerialSettings,
+ failOnOverflow: Boolean,
+ bufferSize: Int
+) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] {
+
+ val in: Inlet[ByteString] = Inlet("Serial.in")
+ val out: Outlet[ByteString] = Outlet("Serial.out")
+
+ val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
+
+ override def createLogicAndMaterializedValue(inheritedAttributes: Attributes):
+ (GraphStageLogic, Future[Serial.Connection]) = {
+
+ val connectionPromise = Promise[Serial.Connection]
+
+ val logic = new SerialConnectionLogic(
+ shape,
+ manager,
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize,
+ connectionPromise
+ )
+
+ (logic, connectionPromise.future)
+ }
+
+ override def toString = s"Serial($port)"
+
+}