diff options
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/Core.scala')
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Core.scala | 100 |
1 files changed, 100 insertions, 0 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala new file mode 100644 index 0000000..4418dae --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala @@ -0,0 +1,100 @@ +package mavigator +package uav + +import akka.NotUsed +import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props} +import akka.stream.Materializer +import akka.stream.actor.{ActorPublisher, ActorPublisherMessage} +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.util.ByteString +import org.reactivestreams.{Publisher, Subscriber} + +/** A core enables dynamic creation and removal of clients and backend connections. + * It is essentially a dynamic stream multiplexer. */ +class Core(implicit val system: ActorSystem, val materializer: Materializer) { + import Core._ + + /** The actor responsible for forwarding messages from the backend + * to this core. */ + private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher()) + + /** Publisher that forwards messages received from the backend. */ + private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor) + + private lazy val backend = Flow.fromSinkAndSourceMat( + Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser + Source.fromPublisher(backendPublisher) + )((toBackend, fromBackend) => (toBackend, fromBackend)) + + private lazy val clients = Flow.fromSinkAndSourceMat( + Sink.asPublisher[ByteString](true), + Source.asSubscriber[ByteString] + )((toClient, fromClient) => (toClient, fromClient)) + + private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) => + toClient + } + + private lazy val clientPublisher: Publisher[ByteString] = { + system.log.info("Core started.") + runnable.run() + } + + def setBackend(): Flow[ByteString, ByteString, NotUsed] = { + val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete) + val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser + Flow.fromSinkAndSource(sink, source) + } + + def connect(): Flow[ByteString, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + Sink.ignore, + Source.fromPublisher(clientPublisher) + ) + } + +} + +object Core { + + private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging { + import akka.stream.actor.ActorPublisherMessage._ + + override def preStart() = { + log.info("Starting backend publisher actor...") + } + + private var fromBackend: ByteString = null + + def receive = { + + case msg: ByteString => + fromBackend = msg + deliver() + + case BackendPublisher.BackendComplete => + sys.error("Backend completed normally, this should not happen.") + + // subscriber requests more + case ActorPublisherMessage.Request(_) => + deliver() + + //subscriber cancels + case ActorPublisherMessage.Cancel => + sys.error("Subscriber cancelled backend stream, this should not happen.") + + } + + def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) { + onNext(fromBackend) + fromBackend = null + } + + } + + private object BackendPublisher { + case object BackendComplete + def apply(): Props = Props(classOf[BackendPublisher]) + } + +} |