aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
diff options
context:
space:
mode:
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/Core.scala')
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Core.scala60
1 files changed, 57 insertions, 3 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
index 4418dae..02a8779 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
@@ -5,9 +5,12 @@ import akka.NotUsed
import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
import akka.stream.Materializer
import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
-import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.stream.scaladsl.{ Flow, RunnableGraph, Sink, Source }
import akka.util.ByteString
+import org.mavlink.Assembler
+import org.mavlink.messages._
import org.reactivestreams.{Publisher, Subscriber}
+import scala.concurrent.duration._
/** A core enables dynamic creation and removal of clients and backend connections.
* It is essentially a dynamic stream multiplexer. */
@@ -31,8 +34,18 @@ class Core(implicit val system: ActorSystem, val materializer: Materializer) {
Source.asSubscriber[ByteString]
)((toClient, fromClient) => (toClient, fromClient))
- private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) =>
- toClient
+ private lazy val runnable: RunnableGraph[Publisher[ByteString]] = {
+ val timer = Source.tick(2.seconds, 2.seconds, ())
+ val generator: Source[ByteString, _] = timer.flatMapConcat{ _ =>
+ Util.barrelRoll via Util.assembler
+ }
+
+ val merged: Flow[ByteString, ByteString, _] = Util.merge(generator, backend)
+
+ merged.joinMat(clients){case (_, (toClient, _)) =>
+ toClient
+ }
+
}
private lazy val clientPublisher: Publisher[ByteString] = {
@@ -55,6 +68,47 @@ class Core(implicit val system: ActorSystem, val materializer: Materializer) {
}
+object Util {
+ import akka.stream.scaladsl._
+ import akka.stream.FlowShape
+
+
+ def merge[A](preferred: Source[A, _], other: Flow[A, A, _]): Flow[A, A, _] = {
+ val graph = GraphDSL.create(preferred, other)((_, _)) { implicit builder =>
+ (pref, flow) =>
+
+ import GraphDSL.Implicits._
+
+ val merge = builder.add(MergePreferred[A](1))
+
+ pref.out ~> merge.preferred
+ flow.out ~> merge.in(0)
+
+ FlowShape(flow.in, merge.out)
+ }
+ Flow.fromGraph(graph)
+ }
+
+ def assembler: Flow[Message, ByteString, _] = Flow[Message].map{ msg =>
+ val as = new Assembler(1, 1)
+ val (id, payload) = Message.pack(msg)
+ val bytes = as.assemble(id, payload).toArray
+ ByteString(bytes)
+ }
+
+ def barrelRoll(): Source[Message, _] = {
+ val angle: Source[Float, _] =
+ Source.tick(10.millis, 10.millis, 0.1f).scan(0.0f)(_+_).takeWhile(_ < 2 * math.Pi)
+ val attitude = angle.map{ a =>
+ Attitude(0,a,0,0,0,0,0)
+ }
+
+ Source.single(Stability(0)) concat attitude concat Source.single(Stability(1))
+ }
+
+}
+
+
object Core {
private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {