aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
diff options
context:
space:
mode:
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/Core.scala')
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Core.scala100
1 files changed, 100 insertions, 0 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
new file mode 100644
index 0000000..4418dae
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
@@ -0,0 +1,100 @@
+package mavigator
+package uav
+
+import akka.NotUsed
+import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
+import akka.stream.Materializer
+import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.util.ByteString
+import org.reactivestreams.{Publisher, Subscriber}
+
+/** A core enables dynamic creation and removal of clients and backend connections.
+ * It is essentially a dynamic stream multiplexer. */
+class Core(implicit val system: ActorSystem, val materializer: Materializer) {
+ import Core._
+
+ /** The actor responsible for forwarding messages from the backend
+ * to this core. */
+ private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher())
+
+ /** Publisher that forwards messages received from the backend. */
+ private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor)
+
+ private lazy val backend = Flow.fromSinkAndSourceMat(
+ Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser
+ Source.fromPublisher(backendPublisher)
+ )((toBackend, fromBackend) => (toBackend, fromBackend))
+
+ private lazy val clients = Flow.fromSinkAndSourceMat(
+ Sink.asPublisher[ByteString](true),
+ Source.asSubscriber[ByteString]
+ )((toClient, fromClient) => (toClient, fromClient))
+
+ private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) =>
+ toClient
+ }
+
+ private lazy val clientPublisher: Publisher[ByteString] = {
+ system.log.info("Core started.")
+ runnable.run()
+ }
+
+ def setBackend(): Flow[ByteString, ByteString, NotUsed] = {
+ val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete)
+ val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser
+ Flow.fromSinkAndSource(sink, source)
+ }
+
+ def connect(): Flow[ByteString, ByteString, NotUsed] = {
+ Flow.fromSinkAndSource(
+ Sink.ignore,
+ Source.fromPublisher(clientPublisher)
+ )
+ }
+
+}
+
+object Core {
+
+ private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {
+ import akka.stream.actor.ActorPublisherMessage._
+
+ override def preStart() = {
+ log.info("Starting backend publisher actor...")
+ }
+
+ private var fromBackend: ByteString = null
+
+ def receive = {
+
+ case msg: ByteString =>
+ fromBackend = msg
+ deliver()
+
+ case BackendPublisher.BackendComplete =>
+ sys.error("Backend completed normally, this should not happen.")
+
+ // subscriber requests more
+ case ActorPublisherMessage.Request(_) =>
+ deliver()
+
+ //subscriber cancels
+ case ActorPublisherMessage.Cancel =>
+ sys.error("Subscriber cancelled backend stream, this should not happen.")
+
+ }
+
+ def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) {
+ onNext(fromBackend)
+ fromBackend = null
+ }
+
+ }
+
+ private object BackendPublisher {
+ case object BackendComplete
+ def apply(): Props = Props(classOf[BackendPublisher])
+ }
+
+}