aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/impl/SerialConnectionStage.scala
blob: aa6794345aba2181affd7a6ddb696caebc483b22 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package akka.serial
package stream
package impl

import scala.concurrent.{Future, Promise}

import akka.actor.ActorRef
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue}
import akka.util.ByteString

/**
  * Graph stage that establishes and thereby materializes a serial connection.
  * The actual connection logic is deferred to [[SerialConnectionLogic]].
  */
private[stream] class SerialConnectionStage(
  manager: ActorRef,
  port: String,
  settings: SerialSettings,
  failOnOverflow: Boolean,
  bufferSize: Int
) extends GraphStageWithMaterializedValue[FlowShape[ByteString, ByteString], Future[Serial.Connection]] {

  val in: Inlet[ByteString] = Inlet("Serial.in")
  val out: Outlet[ByteString] = Outlet("Serial.out")

  val shape: FlowShape[ByteString, ByteString] = FlowShape(in, out)

  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes):
      (GraphStageLogic, Future[Serial.Connection]) = {

    val connectionPromise = Promise[Serial.Connection]

    val logic = new SerialConnectionLogic(
      shape,
      manager,
      port,
      settings,
      failOnOverflow,
      bufferSize,
      connectionPromise
    )

    (logic, connectionPromise.future)
  }

  override def toString = s"Serial($port)"

}