From fce963dab70b163a79cb52dff4bac3dfd32c5f37 Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Thu, 4 Feb 2016 22:57:01 -0800 Subject: move mock connection to stream (performance is really bad) --- .../scala/mavigator/dashboard/MavlinkSocket.scala | 7 +- .../src/main/resources/application.conf | 41 +------ .../src/main/scala/mavigator/Router.scala | 5 +- mavigator-uav/src/main/resources/application.conf | 37 ++++++ .../src/main/scala/mavigator/uav/Connection.scala | 73 ----------- .../main/scala/mavigator/uav/MockConnection.scala | 77 ------------ .../scala/mavigator/uav/SerialConnection.scala | 136 --------------------- .../src/main/scala/mavigator/uav/Uav.scala | 3 +- .../scala/mavigator/uav/mock/MockConnection.scala | 114 ++++++++--------- .../mavigator/uav/mock/RandomFlightPlan.scala | 2 +- 10 files changed, 105 insertions(+), 390 deletions(-) create mode 100644 mavigator-uav/src/main/resources/application.conf delete mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Connection.scala delete mode 100644 mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala delete mode 100644 mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala diff --git a/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala b/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala index add1da8..7f4ffdc 100644 --- a/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala +++ b/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala @@ -48,11 +48,10 @@ class MavlinkSocket(url: String, val remoteSystemId: Int) { case OverflowError => stats._overflows() += 1 }) -// private val connection = new dom.WebSocket(url) + private val connection = new dom.WebSocket(url) - //connection.binaryType = "arraybuffer" + connection.binaryType = "arraybuffer" - /* connection.onopen = (e: dom.Event) => { stats.open() = true } @@ -67,5 +66,5 @@ class MavlinkSocket(url: String, val remoteSystemId: Int) { connection.onclose = (e: dom.CloseEvent) => { stats.open() = false } - */ + } diff --git a/mavigator-server/src/main/resources/application.conf b/mavigator-server/src/main/resources/application.conf index 28511ad..85d609b 100644 --- a/mavigator-server/src/main/resources/application.conf +++ b/mavigator-server/src/main/resources/application.conf @@ -1,6 +1,6 @@ mavigator { - # Interface to listen for client connections + # Interface to listen on for client connections interface = "0.0.0.0" # Port on which to listen for client connections @@ -9,43 +9,4 @@ mavigator { # Mavlink system ID identifying this base station system_id = 1 - # Settings related to the connection with a UAV - connection { - - # The type of connection to use - # 'mock' or 'serial' - type = mock - - # Mavlink component ID used by this connection, - # in case it needs to inject messages. I.e. heartbeats - # will originate from this ID. - component_id = 1 - - # Interval in milliseconds between heartbeat messages injected by - # the connection - # 0 = no heartbeats injected - heartbeat = 2000 - - # Settings related to serial connections - serial { - # Serial port - port = "/dev/ttyUSB0" - # Baud rate (b/s) - baud = 115200 - # Use two stop bits - two_stop_bits = false - # Parity check - # 0 = None, 1 = Odd, 2 = Even - parity = 0 - } - - # Settings related to mock connections - mock { - # Mavlink system ID of the simulated UAV - remote_system_id = 42 - # Speed multiplier of simulation - prescaler = 1 - } - } - } \ No newline at end of file diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala index 1d2107a..6d442bf 100644 --- a/mavigator-server/src/main/scala/mavigator/Router.scala +++ b/mavigator-server/src/main/scala/mavigator/Router.scala @@ -31,7 +31,7 @@ object Router { path("info") { get { val f: Html = mavigator.views.html.dashboard( - "socket", + "ws://localhost:8080/mavlink", 0, 0, 0 @@ -57,8 +57,7 @@ object Router { } val toWebSocket = Flow[ByteString].map{bytes => - //BinaryMessage(bytes) - TextMessage(bytes.decodeString("UTF-8")) + BinaryMessage(bytes) } val bytes = Uav().connect() diff --git a/mavigator-uav/src/main/resources/application.conf b/mavigator-uav/src/main/resources/application.conf new file mode 100644 index 0000000..f468111 --- /dev/null +++ b/mavigator-uav/src/main/resources/application.conf @@ -0,0 +1,37 @@ +# Settings related to the connection with a UAV +mavigator.uav { + # The type of connection to use + # 'mock' or 'serial' + type = mock + + # Mavlink component ID used by this connection, + # in case it needs to inject messages. I.e. heartbeats + # will originate from this ID. + component_id = 1 + + # Interval in milliseconds between heartbeat messages injected by + # the connection + # 0 = no heartbeats injected + heartbeat = 2000 + + # Settings related to serial connections + serial { + # Serial port + port = "/dev/ttyUSB0" + # Baud rate (b/s) + baud = 115200 + # Use two stop bits + two_stop_bits = false + # Parity check + # 0 = None, 1 = Odd, 2 = Even + parity = 0 + } + + # Settings related to mock connections + mock { + # Mavlink system ID of the simulated UAV + remote_system_id = 42 + # Delay between simulated messages + frequency = 50 + } +} \ No newline at end of file diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala b/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala deleted file mode 100644 index 7399657..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala +++ /dev/null @@ -1,73 +0,0 @@ -package mavigator.uav - -import akka.actor.ActorLogging -import scala.collection.mutable.ArrayBuffer - -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Terminated -import akka.actor.actorRef2Scala -import akka.util.ByteString - -/** Protocol definition. */ -object Connection { - - /** Any messages received and transmitted by actors implementing this protocol. */ - sealed trait Message - - /** Messages emitted by actors implementing this protocol. */ - trait Event extends Message - - /** Received data from the uav (or any other systems on the link) */ - case class Received(bstr: ByteString) extends Event - - /** The connection closed or could not be opened */ - case class Closed(message: String) extends Event - - /** Messages that can be received by actors implementing this protocol. */ - trait Command extends Message - - /** Register the sender to be notified on events */ - case object Register extends Command - - case class Unregister(client: ActorRef) extends Command - - /** Send given bytes out to the uav (or any other systems on the link) */ - case class Send(bstr: ByteString) extends Command - -} - -/** Common behavior of connection actors. */ -trait Connection { myself: Actor with ActorLogging => - - private val _clients = new ArrayBuffer[ActorRef] - - /** Current clients that should be notified on incoming messages. */ - def clients = _clients.toSeq - - /** Adds a client to the client list and acquires a deathwatch. */ - protected def register(client: ActorRef): Unit = { - _clients += client - myself.context.watch(client) - myself.log.info("Client registered {}", client) - } - - /** Remove client and release deathwatch. */ - protected def unregister(client: ActorRef): Unit = if (clients contains client) { - _clients -= client - myself.context.unwatch(client) - myself.log.info("Client unregistered {}", client) - } - - /** Sends a message to all registered clients. */ - protected def sendAll(msg: Any): Unit = clients foreach (_ ! msg) - - /** Common registration behavior. Manages the messages `Register` and `Terminated` by - * registering and unregistering clients. */ - protected def handleRegistration: Receive = { - case Connection.Register => register(sender) - case Terminated(client) => unregister(client) - case other => myself.log.warning("Unknown message: {}", other) - } - -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala deleted file mode 100644 index 5949a3a..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala +++ /dev/null @@ -1,77 +0,0 @@ -package mavigator.uav - -import java.util.concurrent.TimeUnit.MILLISECONDS -import scala.concurrent.duration.FiniteDuration -import scala.util.Random -import org.mavlink.Packet -import org.mavlink.enums.MavAutopilot -import org.mavlink.enums.MavModeFlag -import org.mavlink.enums.MavState -import org.mavlink.enums.MavType -import org.mavlink.messages.Heartbeat -import Connection.Received -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.Props -import akka.util.ByteString -import scala.concurrent.duration._ -import org.mavlink.messages.Message -import mock.RandomFlightPlan - -class MockConnection( - localSystemId: Byte, - localComponentId: Byte, - remoteSystemId: Byte, - prescaler: Int -) - extends Actor with ActorLogging with Connection with MavlinkUtil { - - import context._ - - override val systemId = remoteSystemId - override val componentId = remoteSystemId - - val plan = new RandomFlightPlan - - def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) { - sendAll(Received(assemble(fct))) - } - def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) { - sendAll(Received(ByteString(fct))) - } - - override def preStart() = { - //increment state - system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) } - - //send messages - scheduleMessage(0.1.seconds * prescaler)(plan.position) - scheduleMessage(0.05.seconds * prescaler)(plan.attitude) - scheduleMessage(0.05.seconds * prescaler)(plan.motors) - scheduleMessage(0.1.seconds * prescaler)(plan.distance) - scheduleMessage(1.seconds)(plan.heartbeat) - - //simulate noisy line - scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc) - scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow) - } - - override def receive = handleRegistration - -} - -object MockConnection { - def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) = - Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler) -} - -object MockPackets { - val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte) - val invalidOverflow = { - val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42) - data(0) = -2 - data(1) = 2 - data(1) = -1 - data - } -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala deleted file mode 100644 index 24c70a1..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala +++ /dev/null @@ -1,136 +0,0 @@ -package mavigator.uav - -import java.util.concurrent.TimeUnit.MILLISECONDS - -import scala.concurrent.duration.FiniteDuration - -import org.mavlink.enums.MavAutopilot -import org.mavlink.enums.MavModeFlag -import org.mavlink.enums.MavState -import org.mavlink.enums.MavType -import org.mavlink.messages.Heartbeat - -import com.github.jodersky.flow.Parity -import com.github.jodersky.flow.Serial -import com.github.jodersky.flow.SerialSettings - -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.ActorRef -import akka.actor.Props -import akka.actor.Terminated -import akka.actor.actorRef2Scala -import akka.io.IO - -class SerialConnection( - val systemId: Byte, - val componentId: Byte, - heartbeatInterval: Option[FiniteDuration], - port: String, - settings: SerialSettings) extends Actor with ActorLogging with Connection with MavlinkUtil { - - import context._ - - override def preStart() = heartbeatInterval foreach { interval => - context.system.scheduler.schedule(interval, interval) { - self ! Connection.Send( - assemble( - Heartbeat( - MavType.MavTypeGeneric.toByte, - MavAutopilot.MavAutopilotGeneric.toByte, - 0, //no base mode - 0, //no custom mode - MavState.MavStateActive.toByte, - 0 //TODO properly implement read-only fields - ) - ) - ) - } - } - - def _closed: Receive = { - - case Connection.Register => - register(sender) - IO(Serial) ! Serial.Open(port, settings) - context become opening - - case Connection.Send(_) => - IO(Serial) ! Serial.Open(port, settings) - context become opening - - } - - def _opening: Receive = { - - case Serial.CommandFailed(cmd: Serial.Open, reason) => - sendAll(Connection.Closed(reason.toString)) - context become closed - - case Serial.Opened(_) => - context watch (sender) - context become opened(sender) - - case Connection.Send(_) => () // ignore - /* - * During opening, any outgoing messages are discarded. - * By using some kind of message stashing, maybe messages could be treated - * once the port has been opened. However, in such a case failure also needs - * to be considered, thus complicating the protocol. Since opening is typically - * quite fast and since mavlink uses heartbeats and acknowledgements (in certain - * circumstances) anyway, keeping messages is not really required. - */ - - } - - def _opened(operator: ActorRef): Receive = { - - case Terminated(`operator`) => - sendAll(Connection.Closed("Serial connection crashed.")) - context become closed - - case Serial.Closed => - sendAll(Connection.Closed("Serial connection was closed.")) - context become closed - - case Serial.Received(bstr) => - sendAll(Connection.Received(bstr)) - incoming.push(bstr) - - case Connection.Send(bstr) => - outgoing.push(bstr) - //no sending is currently enabled - - } - - def closed = _closed orElse handleRegistration - def opening = _opening orElse handleRegistration - def opened(op: ActorRef) = _opened(op) orElse handleRegistration - override def receive = closed - -} - -object SerialConnection { - def apply( - systemId: Byte, - componentId: Byte, - heartbeatInterval: Int, - port: String, - baud: Int, - tsb: Boolean, - parity: Int): Props = { - - val settings = SerialSettings( - baud, - 8, - tsb, - parity match { - case 0 => Parity.None - case 1 => Parity.Odd - case 2 => Parity.Even - }) - val hb = if (heartbeatInterval == 0) None else Some(FiniteDuration(heartbeatInterval, MILLISECONDS)) - - Props(classOf[SerialConnection], systemId, componentId, hb, port, settings) - } -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala index a1ef333..37f94b6 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -1,6 +1,7 @@ package mavigator package uav +import mock._ import akka._ import akka.actor._ import akka.util._ @@ -14,7 +15,7 @@ class Uav(system: ExtendedActorSystem) extends Extension { val t = scala.concurrent.duration.FiniteDuration(100, "ms") Flow.fromSinkAndSource( Sink.ignore, - Source.tick(t,t, ByteString("hello world")) + (new MockConnection(0,0,1)).data ) } diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala index f0ad972..94c1d40 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala @@ -2,78 +2,82 @@ package mavigator package uav package mock -import java.util.concurrent.TimeUnit.MILLISECONDS -import scala.concurrent.duration.FiniteDuration -import scala.util.Random -import org.mavlink.Packet -import org.mavlink.enums.MavAutopilot -import org.mavlink.enums.MavModeFlag -import org.mavlink.enums.MavState -import org.mavlink.enums.MavType -import org.mavlink.messages.Heartbeat -import Connection.Received -import akka.actor.Actor -import akka.actor.ActorLogging -import akka.actor.Props -import akka.util.ByteString -import scala.concurrent.duration._ import org.mavlink.messages.Message -import mock.RandomFlightPlan +import akka.stream.scaladsl._ +import scala.concurrent.duration._ +import scala.util.Random +import akka.stream._ +import akka.NotUsed +import akka.util._ +import org.mavlink._ class MockConnection( - localSystemId: Byte, - localComponentId: Byte, remoteSystemId: Byte, - prescaler: Int -) - extends Actor with ActorLogging with Connection with MavlinkUtil { + remoteComponentId: Byte, + prescaler: Double +) { + import MockConnection._ - import context._ + private def stream(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { + val dt = delaySeconds / prescaler + Flow[RandomFlightPlan].throttle( + elements = 1, + per = dt.seconds, + maximumBurst = 1, + ThrottleMode.Shaping + ).map(message) + } - override val systemId = remoteSystemId - override val componentId = remoteSystemId + def foo(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { + import GraphDSL.Implicits._ - val plan = new RandomFlightPlan + Source.fromGraph(GraphDSL.create() { implicit b => - def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) { - sendAll(Received(assemble(fct))) - } - def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) { - sendAll(Received(ByteString(fct))) + val plan = new RandomFlightPlan + + //graph components + val clock = Source.tick(1.seconds, 0.01.seconds, plan).map{plan => + plan.tick(0.01) + plan + } + val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) + val merge = b.add(Merge[Message](messages.length)) + + clock ~> bcast + for (message <- messages) { + bcast ~> message ~> merge + } + + SourceShape(merge.out) + }) } - override def preStart() = { - //increment state - system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) } + val messages: Source[Message, _] = foo( + stream(0.2)(_.position), + stream(0.05)(_.attitude), + stream(0.05)(_.motors), + stream(0.1)(_.distance) + ) - //send messages - scheduleMessage(0.1.seconds * prescaler)(plan.position) - scheduleMessage(0.05.seconds * prescaler)(plan.attitude) - scheduleMessage(0.05.seconds * prescaler)(plan.motors) - scheduleMessage(0.1.seconds * prescaler)(plan.distance) - scheduleMessage(1.seconds)(plan.heartbeat) - //simulate noisy line - scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc) - scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow) + private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId) + + /** Assembles a message into a bytestring representing a packet sent from this connection. */ + def assemble(message: Message): ByteString = { + val (messageId, payload) = Message.pack(message) + val packet: Packet = assembler.assemble(messageId, payload) + ByteString(packet.toArray) } - override def receive = handleRegistration + val data = messages.map{ msg => + println(msg) + assemble(msg) + } } object MockConnection { - def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) = - Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler) -} -object MockPackets { - val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte) - val invalidOverflow = { - val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42) - data(0) = -2 - data(1) = 2 - data(1) = -1 - data - } + final val T0 = 1.seconds + } diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala index 4a6520f..f24f090 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala @@ -8,7 +8,7 @@ import org.mavlink.enums._ import org.mavlink.messages._ class RandomFlightPlan { - + private var time: Double = 0 //current time in seconds private def millis = (time * 1000).toInt private def micros = (time * 1E6).toInt -- cgit v1.2.3