diff options
author | Jakob Odersky <jakob@odersky.com> | 2017-01-08 21:16:25 +0100 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2017-01-21 17:22:10 -0800 |
commit | 23959966760174477a6b0fcbf9dd1e8ef37c643b (patch) | |
tree | 9a0ee44eb43a8c13af57b0d06313f3aabf9e4555 /flow-stream | |
parent | 6c371ba6d69c891c1f0d6df00bb643e1d543cc9d (diff) | |
download | akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.gz akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.tar.bz2 akka-serial-23959966760174477a6b0fcbf9dd1e8ef37c643b.zip |
Rename project to akka-serial
Diffstat (limited to 'flow-stream')
9 files changed, 0 insertions, 456 deletions
diff --git a/flow-stream/build.sbt b/flow-stream/build.sbt deleted file mode 100644 index c9aa7eb..0000000 --- a/flow-stream/build.sbt +++ /dev/null @@ -1,5 +0,0 @@ -import flow.Dependencies - -libraryDependencies += Dependencies.akkaActor -libraryDependencies += Dependencies.akkaStream -libraryDependencies += Dependencies.scalatest % "test" diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala deleted file mode 100644 index d478de8..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala +++ /dev/null @@ -1,67 +0,0 @@ -package ch.jodersky.flow -package stream - -import akka.stream.scaladsl.Source -import scala.concurrent.Future - -import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} -import akka.io.IO -import akka.stream.scaladsl.Flow -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial} -import impl._ - -object Serial extends ExtensionId[Serial] with ExtensionIdProvider { - - /** - * Represents a prospective serial connection. - */ - case class Connection(port: String, settings: SerialSettings) - - case class Watch(ports: Set[String]) - - def apply()(implicit system: ActorSystem): Serial = super.apply(system) - - override def lookup() = Serial - - override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) - -} - -/** - * Entry point to streaming over serial ports. - * The design of this API is inspired by Akka's Tcp streams. - */ -class Serial(system: ExtendedActorSystem) extends Extension { - - /** - * Creates a Flow that will open a serial port when materialized. - * This Flow then represents an open serial connection: data pushed to its - * inlet will be written to the underlying serial port, and data received - * on the port will be emitted by its outlet. - * @param port name of serial port to open - * @param settings settings to use with serial port - * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped - * @param bufferSize maximum read and write buffer sizes - * @return a Flow associated to the given serial port - */ - def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): - Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( - new SerialConnectionStage( - IO(CoreSerial)(system), - port, - settings, - failOnOverflow, - bufferSize - ) - ) - - def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( - new WatcherStage( - IO(CoreSerial)(system), - ports - ) - ) - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala deleted file mode 100644 index 78438f9..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamSerialException.scala +++ /dev/null @@ -1,5 +0,0 @@ -package ch.jodersky.flow -package stream - -/** Represents a generic exception occured during streaming of serial data. */ -class StreamSerialException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala deleted file mode 100644 index 8eee61c..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/StreamWatcherException.scala +++ /dev/null @@ -1,4 +0,0 @@ -package ch.jodersky.flow -package stream - -class StreamWatcherException(message: String, cause: Throwable = null) extends RuntimeException(message, cause) diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala deleted file mode 100644 index 764b054..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionLogic.scala +++ /dev/null @@ -1,172 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.{FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} -import akka.util.ByteString - -import ch.jodersky.flow.{Serial => CoreSerial, SerialSettings} - -/** - * Graph logic that handles establishing and forwarding serial communication. - * The underlying stream is closed when downstream (output) finishes, - * upstream (input) closes are ignored. - */ -private[stream] class SerialConnectionLogic( - shape: FlowShape[ByteString, ByteString], - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int, - connectionPromise: Promise[Serial.Connection]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - import SerialConnectionLogic._ - - /** Receives data and writes it to the serial backend. */ - private def in: Inlet[ByteString] = shape.in - - /** Receives data from the serial backend and pushes it downstream. */ - private def out: Outlet[ByteString] = shape.out - - /** Implicit alias to stageActor so it will be used in "!" calls, without - * explicitly specifying a sender. */ - implicit private def self = stageActor.ref - - /** - * Input handler for an established connection. - * @param operator the operator actor of the established connection - */ - class ConnectedInHandler(operator: ActorRef) extends InHandler { - - override def onPush(): Unit = { - val elem = grab(in) - require(elem != null) // reactive streams requirement - operator ! CoreSerial.Write(elem, _ => WriteAck) - } - - override def onUpstreamFinish(): Unit = { - if (isClosed(out)) { // close serial connection if output is also closed - operator ! CoreSerial.Close - } - } - - } - - class ConnectedOutHandler(operator: ActorRef) extends OutHandler { - // implicit alias to stage actor, so it will be used in "!" calls - implicit val self = stageActor.ref - - override def onPull(): Unit = { - // serial connections are at the end of the "backpressure chain", - // they do not natively support backpressure (as does TCP for example) - // therefore nothing is done here - } - - override def onDownstreamFinish(): Unit = { - // closing downstream also closes the underlying connection - operator ! CoreSerial.Close - } - - } - - override def preStart(): Unit = { - setKeepGoing(true) // serial connection operator will manage completing stage - getStageActor(connecting) - stageActor watch manager - manager ! CoreSerial.Open(port, settings, bufferSize) - } - - setHandler(in, IgnoreTerminateInput) - setHandler(out, IgnoreTerminateOutput) - - /** Initial behavior, before a serial connection is established. */ - private def connecting(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`manager`) => - val ex = new StreamSerialException("The IO manager actor (Serial) has terminated. Stopping now.") - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamSerialException(s"Serial command [$cmd] failed", reason) - failStage(ex) - connectionPromise.failure(ex) - - case CoreSerial.Opened(port) => - val operator = sender - setHandler(in, new ConnectedInHandler(operator)) - setHandler(out, new ConnectedOutHandler(operator)) - stageActor become connected(operator) - connectionPromise.success(Serial.Connection(port, settings)) //complete materialized value - stageActor unwatch manager - stageActor watch operator - if (!isClosed(in)) { - pull(in) // start pulling input - } - - case other => - val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") - failStage(ex) - connectionPromise.failure(ex) - - } - - } - - /** Behaviour once a connection has been established. It is assumed that operator is not null. */ - private def connected(operator: ActorRef)(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`operator`) => - failStage(new StreamSerialException("The connection actor has terminated. Stopping now.")) - - case CoreSerial.CommandFailed(cmd, reason) => - failStage(new StreamSerialException(s"Serial command [$cmd] failed.", reason)) - - case CoreSerial.Closed => - completeStage() - - case CoreSerial.Received(data) => - if (isAvailable(out)) { - push(out, data) - } else if (failOnOverflow) { - /* Note that the native backend does not provide any way of informing about - * dropped serial data. However, in most cases, a computer capable of running flow - * is also capable of processing incoming serial data at typical baud rates. - * Hence packets will usually only be dropped if an application that uses flow - * backpressures, which can however be detected here. */ - failStage(new StreamSerialException("Incoming serial data was dropped.")) - } - - case WriteAck => - if (!isClosed(in)) { - pull(in) - } - - case other => - failStage(new StreamSerialException(s"Stage actor received unkown message [$other]")) - - } - - } - -} - -private[stream] object SerialConnectionLogic { - - case object WriteAck extends CoreSerial.Event - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala deleted file mode 100644 index ceeac01..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala +++ /dev/null @@ -1,49 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, FlowShape, Inlet, Outlet} -import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} -import akka.util.ByteString - -/** - * Graph stage that establishes and thereby materializes a serial connection. - * The actual connection logic is deferred to [[SerialConnectionLogic]]. - */ -private[stream] class SerialConnectionStage( - manager: ActorRef, - port: String, - settings: SerialSettings, - failOnOverflow: Boolean, - bufferSize: Int -) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { - - val in: Inlet[ByteString] = Inlet("Serial.in") - val out: Outlet[ByteString] = Outlet("Serial.out") - - val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) - - override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): - (GraphStageLogic, Future[Serial.Connection]) = { - - val connectionPromise = Promise[Serial.Connection] - - val logic = new SerialConnectionLogic( - shape, - manager, - port, - settings, - failOnOverflow, - bufferSize, - connectionPromise - ) - - (logic, connectionPromise.future) - } - - override def toString = s"Serial($port)" - -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala deleted file mode 100644 index 60b7c90..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherLogic.scala +++ /dev/null @@ -1,65 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.Promise - -import akka.actor.{ActorRef, Terminated} -import akka.stream.SourceShape -import akka.stream.stage.GraphStageLogic -import ch.jodersky.flow.{Serial => CoreSerial} - -private[stream] class WatcherLogic( - shape: SourceShape[String], - ioManager: ActorRef, - ports: Set[String], - watchPromise: Promise[Serial.Watch]) - extends GraphStageLogic(shape) { - import GraphStageLogic._ - - implicit private def self = stageActor.ref - - override def preStart(): Unit = { - getStageActor(receive) - stageActor watch ioManager - for (dir <- WatcherLogic.getDirs(ports)) { - ioManager ! CoreSerial.Watch(dir, skipInitial = false) - } - } - - setHandler(shape.out, IgnoreTerminateOutput) - - private def receive(event: (ActorRef, Any)): Unit = { - val sender = event._1 - val message = event._2 - - message match { - - case Terminated(`ioManager`) => - val ex = new StreamWatcherException("The serial IO manager has terminated. Stopping now.") - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.CommandFailed(cmd, reason) => - val ex = new StreamWatcherException(s"Serial command [$cmd] failed", reason) - failStage(ex) - watchPromise.failure(ex) - - case CoreSerial.Connected(port) => - if (ports contains port) { - if (isAvailable(shape.out)) { - push(shape.out, port) - } - } - - case other => - failStage(new StreamWatcherException(s"Stage actor received unkown message [$other]")) - - } - } - -} - -private[stream] object WatcherLogic { - def getDirs(ports: Set[String]): Set[String] = ports.map(_.split("/").init.mkString("/")) -} diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala deleted file mode 100644 index 82fad69..0000000 --- a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/WatcherStage.scala +++ /dev/null @@ -1,38 +0,0 @@ -package ch.jodersky.flow -package stream -package impl - -import scala.concurrent.{Future, Promise} - -import akka.actor.ActorRef -import akka.stream.{Attributes, Outlet, SourceShape} -import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic} - - -private[stream] class WatcherStage( - ioManager: ActorRef, - ports: Set[String] -) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] { - - val out = Outlet[String]("Watcher.out") - - val shape = new SourceShape(out) - - override def createLogicAndMaterializedValue(attributes: Attributes): - (GraphStageLogic, Future[Serial.Watch]) = { - - val promise = Promise[Serial.Watch] - - val logic = new WatcherLogic( - shape, - ioManager, - ports, - promise - ) - - (logic, promise.future) - } - - override def toString = s"Watcher($ports)" - -} diff --git a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala b/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala deleted file mode 100644 index 1a1ebdc..0000000 --- a/flow-stream/src/test/scala/ch/jodersky/flow/stream/SerialSpec.scala +++ /dev/null @@ -1,51 +0,0 @@ -package ch.jodersky.flow -package stream - -import scala.concurrent.Await -import scala.concurrent.duration._ - -import akka.actor.ActorSystem -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{Keep, Sink, Source} -import akka.util.ByteString -import org.scalatest._ - -class SerialSpec extends WordSpec with BeforeAndAfterAll with PseudoTerminal { - - implicit val system = ActorSystem("flow-test") - implicit val materializer = ActorMaterializer() - - override def afterAll { - system.terminate() - } - - "Serial stream" should { - val data = ByteString(("hello world").getBytes("utf-8")) - - "receive the same data it sends in an echo test" in { - withEcho { case (port, settings) => - val graph = Source.single(data) - .via(Serial().open(port, settings)) // send to echo pty - .scan(ByteString.empty)(_ ++ _) // received elements could potentially be split by OS - .dropWhile(_ != data) - .toMat(Sink.head)(Keep.right) - - Await.result(graph.run(), 2.seconds) - } - } - - "fail if the underlying pty fails" in { - val result = withEcho { case (port, settings) => - Source.single(data) - .via(Serial().open(port, settings)) - .toMat(Sink.last)(Keep.right) - .run()} - - intercept[StreamSerialException] { - Await.result(result, 10.seconds) - } - } - - } - -} |