blob: 5e48ea17056de2b8c6309d0bce0389126c7d6f0e (
plain) (
blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
package mavigator
package uav
import akka.stream.scaladsl._
import akka.stream._
import org.reactivestreams._
/** Runs a single `service` flow and lets clients attach to its output.
  *
  * The service is joined to an internal endpoint built from a fan-out
  * publisher sink and a subscriber source. The joined graph is materialized
  * lazily, the first time the publisher or subscriber is needed (in this
  * class, on the first call to `connect`).
  *
  * @tparam In   element type consumed by the service
  * @tparam Out  element type produced by the service
  * @param service       flow to be shared between clients
  * @param materializer  materializer used for the service graph and for every
  *                      client stream started through `connect`
  */
private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) {

  // Reactive-streams bridge towards the service: everything the service emits
  // is exposed through a Publisher (created with fanout = true), while the
  // service's input is driven by whatever gets subscribed to the Subscriber.
  private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = {
    val serviceOutput = Sink.asPublisher[Out](fanout = true)
    val serviceInput = Source.asSubscriber[In]
    Flow.fromSinkAndSourceMat(serviceOutput, serviceInput)(Keep.both)
  }

  // Lazily closes the loop service <-> endpoint and starts it, keeping only
  // the endpoint's materialized (publisher, subscriber) pair.
  private lazy val (publisher, subscriber) = {
    val loop = service.joinMat(endpoint)(Keep.right)
    loop.run()
  }

  /** Starts a new stream that feeds the service's output through `client`.
    *
    * Each call materializes an independent stream subscribed to the shared
    * publisher.
    *
    * NOTE(review): the elements produced by `client` are sent to
    * `Sink.ignore`, and `subscriber` is not referenced anywhere in this class,
    * so nothing visible here routes client output back into the service.
    * Confirm whether that is intentional.
    *
    * @param client flow that receives the service's output
    * @return the materialized value of the started stream
    */
  def connect(client: Flow[Out, In, _]) = {
    val fromService = Source.fromPublisher(publisher)
    val clientStream = fromService.via(client).to(Sink.ignore)
    clientStream.run()
  }

}
|