diff options
4 files changed, 51 insertions, 39 deletions
diff --git a/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala index fe3ed86..411c0fd 100644 --- a/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala +++ b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala @@ -1,16 +1,16 @@ package com.github.jodersky.flow package samples.terminalstream -import akka.actor._ -import akka.stream._ -import akka.stream.stage._ -import akka.stream.scaladsl._ -import akka.util._ -import scala.concurrent._ +import scala.concurrent.Future import scala.concurrent.duration._ -import scala.util._ -import akka.Done -import stream._ +import scala.io.StdIn + +import akka.actor.ActorSystem +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Flow, Keep, Sink, Source} +import akka.util.ByteString + +import stream.Serial object Main { @@ -19,11 +19,25 @@ object Main { implicit val system = ActorSystem("terminal-stream") implicit val materializer = ActorMaterializer() + def ask(label: String, default: String) = { + print(label + " [" + default.toString + "]: ") + val in = StdIn.readLine() + println("") + if (in.isEmpty) default else in + } + def main(args: Array[String]): Unit = { import system.dispatcher + val port = ask("Device", "/dev/ttyACM0") + val baud = ask("Baud rate", "115200").toInt + val cs = ask("Char size", "8").toInt + val tsb = ask("Use two stop bits", "false").toBoolean + val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt) + val settings = SerialSettings(baud, cs, tsb, parity) + val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] = - Serial().open("/dev/ttyACM0", SerialSettings(115200)) + Serial().open(port, settings) val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data => println("server says: " + data.decodeString("UTF-8")) @@ -40,7 +54,7 @@ object Main { connection map { conn => println("Connected to " + conn.port) - readLine("Press enter to exit") + StdIn.readLine("Press enter to exit") } recover { case err => println("Cannot connect: " + err) } andThen { case _ => diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala index 26efcc9..6d250df 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala @@ -1,16 +1,14 @@ package com.github.jodersky.flow package stream -import akka.actor._ +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO import akka.stream.scaladsl.Flow -import akka.stream._ -import akka.stream.stage._ -import akka.dispatch.ExecutionContexts import akka.util.ByteString -import com.github.jodersky.flow.{Serial => CoreSerial, _} -import scala.concurrent._ -import akka.io._ +import com.github.jodersky.flow.{Serial => CoreSerial} import impl._ object Serial extends ExtensionId[Serial] with ExtensionIdProvider { diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala index 35be464..526ea40 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala @@ -2,21 +2,19 @@ package com.github.jodersky.flow package stream package impl -import akka.actor._ -import akka.stream.stage.GraphStageLogic.StageActorRef -import akka.util._ -import akka.stream._ -import akka.stream.stage._ -import scala.concurrent._ -import akka.io._ -import com.github.jodersky.flow.{Serial => CoreSerial} -import akka.stream.impl.ReactiveStreamsCompliance +import scala.concurrent.Promise +import akka.actor.{ActorRef, Terminated} +import akka.stream.{FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler} +import akka.util.ByteString + +import com.github.jodersky.flow.{Serial => CoreSerial, SerialSettings} /** * Graph logic that handles establishing and forwarding serial communication. * The underlying stream is closed when downstream (output) finishes, - * conversely upstream (input) closes are ignored. + * upstream (input) closes are ignored. */ private[stream] class SerialConnectionLogic( shape: FlowShape[ByteString, ByteString], @@ -40,8 +38,9 @@ private[stream] class SerialConnectionLogic( * explicitly specifying a sender. */ implicit private def self = stageActor.ref - /** Input handler for an established connection. - * @param operator ther operator actor of the established connection + /** + * Input handler for an established connection. + * @param operator the operator actor of the established connection */ class ConnectedInHandler(operator: ActorRef) extends InHandler { @@ -60,13 +59,13 @@ private[stream] class SerialConnectionLogic( } class ConnectedOutHandler(operator: ActorRef) extends OutHandler { - // alias stage actor as implicit to it will be used in "!" calls + // implicit alias to stage actor, so it will be used in "!" calls implicit val self = stageActor.ref override def onPull(): Unit = { // serial connections are at the end of the "backpressure chain", // they do not natively support backpressure (as does TCP for example) - // therefore nothing is done here as + // therefore nothing is done here } override def onDownstreamFinish(): Unit = { @@ -86,7 +85,7 @@ private[stream] class SerialConnectionLogic( setHandler(in, IgnoreTerminateInput) setHandler(out, IgnoreTerminateOutput) - /** Initial behaviour, before a serial connection is established. */ + /** Initial behavior, before a serial connection is established. */ private def connecting(event: (ActorRef, Any)): Unit = { val sender = event._1 val message = event._2 @@ -114,7 +113,7 @@ private[stream] class SerialConnectionLogic( pull(in) // start pulling input case other => - val ex = new StreamSerialException(s"Stage actor received unkown message [$other]") + val ex = new StreamSerialException(s"Stage actor received unknown message [$other]") failStage(ex) connectionPromise.failure(ex) diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala index 2efe3e9..bff1d26 100644 --- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala +++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala @@ -2,11 +2,12 @@ package com.github.jodersky.flow package stream package impl -import akka.actor._ -import akka.util._ -import akka.stream._ -import akka.stream.stage._ -import scala.concurrent._ +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString /** * Graph stage that establishes and thereby materializes a serial connection. |