aboutsummaryrefslogtreecommitdiff
path: root/flow-stream
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2017-01-08 21:16:25 +0100
committerJakob Odersky <jakob@odersky.com>2017-01-21 17:22:10 -0800
commit23959966760174477a6b0fcbf9dd1e8ef37c643b (patch)
tree9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /flow-stream
parent6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff)
downloadakka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2
akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip
Rename project to akka-serial
Diffstat (limited to 'flow-stream')
-rw-r--r--flow-stream/build.sbt5
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala67
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala5
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala4
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala172
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala49
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala65
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala38
-rw-r--r--flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala51
9 files changed, 0 insertions, 456 deletions
diff --git a/flow-stream/build.sbt b/flow-stream/build.sbt
deleted file mode 100644
index c9aa7eb..0000000
--- a/flow-stream/build.sbt
+++ /dev/null
@@ -1,5 +0,0 @@
-import flow.Dependencies
-
-libraryDependencies += Dependencies.akkaActor
-libraryDependencies += Dependencies.akkaStream
-libraryDependencies += Dependencies.scalatest % "test"
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala
deleted file mode 100644
index d478de8..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala
+++ /dev/null
@@ -1,67 +0,0 @@
-package ch.jodersky.flow
-package stream
-
-import akka.stream.scaladsl.Source
-import scala.concurrent.Future
-
-import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
-import akka.io.IO
-import akka.stream.scaladsl.Flow
-import akka.util.ByteString
-
-import ch.jodersky.flow.{Serial => CoreSerial}
-import impl._
-
-object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
-
- /**
- * Represents a prospective serial connection.
- */
- case class Connection(port: String, settings: SerialSettings)
-
- case class Watch(ports: Set[String])
-
- def apply()(implicit system: ActorSystem): Serial = super.apply(system)
-
- override def lookup() = Serial
-
- override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system)
-
-}
-
-/**
- * Entry point to streaming over serial ports.
- * The design of this API is inspired by Akka's Tcp streams.
- */
-class Serial(system: ExtendedActorSystem) extends Extension {
-
- /**
- * Creates a Flow that will open a serial port when materialized.
- * This Flow then represents an open serial connection: data pushed to its
- * inlet will be written to the underlying serial port, and data received
- * on the port will be emitted by its outlet.
- * @param port name of serial port to open
- * @param settings settings to use with serial port
- * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped
- * @param bufferSize maximum read and write buffer sizes
- * @return a Flow associated to the given serial port
- */
- def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024):
- Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph(
- new SerialConnectionStage(
- IO(CoreSerial)(system),
- port,
- settings,
- failOnOverflow,
- bufferSize
- )
- )
-
- def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph(
- new WatcherStage(
- IO(CoreSerial)(system),
- ports
- )
- )
-
-}
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala
deleted file mode 100644
index 78438f9..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala
+++ /dev/null
@@ -1,5 +0,0 @@
-package ch.jodersky.flow
-package stream
-
-/** Represents a generic exception occured during streaming of serial data. */
-class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala
deleted file mode 100644
index 8eee61c..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala
+++ /dev/null
@@ -1,4 +0,0 @@
-package ch.jodersky.flow
-package stream
-
-class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause)
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
deleted file mode 100644
index 764b054..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-package ch.jodersky.flow
-package stream
-package impl
-
-import scala.concurrent.Promise
-
-import akka.actor.{ActorRef, Terminated}
-import akka.stream.{FlowShape, Inlet, Outlet}
-import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
-import akka.util.ByteString
-
-import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings}
-
-/**
- * Graph logic that handles establishing and forwarding serial communication.
- * The underlying stream is closed when downstream (output) finishes,
- * upstream (input) closes are ignored.
- */
-private[stream] class SerialConnectionLogic(
- shape: FlowShape[ByteString, ByteString],
- manager: ActorRef,
- port: String,
- settings: SerialSettings,
- failOnOverflow: Boolean,
- bufferSize: Int,
- connectionPromise: Promise[Serial.Connection])
- extends GraphStageLogic(shape) {
- import GraphStageLogic._
- import SerialConnectionLogic._
-
- /** Receives data and writes it to the serial backend. */
- private def in: Inlet[ByteString] = shape.in
-
- /** Receives data from the serial backend and pushes it downstream. */
- private def out: Outlet[ByteString] = shape.out
-
- /** Implicit alias to stageActor so it will be used in "!" calls, without
- * explicitly specifying a sender. */
- implicit private def self = stageActor.ref
-
- /**
- * Input handler for an established connection.
- * @param operator the operator actor of the established connection
- */
- class ConnectedInHandler(operator: ActorRef) extends InHandler {
-
- override def onPush(): Unit = {
- val elem = grab(in)
- require(elem != null) // reactive streams requirement
- operator ! CoreSerial.Write(elem, _ => WriteAck)
- }
-
- override def onUpstreamFinish(): Unit = {
- if (isClosed(out)) { // close serial connection if output is also closed
- operator ! CoreSerial.Close
- }
- }
-
- }
-
- class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
- // implicit alias to stage actor, so it will be used in "!" calls
- implicit val self = stageActor.ref
-
- override def onPull(): Unit = {
- // serial connections are at the end of the "backpressure chain",
- // they do not natively support backpressure (as does TCP for example)
- // therefore nothing is done here
- }
-
- override def onDownstreamFinish(): Unit = {
- // closing downstream also closes the underlying connection
- operator ! CoreSerial.Close
- }
-
- }
-
- override def preStart(): Unit = {
- setKeepGoing(true) // serial connection operator will manage completing stage
- getStageActor(connecting)
- stageActor watch manager
- manager ! CoreSerial.Open(port, settings, bufferSize)
- }
-
- setHandler(in, IgnoreTerminateInput)
- setHandler(out, IgnoreTerminateOutput)
-
- /** Initial behavior, before a serial connection is established. */
- private def connecting(event: (ActorRef, Any)): Unit = {
- val sender = event._1
- val message = event._2
-
- message match {
-
- case Terminated(`manager`) =>
- val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.")
- failStage(ex)
- connectionPromise.failure(ex)
-
- case CoreSerial.CommandFailed(cmd, reason) =>
- val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason)
- failStage(ex)
- connectionPromise.failure(ex)
-
- case CoreSerial.Opened(port) =>
- val operator = sender
- setHandler(in, new ConnectedInHandler(operator))
- setHandler(out, new ConnectedOutHandler(operator))
- stageActor become connected(operator)
- connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value
- stageActor unwatch manager
- stageActor watch operator
- if (!isClosed(in)) {
- pull(in) // start pulling input
- }
-
- case other =>
- val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
- failStage(ex)
- connectionPromise.failure(ex)
-
- }
-
- }
-
- /** Behaviour once a connection has been established. It is assumed that operator is not null. */
- private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = {
- val sender = event._1
- val message = event._2
-
- message match {
-
- case Terminated(`operator`) =>
- failStage(new StreamSerialException("The connection actor has terminated. Stopping now."))
-
- case CoreSerial.CommandFailed(cmd, reason) =>
- failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason))
-
- case CoreSerial.Closed =>
- completeStage()
-
- case CoreSerial.Received(data) =>
- if (isAvailable(out)) {
- push(out, data)
- } else if (failOnOverflow) {
- /* Note that the native backend does not provide any way of informing about
- * dropped serial data. However, in most cases, a computer capable of running flow
- * is also capable of processing incoming serial data at typical baud rates.
- * Hence packets will usually only be dropped if an application that uses flow
- * backpressures, which can however be detected here. */
- failStage(new StreamSerialException("Incoming serial data was dropped."))
- }
-
- case WriteAck =>
- if (!isClosed(in)) {
- pull(in)
- }
-
- case other =>
- failStage(new StreamSerialException(s"Stage actor received unkown message [$other]"))
-
- }
-
- }
-
-}
-
-private[stream] object SerialConnectionLogic {
-
- case object WriteAck extends CoreSerial.Event
-
-}
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala
deleted file mode 100644
index ceeac01..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-package ch.jodersky.flow
-package stream
-package impl
-
-import scala.concurrent.{Future, Promise}
-
-import akka.actor.ActorRef
-import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
-import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
-import akka.util.ByteString
-
-/**
- * Graph stage that establishes and thereby materializes a serial connection.
- * The actual connection logic is deferred to [[SerialConnectionLogic]].
- */
-private[stream] class SerialConnectionStage(
- manager: ActorRef,
- port: String,
- settings: SerialSettings,
- failOnOverflow: Boolean,
- bufferSize: Int
-) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] {
-
- val in: Inlet[ByteString] = Inlet("Serial.in")
- val out: Outlet[ByteString] = Outlet("Serial.out")
-
- val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
-
- override def createLogicAndMaterializedValue(inheritedAttributes: Attributes):
- (GraphStageLogic, Future[Serial.Connection]) = {
-
- val connectionPromise = Promise[Serial.Connection]
-
- val logic = new SerialConnectionLogic(
- shape,
- manager,
- port,
- settings,
- failOnOverflow,
- bufferSize,
- connectionPromise
- )
-
- (logic, connectionPromise.future)
- }
-
- override def toString = s"Serial($port)"
-
-}
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
deleted file mode 100644
index 60b7c90..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-package ch.jodersky.flow
-package stream
-package impl
-
-import scala.concurrent.Promise
-
-import akka.actor.{ActorRef, Terminated}
-import akka.stream.SourceShape
-import akka.stream.stage.GraphStageLogic
-import ch.jodersky.flow.{Serial => CoreSerial}
-
-private[stream] class WatcherLogic(
- shape: SourceShape[String],
- ioManager: ActorRef,
- ports: Set[String],
- watchPromise: Promise[Serial.Watch])
- extends GraphStageLogic(shape) {
- import GraphStageLogic._
-
- implicit private def self = stageActor.ref
-
- override def preStart(): Unit = {
- getStageActor(receive)
- stageActor watch ioManager
- for (dir <- WatcherLogic.getDirs(ports)) {
- ioManager ! CoreSerial.Watch(dir, skipInitial = false)
- }
- }
-
- setHandler(shape.out, IgnoreTerminateOutput)
-
- private def receive(event: (ActorRef, Any)): Unit = {
- val sender = event._1
- val message = event._2
-
- message match {
-
- case Terminated(`ioManager`) =>
- val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.")
- failStage(ex)
- watchPromise.failure(ex)
-
- case CoreSerial.CommandFailed(cmd, reason) =>
- val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason)
- failStage(ex)
- watchPromise.failure(ex)
-
- case CoreSerial.Connected(port) =>
- if (ports contains port) {
- if (isAvailable(shape.out)) {
- push(shape.out, port)
- }
- }
-
- case other =>
- failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]"))
-
- }
- }
-
-}
-
-private[stream] object WatcherLogic {
- def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/"))
-}
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala
deleted file mode 100644
index 82fad69..0000000
--- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala
+++ /dev/null
@@ -1,38 +0,0 @@
-package ch.jodersky.flow
-package stream
-package impl
-
-import scala.concurrent.{Future, Promise}
-
-import akka.actor.ActorRef
-import akka.stream.{Attributes, Outlet, SourceShape}
-import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic}
-
-
-private[stream] class WatcherStage(
- ioManager: ActorRef,
- ports: Set[String]
-) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] {
-
- val out = Outlet[String]("Watcher.out")
-
- val shape = new SourceShape(out)
-
- override def createLogicAndMaterializedValue(attributes: Attributes):
- (GraphStageLogic, Future[Serial.Watch]) = {
-
- val promise = Promise[Serial.Watch]
-
- val logic = new WatcherLogic(
- shape,
- ioManager,
- ports,
- promise
- )
-
- (logic, promise.future)
- }
-
- override def toString = s"Watcher($ports)"
-
-}
diff --git a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala b/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala
deleted file mode 100644
index 1a1ebdc..0000000
--- a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala
+++ /dev/null
@@ -1,51 +0,0 @@
-package ch.jodersky.flow
-package stream
-
-import scala.concurrent.Await
-import scala.concurrent.duration._
-
-import akka.actor.ActorSystem
-import akka.stream.ActorMaterializer
-import akka.stream.scaladsl.{Keep, Sink, Source}
-import akka.util.ByteString
-import org.scalatest._
-
-class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal {
-
- implicit val system = ActorSystem("flow-test")
- implicit val materializer = ActorMaterializer()
-
- override def afterAll {
- system.terminate()
- }
-
- "Serial stream" should {
- val data = ByteString(("hello world").getBytes("utf-8"))
-
- "receive the same data it sends in an echo test" in {
- withEcho { case (port, settings) =>
- val graph = Source.single(data)
- .via(Serial().open(port, settings)) // send to echo pty
- .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS
- .dropWhile(_ != data)
- .toMat(Sink.head)(Keep.right)
-
- Await.result(graph.run(), 2.seconds)
- }
- }
-
- "fail if the underlying pty fails" in {
- val result = withEcho { case (port, settings) =>
- Source.single(data)
- .via(Serial().open(port, settings))
- .toMat(Sink.last)(Keep.right)
- .run()}
-
- intercept[StreamSerialException] {
- Await.result(result, 10.seconds)
- }
- }
-
- }
-
-}