aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-02-06 17:03:52 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-06 17:03:52 -0800
commit5bfde985382d0cc664a9c9d60f203df9f64485c7 (patch)
tree4e5af45801c3e57f3142b26770701a7449570a9c
parentda2840940d8db1b0338f01c77b1a521bbb728e20 (diff)
downloadmavigator-5bfde985382d0cc664a9c9d60f203df9f64485c7.tar.gz
mavigator-5bfde985382d0cc664a9c9d60f203df9f64485c7.tar.bz2
mavigator-5bfde985382d0cc664a9c9d60f203df9f64485c7.zip
Refactor message flows
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Uav.scala1
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala71
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala2
3 files changed, 38 insertions, 36 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
index 37f94b6..ed677ea 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
@@ -12,7 +12,6 @@ class Uav(system: ExtendedActorSystem) extends Extension {
private lazy val config = system.settings.config.getConfig("mavigator.uav")
def connect(): Flow[ByteString, ByteString, NotUsed] = {
- val t = scala.concurrent.duration.FiniteDuration(100, "ms")
Flow.fromSinkAndSource(
Sink.ignore,
(new MockConnection(0,0,1)).data
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
index 22adf0e..c7bc8d4 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
@@ -2,7 +2,8 @@ package mavigator
package uav
package mock
-import org.mavlink.messages.Message
+import org.mavlink.enums._
+import org.mavlink.messages.{Heartbeat, Message}
import akka.stream.scaladsl._
import scala.concurrent.duration._
import scala.util.Random
@@ -12,6 +13,9 @@ import akka.util._
import org.mavlink._
import Attributes._
+
+//case class Heartbeat(`type`: Byte, autopilot: Byte, baseMode: Byte, customMode: Int, systemStatus: Byte, mavlinkVersion: Byte) extends Message
+
class MockConnection(
remoteSystemId: Byte,
remoteComponentId: Byte,
@@ -19,40 +23,17 @@ class MockConnection(
) {
import MockConnection._
- private def stream(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = {
+ private def delayed(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = {
val dt = delaySeconds / prescaler
- Flow[RandomFlightPlan].delay(dt.seconds).withAttributes(inputBuffer(1,1)).map(message)
- }
-
- def foo(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = {
- import GraphDSL.Implicits._
-
- Source.fromGraph(GraphDSL.create() { implicit b =>
-
- val plan = new RandomFlightPlan
-
- //graph components
- val clock = Source.tick(1.seconds, 0.01.seconds, plan).map{plan =>
- plan.tick(0.01)
- plan
- }
- val bcast = b.add(Broadcast[RandomFlightPlan](messages.length))
- val merge = b.add(Merge[Message](messages.length))
-
- clock ~> bcast
- for (message <- messages) {
- bcast ~> message ~> merge
- }
-
- SourceShape(merge.out)
- })
+ Flow[RandomFlightPlan].withAttributes(inputBuffer(1,1)).delay(dt.seconds).map(message)
}
- val messages: Source[Message, _] = foo(
- stream(0.2)(_.position),
- stream(0.05)(_.attitude),
- stream(0.05)(_.motors),
- stream(0.1)(_.distance)
+ val messages: Source[Message, _] = streamFromPlan(new RandomFlightPlan)(
+ delayed(2)(_.heartbeat),
+ delayed(0.2)(_.position),
+ delayed(0.05)(_.attitude),
+ delayed(0.05)(_.motors),
+ delayed(0.1)(_.distance)
)
@@ -66,7 +47,7 @@ class MockConnection(
}
val data = messages.map{ msg =>
- println(msg)
+ if (msg.isInstanceOf[Heartbeat]){println(msg)}
assemble(msg)
}
@@ -74,6 +55,28 @@ class MockConnection(
object MockConnection {
- final val T0 = 1.seconds
+ final val ClockTick: FiniteDuration = 0.01.seconds
+
+ private def streamFromPlan(plan: RandomFlightPlan)(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = {
+
+ import GraphDSL.Implicits._
+
+ Source.fromGraph(GraphDSL.create() { implicit b =>
+
+ val clock = Source.tick(ClockTick, ClockTick, plan) map { plan =>
+ plan.tick(0.01)
+ plan
+ }
+ val bcast = b.add(Broadcast[RandomFlightPlan](messages.length))
+ val merge = b.add(Merge[Message](messages.length))
+
+ clock ~> bcast
+ for (message <- messages) {
+ bcast ~> message ~> merge
+ }
+
+ SourceShape(merge.out)
+ })
+ }
}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
index f24f090..89e5a75 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
@@ -64,7 +64,7 @@ class RandomFlightPlan {
0
)
- def heartbeat = Heartbeat(
+ def heartbeat = Heartbeat(
MavType.MavTypeGeneric.toByte,
MavAutopilot.MavAutopilotGeneric.toByte,
(MavModeFlag.MavModeFlagSafetyArmed | MavModeFlag.MavModeFlagManualInputEnabled).toByte,