1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
|
package ch.jodersky.flow
package stream
package impl
import scala.concurrent.{Future, Promise}
import akka.actor.ActorRef
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
import akka.util.ByteString
/**
* Graph stage that establishes and thereby materializes a serial connection.
* The actual connection logic is deferred to [[SerialConnectionLogic]].
*/
private[stream] class SerialConnectionStage(
manager: ActorRef,
port: String,
settings: SerialSettings,
failOnOverflow: Boolean,
bufferSize: Int
) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] {
val in: Inlet[ByteString] = Inlet("Serial.in")
val out: Outlet[ByteString] = Outlet("Serial.out")
val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes):
(GraphStageLogic, Future[Serial.Connection]) = {
val connectionPromise = Promise[Serial.Connection]
val logic = new SerialConnectionLogic(
shape,
manager,
port,
settings,
failOnOverflow,
bufferSize,
connectionPromise
)
(logic, connectionPromise.future)
}
override def toString = s"Serial($port)"
}
|