aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala
blob: 5e48ea17056de2b8c6309d0bce0389126c7d6f0e (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package mavigator
package uav

import akka.stream.scaladsl._
import akka.stream._
import org.reactivestreams._

private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) {

  private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat(
    Sink.asPublisher[Out](fanout = true),
    Source.asSubscriber[In])((pub, sub) => (pub, sub))

  private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run()

  def connect(client: Flow[Out, In, _]) = {
    Source.fromPublisher(publisher).via(client).to(Sink.ignore).run()
  }

}