aboutsummaryrefslogtreecommitdiff
path: root/stream/src/main/scala/akka/serial/stream/impl/WatcherStage.scala
blob: 649249f225365629bcf58ca154009544580a06a2 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package akka.serial
package stream
package impl

import scala.concurrent.{Future, Promise}

import akka.actor.ActorRef
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic}


private[stream] class WatcherStage(
  ioManager: ActorRef,
  ports: Set[String]
) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] {

  val out = Outlet[String]("Watcher.out")

  val shape = new SourceShape(out)

  override def createLogicAndMaterializedValue(attributes: Attributes):
      (GraphStageLogic, Future[Serial.Watch]) = {

    val promise = Promise[Serial.Watch]

    val logic = new WatcherLogic(
      shape,
      ioManager,
      ports,
      promise
    )

    (logic, promise.future)
  }

  override def toString = s"Watcher($ports)"

}