aboutsummaryrefslogtreecommitdiff
path: root/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala
diff options
context:
space:
mode:
Diffstat (limited to 'flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala')
-rw-r--r--flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala67
1 files changed, 67 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala
new file mode 100644
index 0000000..d478de8
--- /dev/null
+++ b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala
@@ -0,0 +1,67 @@
+package ch.jodersky.flow
+package stream
+
+import akka.stream.scaladsl.Source
+import scala.concurrent.Future
+
+import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
+import akka.io.IO
+import akka.stream.scaladsl.Flow
+import akka.util.ByteString
+
+import ch.jodersky.flow.{Serial => CoreSerial}
+import impl._
+
+object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
+
+ /**
+ * Represents a prospective serial connection.
+ */
+ case class Connection(port: String, settings: SerialSettings)
+
+ case class Watch(ports: Set[String])
+
+ def apply()(implicit system: ActorSystem): Serial = super.apply(system)
+
+ override def lookup() = Serial
+
+ override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system)
+
+}
+
+/**
+ * Entry point to streaming over serial ports.
+ * The design of this API is inspired by Akka's Tcp streams.
+ */
+class Serial(system: ExtendedActorSystem) extends Extension {
+
+ /**
+ * Creates a Flow that will open a serial port when materialized.
+ * This Flow then represents an open serial connection: data pushed to its
+ * inlet will be written to the underlying serial port, and data received
+ * on the port will be emitted by its outlet.
+ * @param port name of serial port to open
+ * @param settings settings to use with serial port
+ * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped
+ * @param bufferSize maximum read and write buffer sizes
+ * @return a Flow associated to the given serial port
+ */
+ def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024):
+ Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph(
+ new SerialConnectionStage(
+ IO(CoreSerial)(system),
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize
+ )
+ )
+
+ def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph(
+ new WatcherStage(
+ IO(CoreSerial)(system),
+ ports
+ )
+ )
+
+}