diff options
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/Core.scala')
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/Core.scala | 60 |
1 files changed, 57 insertions, 3 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala index 4418dae..02a8779 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala @@ -5,9 +5,12 @@ import akka.NotUsed import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props} import akka.stream.Materializer import akka.stream.actor.{ActorPublisher, ActorPublisherMessage} -import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source } import akka.util.ByteString +import org.mavlink.Assembler +import org.mavlink.messages._ import org.reactivestreams.{Publisher, Subscriber} +import scala.concurrent.duration._ /** A core enables dynamic creation and removal of clients and backend connections. * It is essentially a dynamic stream multiplexer. */ @@ -31,8 +34,18 @@ class Core(implicit val system: ActorSystem, val materializer: Materializer) { Source.asSubscriber[ByteString] )((toClient, fromClient) => (toClient, fromClient)) - private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) => - toClient + private lazy val runnable: RunnableGraph[Publisher[ByteString]] = { + val timer = Source.tick(2.seconds, 2.seconds, ()) + val generator: Source[ByteString, _] = timer.flatMapConcat{ _ => + Util.barrelRoll via Util.assembler + } + + val merged: Flow[ByteString, ByteString, _] = Util.merge(generator, backend) + + merged.joinMat(clients){case (_, (toClient, _)) => + toClient + } + } private lazy val clientPublisher: Publisher[ByteString] = { @@ -55,6 +68,47 @@ class Core(implicit val system: ActorSystem, val materializer: Materializer) { } +object Util { + import akka.stream.scaladsl._ + import akka.stream.FlowShape + + + def merge[A](preferred: Source[A, _], other: Flow[A, A, _]): Flow[A, A, _] = { + val graph = GraphDSL.create(preferred, other)((_, _)) { implicit builder => + (pref, flow) => + + import GraphDSL.Implicits._ + + val merge = builder.add(MergePreferred[A](1)) + + pref.out ~> merge.preferred + flow.out ~> merge.in(0) + + FlowShape(flow.in, merge.out) + } + Flow.fromGraph(graph) + } + + def assembler: Flow[Message, ByteString, _] = Flow[Message].map{ msg => + val as = new Assembler(1, 1) + val (id, payload) = Message.pack(msg) + val bytes = as.assemble(id, payload).toArray + ByteString(bytes) + } + + def barrelRoll(): Source[Message, _] = { + val angle: Source[Float, _] = + Source.tick(10.millis, 10.millis, 0.1f).scan(0.0f)(_+_).takeWhile(_ < 2 * math.Pi) + val attitude = angle.map{ a => + Attitude(0,a,0,0,0,0,0) + } + + Source.single(Stability(0)) concat attitude concat Source.single(Stability(1)) + } + +} + + object Core { private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging { |