blob: 649249f225365629bcf58ca154009544580a06a2 (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package akka.serial
package stream
package impl
import scala.concurrent.{Future, Promise}
import akka.actor.ActorRef
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageWithMaterializedValue, GraphStageLogic}
private[stream] class WatcherStage(
ioManager: ActorRef,
ports: Set[String]
) extends GraphStageWithMaterializedValue[SourceShape[String], Future[Serial.Watch]] {
val out = Outlet[String]("Watcher.out")
val shape = new SourceShape(out)
override def createLogicAndMaterializedValue(attributes: Attributes):
(GraphStageLogic, Future[Serial.Watch]) = {
val promise = Promise[Serial.Watch]
val logic = new WatcherLogic(
shape,
ioManager,
ports,
promise
)
(logic, promise.future)
}
override def toString = s"Watcher($ports)"
}
|