diff options
Diffstat (limited to 'flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala')
-rw-r--r-- | flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala new file mode 100644 index 0000000..d478de8 --- /dev/null +++ b/flow-stream/src/main/scala/ch/jodersky/flow/stream/Serial.scala @@ -0,0 +1,67 @@ +package ch.jodersky.flow +package stream + +import akka.stream.scaladsl.Source +import scala.concurrent.Future + +import akka.actor.{Extension, ActorSystem, ExtendedActorSystem, ExtensionId, ExtensionIdProvider} +import akka.io.IO +import akka.stream.scaladsl.Flow +import akka.util.ByteString + +import ch.jodersky.flow.{Serial => CoreSerial} +import impl._ + +object Serial extends ExtensionId[Serial] with ExtensionIdProvider { + + /** + * Represents a prospective serial connection. + */ + case class Connection(port: String, settings: SerialSettings) + + case class Watch(ports: Set[String]) + + def apply()(implicit system: ActorSystem): Serial = super.apply(system) + + override def lookup() = Serial + + override def createExtension(system: ExtendedActorSystem): Serial = new Serial(system) + +} + +/** + * Entry point to streaming over serial ports. + * The design of this API is inspired by Akka's Tcp streams. + */ +class Serial(system: ExtendedActorSystem) extends Extension { + + /** + * Creates a Flow that will open a serial port when materialized. + * This Flow then represents an open serial connection: data pushed to its + * inlet will be written to the underlying serial port, and data received + * on the port will be emitted by its outlet. + * @param port name of serial port to open + * @param settings settings to use with serial port + * @param failOnOverflow when set, the returned Flow will fail when incoming data is dropped + * @param bufferSize maximum read and write buffer sizes + * @return a Flow associated to the given serial port + */ + def open(port: String, settings: SerialSettings, failOnOverflow: Boolean = false, bufferSize: Int = 1024): + Flow[ByteString, ByteString, Future[Serial.Connection]] = Flow.fromGraph( + new SerialConnectionStage( + IO(CoreSerial)(system), + port, + settings, + failOnOverflow, + bufferSize + ) + ) + + def watch(ports: Set[String]): Source[String, Future[Serial.Watch]] = Source.fromGraph( + new WatcherStage( + IO(CoreSerial)(system), + ports + ) + ) + +} |