aboutsummaryrefslogtreecommitdiff
path: root/flow-samples
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-01-24 20:21:17 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-03 20:46:28 -0800
commitf865a76c2f441f619b069505b73fcbd1cba1a67c (patch)
tree3f53c519f4575037bdebf8c8399ca25d50649543 /flow-samples
parent46c30908f827e27b58166f56efa4f15917c1af4f (diff)
downloadakka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.gz
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.tar.bz2
akka-serial-f865a76c2f441f619b069505b73fcbd1cba1a67c.zip
Add support for Akka streams
Diffstat (limited to 'flow-samples')
-rw-r--r--flow-samples/terminal-stream/build.sbt3
-rw-r--r--flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala53
2 files changed, 56 insertions, 0 deletions
diff --git a/flow-samples/terminal-stream/build.sbt b/flow-samples/terminal-stream/build.sbt
new file mode 100644
index 0000000..5fbdd89
--- /dev/null
+++ b/flow-samples/terminal-stream/build.sbt
@@ -0,0 +1,3 @@
+import flow.{FlowBuild}
+
+FlowBuild.commonSettings
diff --git a/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala
new file mode 100644
index 0000000..ae1239c
--- /dev/null
+++ b/flow-samples/terminal-stream/src/main/scala/com/github/jodersky/flow/samples/terminalstream/Main.scala
@@ -0,0 +1,53 @@
+package com.github.jodersky.flow
+package samples.terminalstream
+
+import akka.actor._
+import akka.stream._
+import akka.stream.stage._
+import akka.stream.scaladsl._
+import akka.util._
+import akka.stream.io._
+import scala.concurrent._
+import scala.concurrent.duration._
+import scala.util._
+import akka.Done
+import stream._
+
+object Main {
+
+ final val Delay = FiniteDuration(500, MILLISECONDS)
+
+ implicit val system = ActorSystem("terminal-stream")
+ implicit val materializer = ActorMaterializer()
+
+ def main(args: Array[String]): Unit = {
+ import system.dispatcher
+
+ val serial: Flow[ByteString, ByteString, Future[Serial.Connection]] =
+ Serial().open("/dev/ttyACM0", SerialSettings(115200))
+
+ val printer: Sink[ByteString, _] = Sink.foreach[ByteString]{data =>
+ println("server says: " + data.decodeString("UTF-8"))
+ }
+
+ val ticker: Source[ByteString, _] = Source.tick(Delay, Delay, ()).scan(0){case (x, _) =>
+ x + 1
+ }.map{ x =>
+ println(x)
+ ByteString(x.toString)
+ }
+
+ val connection: Future[Serial.Connection] = ticker.viaMat(serial)(Keep.right).to(printer).run()
+
+ connection map { conn =>
+ println("Connected to " + conn.port)
+ readLine("Press enter to exit")
+ } recover { case err =>
+ println("Cannot connect: " + err)
+ } andThen { case _ =>
+ system.terminate()
+ }
+
+ }
+
+}