diff options
Diffstat (limited to 'flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala')
-rw-r--r-- | flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala | 49 |
1 files changed, 49 insertions, 0 deletions
diff --git a/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala new file mode 100644 index 0000000..ceeac01 --- /dev/null +++ b/flow-stream/src/main/scala/ch/jodersky/flow/stream/impl/SerialConnectionStage.scala @@ -0,0 +1,49 @@ +package ch.jodersky.flow +package stream +package impl + +import scala.concurrent.{Future, Promise} + +import akka.actor.ActorRef +import akka.stream.{Attributes, FlowShape, Inlet, Outlet} +import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue} +import akka.util.ByteString + +/** + * Graph stage that establishes and thereby materializes a serial connection. + * The actual connection logic is deferred to [[SerialConnectionLogic]]. + */ +private[stream] class SerialConnectionStage( + manager: ActorRef, + port: String, + settings: SerialSettings, + failOnOverflow: Boolean, + bufferSize: Int +) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] { + + val in: Inlet[ByteString] = Inlet("Serial.in") + val out: Outlet[ByteString] = Outlet("Serial.out") + + val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out) + + override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): + (GraphStageLogic, Future[Serial.Connection]) = { + + val connectionPromise = Promise[Serial.Connection] + + val logic = new SerialConnectionLogic( + shape, + manager, + port, + settings, + failOnOverflow, + bufferSize, + connectionPromise + ) + + (logic, connectionPromise.future) + } + + override def toString = s"Serial($port)" + +} |