aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-03-11 22:37:57 -0800
committerJakob Odersky <jakob@odersky.com>2016-03-11 22:37:57 -0800
commitf514d9d013b8059adeb4e326845f0b2b5e93ab02 (patch)
tree0f046791437f1c92df7ce3bb65f923286bf8be08
parent0d319d22fc77df39563b743829d969f161d40e53 (diff)
downloadakka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.tar.gz
akka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.tar.bz2
akka-serial-f514d9d013b8059adeb4e326845f0b2b5e93ab02.zip
Clean up streaming code
Closes #30
-rw-r--r--flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala36
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala12
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala31
-rw-r--r--flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala11
4 files changed, 51 insertions, 39 deletions
diff --git a/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala
index fe3ed86..411c0fd 100644
--- a/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala
+++ b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala
@@ -1,16 +1,16 @@
package com.github.jodersky.flow
package samples.terminalstream
-import akka.actor._
-import akka.stream._
-import akka.stream.stage._
-import akka.stream.scaladsl._
-import akka.util._
-import scala.concurrent._
+import scala.concurrent.Future
import scala.concurrent.duration._
-import scala.util._
-import akka.Done
-import stream._
+import scala.io.StdIn
+
+import akka.actor.ActorSystem
+import akka.stream.ActorMaterializer
+import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
+import akka.util.ByteString
+
+import stream.Serial
object Main {
@@ -19,11 +19,25 @@ object Main {
implicit val system = ActorSystem("terminal-stream")
implicit val materializer = ActorMaterializer()
+ def ask(label: String, default: String) = {
+ print(label + " [" + default.toString + "]: ")
+ val in = StdIn.readLine()
+ println("")
+ if (in.isEmpty) default else in
+ }
+
def main(args: Array[String]): Unit = {
import system.dispatcher
+ val port = ask("Device", "/dev/ttyACM0")
+ val baud = ask("Baud rate", "115200").toInt
+ val cs = ask("Char size", "8").toInt
+ val tsb = ask("Use two stop bits", "false").toBoolean
+ val parity = Parity(ask("Parity (0=None, 1=Odd, 2=Even)", "0").toInt)
+ val settings = SerialSettings(baud, cs, tsb, parity)
+
val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] =
- Serial().open("/dev/ttyACM0", SerialSettings(115200))
+ Serial().open(port, settings)
val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data =>
println("server says: " + data.decodeString("UTF-8"))
@@ -40,7 +54,7 @@ object Main {
connection map { conn =>
println("Connected to " + conn.port)
- readLine("Press enter to exit")
+ StdIn.readLine("Press enter to exit")
} recover { case err =>
println("Cannot connect: " + err)
} andThen { case _ =>
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
index 26efcc9..6d250df 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/Serial.scala
@@ -1,16 +1,14 @@
package com.github.jodersky.flow
package stream
-import akka.actor._
+import scala.concurrent.Future
+
+import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
+import akka.io.IO
import akka.stream.scaladsl.Flow
-import akka.stream._
-import akka.stream.stage._
-import akka.dispatch.ExecutionContexts
import akka.util.ByteString
-import com.github.jodersky.flow.{Serial => CoreSerial, _}
-import scala.concurrent._
-import akka.io._
+import com.github.jodersky.flow.{Serial => CoreSerial}
import impl._
object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
index 35be464..526ea40 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionLogic.scala
@@ -2,21 +2,19 @@ package com.github.jodersky.flow
package stream
package impl
-import akka.actor._
-import akka.stream.stage.GraphStageLogic.StageActorRef
-import akka.util._
-import akka.stream._
-import akka.stream.stage._
-import scala.concurrent._
-import akka.io._
-import com.github.jodersky.flow.{Serial => CoreSerial}
-import akka.stream.impl.ReactiveStreamsCompliance
+import scala.concurrent.Promise
+import akka.actor.{ActorRef, Terminated}
+import akka.stream.{FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler}
+import akka.util.ByteString
+
+import com.github.jodersky.flow.{Serial => CoreSerial, SerialSettings}
/**
* Graph logic that handles establishing and forwarding serial communication.
* The underlying stream is closed when downstream (output) finishes,
- * conversely upstream (input) closes are ignored.
+ * upstream (input) closes are ignored.
*/
private[stream] class SerialConnectionLogic(
shape: FlowShape[ByteString, ByteString],
@@ -40,8 +38,9 @@ private[stream] class SerialConnectionLogic(
* explicitly specifying a sender. */
implicit private def self = stageActor.ref
- /** Input handler for an established connection.
- * @param operator ther operator actor of the established connection
+ /**
+ * Input handler for an established connection.
+ * @param operator the operator actor of the established connection
*/
class ConnectedInHandler(operator: ActorRef) extends InHandler {
@@ -60,13 +59,13 @@ private[stream] class SerialConnectionLogic(
}
class ConnectedOutHandler(operator: ActorRef) extends OutHandler {
- // alias stage actor as implicit to it will be used in "!" calls
+ // implicit alias to stage actor, so it will be used in "!" calls
implicit val self = stageActor.ref
override def onPull(): Unit = {
// serial connections are at the end of the "backpressure chain",
// they do not natively support backpressure (as does TCP for example)
- // therefore nothing is done here as
+ // therefore nothing is done here
}
override def onDownstreamFinish(): Unit = {
@@ -86,7 +85,7 @@ private[stream] class SerialConnectionLogic(
setHandler(in, IgnoreTerminateInput)
setHandler(out, IgnoreTerminateOutput)
- /** Initial behaviour, before a serial connection is established. */
+ /** Initial behavior, before a serial connection is established. */
private def connecting(event: (ActorRef, Any)): Unit = {
val sender = event._1
val message = event._2
@@ -114,7 +113,7 @@ private[stream] class SerialConnectionLogic(
pull(in) // start pulling input
case other =>
- val ex = new StreamSerialException(s"Stage actor received unkown message [$other]")
+ val ex = new StreamSerialException(s"Stage actor received unknown message [$other]")
failStage(ex)
connectionPromise.failure(ex)
diff --git a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
index 2efe3e9..bff1d26 100644
--- a/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
+++ b/flow-stream/src/main/scala/com/github/jodersky/flow/stream/impl/SerialConnectionStage.scala
@@ -2,11 +2,12 @@ package com.github.jodersky.flow
package stream
package impl
-import akka.actor._
-import akka.util._
-import akka.stream._
-import akka.stream.stage._
-import scala.concurrent._
+import scala.concurrent.{Future, Promise}
+
+import akka.actor.ActorRef
+import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
+import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
+import akka.util.ByteString
/**
* Graph stage that establishes and thereby materializes a serial connection.