aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
diff options
context:
space:
mode:
Diffstat (limited to 'mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala')
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala114
1 files changed, 59 insertions, 55 deletions
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
index f0ad972..94c1d40 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
@@ -2,78 +2,82 @@ package mavigator
package uav
package mock
-import java.util.concurrent.TimeUnit.MILLISECONDS
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Random
-import org.mavlink.Packet
-import org.mavlink.enums.MavAutopilot
-import org.mavlink.enums.MavModeFlag
-import org.mavlink.enums.MavState
-import org.mavlink.enums.MavType
-import org.mavlink.messages.Heartbeat
-import Connection.Received
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.Props
-import akka.util.ByteString
-import scala.concurrent.duration._
import org.mavlink.messages.Message
-import mock.RandomFlightPlan
+import akka.stream.scaladsl._
+import scala.concurrent.duration._
+import scala.util.Random
+import akka.stream._
+import akka.NotUsed
+import akka.util._
+import org.mavlink._
class MockConnection(
- localSystemId: Byte,
- localComponentId: Byte,
remoteSystemId: Byte,
- prescaler: Int
-)
- extends Actor with ActorLogging with Connection with MavlinkUtil {
+ remoteComponentId: Byte,
+ prescaler: Double
+) {
+ import MockConnection._
- import context._
+ private def stream(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = {
+ val dt = delaySeconds / prescaler
+ Flow[RandomFlightPlan].throttle(
+ elements = 1,
+ per = dt.seconds,
+ maximumBurst = 1,
+ ThrottleMode.Shaping
+ ).map(message)
+ }
- override val systemId = remoteSystemId
- override val componentId = remoteSystemId
+ def foo(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = {
+ import GraphDSL.Implicits._
- val plan = new RandomFlightPlan
+ Source.fromGraph(GraphDSL.create() { implicit b =>
- def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(assemble(fct)))
- }
- def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(ByteString(fct)))
+ val plan = new RandomFlightPlan
+
+ //graph components
+ val clock = Source.tick(1.seconds, 0.01.seconds, plan).map{plan =>
+ plan.tick(0.01)
+ plan
+ }
+ val bcast = b.add(Broadcast[RandomFlightPlan](messages.length))
+ val merge = b.add(Merge[Message](messages.length))
+
+ clock ~> bcast
+ for (message <- messages) {
+ bcast ~> message ~> merge
+ }
+
+ SourceShape(merge.out)
+ })
}
- override def preStart() = {
- //increment state
- system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) }
+ val messages: Source[Message, _] = foo(
+ stream(0.2)(_.position),
+ stream(0.05)(_.attitude),
+ stream(0.05)(_.motors),
+ stream(0.1)(_.distance)
+ )
- //send messages
- scheduleMessage(0.1.seconds * prescaler)(plan.position)
- scheduleMessage(0.05.seconds * prescaler)(plan.attitude)
- scheduleMessage(0.05.seconds * prescaler)(plan.motors)
- scheduleMessage(0.1.seconds * prescaler)(plan.distance)
- scheduleMessage(1.seconds)(plan.heartbeat)
- //simulate noisy line
- scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc)
- scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow)
+ private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId)
+
+ /** Assembles a message into a bytestring representing a packet sent from this connection. */
+ def assemble(message: Message): ByteString = {
+ val (messageId, payload) = Message.pack(message)
+ val packet: Packet = assembler.assemble(messageId, payload)
+ ByteString(packet.toArray)
}
- override def receive = handleRegistration
+ val data = messages.map{ msg =>
+ println(msg)
+ assemble(msg)
+ }
}
object MockConnection {
- def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) =
- Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler)
-}
-object MockPackets {
- val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte)
- val invalidOverflow = {
- val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42)
- data(0) = -2
- data(1) = 2
- data(1) = -1
- data
- }
+ final val T0 = 1.seconds
+
}