diff options
author | Jakob Odersky <jakob@odersky.com> | 2016-02-24 20:33:51 -0800 |
---|---|---|
committer | Jakob Odersky <jakob@odersky.com> | 2016-02-24 20:33:51 -0800 |
commit | 8186b3622ce1c9d2b50df3d264ab526dc1e61d77 (patch) | |
tree | b4446408291c9f179e1c270a561523023ac6a105 /mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala | |
parent | 6c4e8849d753734b3e50523dcdb372fbdbccc2c1 (diff) | |
parent | a41de68066007852d7d3dbf019d75b4caf7463ad (diff) | |
download | mavigator-8186b3622ce1c9d2b50df3d264ab526dc1e61d77.tar.gz mavigator-8186b3622ce1c9d2b50df3d264ab526dc1e61d77.tar.bz2 mavigator-8186b3622ce1c9d2b50df3d264ab526dc1e61d77.zip |
Merge branch 'akka-streams'
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala')
-rw-r--r-- | mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala new file mode 100644 index 0000000..58b4977 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala @@ -0,0 +1,70 @@ +package mavigator +package uav +package mock + +import scala.concurrent.duration._ + +import akka.NotUsed +import akka.stream._ +import akka.stream.Attributes._ +import akka.stream.scaladsl._ +import akka.util._ +import org.mavlink._ +import org.mavlink.messages.{Heartbeat, Message} + + +class MockConnection( + remoteSystemId: Byte, + remoteComponentId: Byte, + prescaler: Double +) { + import MockConnection._ + + private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId) + + private def delayed(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { + val dt = delaySeconds / prescaler + Flow[RandomFlightPlan].withAttributes(inputBuffer(1,1)).delay(dt.seconds).map(message) + } + + private val messages: Source[Message, NotUsed] = fromPlan(new RandomFlightPlan)( + delayed(2)(_.heartbeat), + delayed(0.2)(_.position), + delayed(0.05)(_.attitude), + delayed(0.05)(_.motors), + delayed(0.1)(_.distance) + ) + + val data: Source[ByteString, NotUsed] = messages.map{ message => + val (messageId, payload) = Message.pack(message) + val packet = assembler.assemble(messageId, payload) + ByteString(packet.toArray) + } + +} + +object MockConnection { + + final val ClockTick: FiniteDuration = 0.02.seconds + + private def fromPlan(plan: RandomFlightPlan)(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { + import GraphDSL.Implicits._ + Source.fromGraph(GraphDSL.create() { implicit b => + + val clock = Source.tick(ClockTick, ClockTick, plan) map { plan => + plan.tick(0.01) + plan + } + val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) + val merge = b.add(Merge[Message](messages.length)) + + clock ~> bcast + for (message <- messages) { + bcast ~> message ~> merge + } + + SourceShape(merge.out) + }) + } + +} |