From 5bfde985382d0cc664a9c9d60f203df9f64485c7 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Sat, 6 Feb 2016 17:03:52 -0800 Subject: Refactor message flows --- .../src/main/scala/mavigator/uav/Uav.scala | 1 - .../scala/mavigator/uav/mock/MockConnection.scala | 71 +++++++++++----------- .../mavigator/uav/mock/RandomFlightPlan.scala | 2 +- 3 files changed, 38 insertions(+), 36 deletions(-) diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala index 37f94b6..ed677ea 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -12,7 +12,6 @@ class Uav(system: ExtendedActorSystem) extends Extension { private lazy val config = system.settings.config.getConfig("mavigator.uav") def connect(): Flow[ByteString, ByteString, NotUsed] = { - val t = scala.concurrent.duration.FiniteDuration(100, "ms") Flow.fromSinkAndSource( Sink.ignore, (new MockConnection(0,0,1)).data diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala index 22adf0e..c7bc8d4 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala @@ -2,7 +2,8 @@ package mavigator package uav package mock -import org.mavlink.messages.Message +import org.mavlink.enums._ +import org.mavlink.messages.{Heartbeat, Message} import akka.stream.scaladsl._ import scala.concurrent.duration._ import scala.util.Random @@ -12,6 +13,9 @@ import akka.util._ import org.mavlink._ import Attributes._ + +//case class Heartbeat(`type`: Byte, autopilot: Byte, baseMode: Byte, customMode: Int, systemStatus: Byte, mavlinkVersion: Byte) extends Message + class MockConnection( remoteSystemId: Byte, remoteComponentId: Byte, @@ -19,40 +23,17 @@ class MockConnection( ) { import MockConnection._ - private def stream(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { + private def delayed(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { val dt = delaySeconds / prescaler - Flow[RandomFlightPlan].delay(dt.seconds).withAttributes(inputBuffer(1,1)).map(message) - } - - def foo(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { - import GraphDSL.Implicits._ - - Source.fromGraph(GraphDSL.create() { implicit b => - - val plan = new RandomFlightPlan - - //graph components - val clock = Source.tick(1.seconds, 0.01.seconds, plan).map{plan => - plan.tick(0.01) - plan - } - val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) - val merge = b.add(Merge[Message](messages.length)) - - clock ~> bcast - for (message <- messages) { - bcast ~> message ~> merge - } - - SourceShape(merge.out) - }) + Flow[RandomFlightPlan].withAttributes(inputBuffer(1,1)).delay(dt.seconds).map(message) } - val messages: Source[Message, _] = foo( - stream(0.2)(_.position), - stream(0.05)(_.attitude), - stream(0.05)(_.motors), - stream(0.1)(_.distance) + val messages: Source[Message, _] = streamFromPlan(new RandomFlightPlan)( + delayed(2)(_.heartbeat), + delayed(0.2)(_.position), + delayed(0.05)(_.attitude), + delayed(0.05)(_.motors), + delayed(0.1)(_.distance) ) @@ -66,7 +47,7 @@ class MockConnection( } val data = messages.map{ msg => - println(msg) + if (msg.isInstanceOf[Heartbeat]){println(msg)} assemble(msg) } @@ -74,6 +55,28 @@ class MockConnection( object MockConnection { - final val T0 = 1.seconds + final val ClockTick: FiniteDuration = 0.01.seconds + + private def streamFromPlan(plan: RandomFlightPlan)(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { + + import GraphDSL.Implicits._ + + Source.fromGraph(GraphDSL.create() { implicit b => + + val clock = Source.tick(ClockTick, ClockTick, plan) map { plan => + plan.tick(0.01) + plan + } + val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) + val merge = b.add(Merge[Message](messages.length)) + + clock ~> bcast + for (message <- messages) { + bcast ~> message ~> merge + } + + SourceShape(merge.out) + }) + } } diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala index f24f090..89e5a75 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala @@ -64,7 +64,7 @@ class RandomFlightPlan { 0 ) - def heartbeat = Heartbeat( + def heartbeat = Heartbeat( MavType.MavTypeGeneric.toByte, MavAutopilot.MavAutopilotGeneric.toByte, (MavModeFlag.MavModeFlagSafetyArmed | MavModeFlag.MavModeFlagManualInputEnabled).toByte, -- cgit v1.2.3