aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/Serial.scala
diff options
context:
space:
mode:
Diffstat (limited to 'stream/src/main/scala/akka/serial/stream/Serial.scala')
-rw-r--r--stream/src/main/scala/akka/serial/stream/Serial.scala68
1 files changed, 68 insertions, 0 deletions
diff --git a/stream/src/main/scala/akka/serial/stream/Serial.scala b/stream/src/main/scala/akka/serial/stream/Serial.scala
new file mode 100644
index 0000000..785001f
--- /dev/null
+++ b/stream/src/main/scala/akka/serial/stream/Serial.scala
@@ -0,0 +1,68 @@
+package akka.serial
+package stream
+
+import akka.stream.scaladsl.Source
+import scala.concurrent.Future
+
+import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider}
+import akka.io.IO
+import akka.stream.scaladsl.Flow
+import akka.util.ByteString
+
+import akka.serial.{Serial => CoreSerial}
+import impl._
+
+
+object Serial extends ExtensionId[Serial] with ExtensionIdProvider {
+
+ /**
+ * Represents a prospective serial connection.
+ */
+ case class Connection(port: String, settings: SerialSettings)
+
+ case class Watch(ports: Set[String])
+
+ def apply()(implicit system: ActorSystem): Serial = super.apply(system)
+
+ override def lookup() = Serial
+
+ override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system)
+
+}
+
+/**
+ * Entry point to streaming over serial ports.
+ * The design of this API is inspired by Akka's Tcp streams.
+ */
+class Serial(system: ExtendedActorSystem) extends Extension {
+
+ /**
+ * Creates a Flow that will open a serial port when materialized.
+ * This Flow then represents an open serial connection: data pushed to its
+ * inlet will be written to the underlying serial port, and data received
+ * on the port will be emitted by its outlet.
+ * @param port name of serial port to open
+ * @param settings settings to use with serial port
+ * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped
+ * @param bufferSize maximum read and write buffer sizes
+ * @return a Flow associated to the given serial port
+ */
+ def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024):
+ Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph(
+ new SerialConnectionStage(
+ IO(CoreSerial)(system),
+ port,
+ settings,
+ failOnOverflow,
+ bufferSize
+ )
+ )
+
+ def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph(
+ new WatcherStage(
+ IO(CoreSerial)(system),
+ ports
+ )
+ )
+
+}