aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJakob Odersky <jakob@odersky.com>2016-02-04 22:57:01 -0800
committerJakob Odersky <jakob@odersky.com>2016-02-04 22:57:01 -0800
commitfce963dab70b163a79cb52dff4bac3dfd32c5f37 (patch)
treed28c88ce9f5432e5ed01407689153c297ea72be0
parentb57a8c710d94ab6ce35fbbb0b9fc00649792e4be (diff)
downloadmavigator-fce963dab70b163a79cb52dff4bac3dfd32c5f37.tar.gz
mavigator-fce963dab70b163a79cb52dff4bac3dfd32c5f37.tar.bz2
mavigator-fce963dab70b163a79cb52dff4bac3dfd32c5f37.zip
move mock connection to stream (performance is really bad)
-rw-r--r--mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala7
-rw-r--r--mavigator-server/src/main/resources/application.conf41
-rw-r--r--mavigator-server/src/main/scala/mavigator/Router.scala5
-rw-r--r--mavigator-uav/src/main/resources/application.conf37
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Connection.scala73
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala77
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala136
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Uav.scala3
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala114
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala2
10 files changed, 105 insertions, 390 deletions
diff --git a/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala b/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala
index add1da8..7f4ffdc 100644
--- a/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala
+++ b/mavigator-dashboard/src/main/scala/mavigator/dashboard/MavlinkSocket.scala
@@ -48,11 +48,10 @@ class MavlinkSocket(url: String, val remoteSystemId: Int) {
case OverflowError => stats._overflows() += 1
})
-// private val connection = new dom.WebSocket(url)
+ private val connection = new dom.WebSocket(url)
- //connection.binaryType = "arraybuffer"
+ connection.binaryType = "arraybuffer"
- /*
connection.onopen = (e: dom.Event) => {
stats.open() = true
}
@@ -67,5 +66,5 @@ class MavlinkSocket(url: String, val remoteSystemId: Int) {
connection.onclose = (e: dom.CloseEvent) => {
stats.open() = false
}
- */
+
}
diff --git a/mavigator-server/src/main/resources/application.conf b/mavigator-server/src/main/resources/application.conf
index 28511ad..85d609b 100644
--- a/mavigator-server/src/main/resources/application.conf
+++ b/mavigator-server/src/main/resources/application.conf
@@ -1,6 +1,6 @@
mavigator {
- # Interface to listen for client connections
+ # Interface to listen on for client connections
interface = "0.0.0.0"
# Port on which to listen for client connections
@@ -9,43 +9,4 @@ mavigator {
# Mavlink system ID identifying this base station
system_id = 1
- # Settings related to the connection with a UAV
- connection {
-
- # The type of connection to use
- # 'mock' or 'serial'
- type = mock
-
- # Mavlink component ID used by this connection,
- # in case it needs to inject messages. I.e. heartbeats
- # will originate from this ID.
- component_id = 1
-
- # Interval in milliseconds between heartbeat messages injected by
- # the connection
- # 0 = no heartbeats injected
- heartbeat = 2000
-
- # Settings related to serial connections
- serial {
- # Serial port
- port = "/dev/ttyUSB0"
- # Baud rate (b/s)
- baud = 115200
- # Use two stop bits
- two_stop_bits = false
- # Parity check
- # 0 = None, 1 = Odd, 2 = Even
- parity = 0
- }
-
- # Settings related to mock connections
- mock {
- # Mavlink system ID of the simulated UAV
- remote_system_id = 42
- # Speed multiplier of simulation
- prescaler = 1
- }
- }
-
} \ No newline at end of file
diff --git a/mavigator-server/src/main/scala/mavigator/Router.scala b/mavigator-server/src/main/scala/mavigator/Router.scala
index 1d2107a..6d442bf 100644
--- a/mavigator-server/src/main/scala/mavigator/Router.scala
+++ b/mavigator-server/src/main/scala/mavigator/Router.scala
@@ -31,7 +31,7 @@ object Router {
path("info") {
get {
val f: Html = mavigator.views.html.dashboard(
- "socket",
+ "ws://localhost:8080/mavlink",
0,
0,
0
@@ -57,8 +57,7 @@ object Router {
}
val toWebSocket = Flow[ByteString].map{bytes =>
- //BinaryMessage(bytes)
- TextMessage(bytes.decodeString("UTF-8"))
+ BinaryMessage(bytes)
}
val bytes = Uav().connect()
diff --git a/mavigator-uav/src/main/resources/application.conf b/mavigator-uav/src/main/resources/application.conf
new file mode 100644
index 0000000..f468111
--- /dev/null
+++ b/mavigator-uav/src/main/resources/application.conf
@@ -0,0 +1,37 @@
+# Settings related to the connection with a UAV
+mavigator.uav {
+ # The type of connection to use
+ # 'mock' or 'serial'
+ type = mock
+
+ # Mavlink component ID used by this connection,
+ # in case it needs to inject messages. I.e. heartbeats
+ # will originate from this ID.
+ component_id = 1
+
+ # Interval in milliseconds between heartbeat messages injected by
+ # the connection
+ # 0 = no heartbeats injected
+ heartbeat = 2000
+
+ # Settings related to serial connections
+ serial {
+ # Serial port
+ port = "/dev/ttyUSB0"
+ # Baud rate (b/s)
+ baud = 115200
+ # Use two stop bits
+ two_stop_bits = false
+ # Parity check
+ # 0 = None, 1 = Odd, 2 = Even
+ parity = 0
+ }
+
+ # Settings related to mock connections
+ mock {
+ # Mavlink system ID of the simulated UAV
+ remote_system_id = 42
+ # Delay between simulated messages
+ frequency = 50
+ }
+} \ No newline at end of file
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala b/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala
deleted file mode 100644
index 7399657..0000000
--- a/mavigator-uav/src/main/scala/mavigator/uav/Connection.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-package mavigator.uav
-
-import akka.actor.ActorLogging
-import scala.collection.mutable.ArrayBuffer
-
-import akka.actor.Actor
-import akka.actor.ActorRef
-import akka.actor.Terminated
-import akka.actor.actorRef2Scala
-import akka.util.ByteString
-
-/** Protocol definition. */
-object Connection {
-
- /** Any messages received and transmitted by actors implementing this protocol. */
- sealed trait Message
-
- /** Messages emitted by actors implementing this protocol. */
- trait Event extends Message
-
- /** Received data from the uav (or any other systems on the link) */
- case class Received(bstr: ByteString) extends Event
-
- /** The connection closed or could not be opened */
- case class Closed(message: String) extends Event
-
- /** Messages that can be received by actors implementing this protocol. */
- trait Command extends Message
-
- /** Register the sender to be notified on events */
- case object Register extends Command
-
- case class Unregister(client: ActorRef) extends Command
-
- /** Send given bytes out to the uav (or any other systems on the link) */
- case class Send(bstr: ByteString) extends Command
-
-}
-
-/** Common behavior of connection actors. */
-trait Connection { myself: Actor with ActorLogging =>
-
- private val _clients = new ArrayBuffer[ActorRef]
-
- /** Current clients that should be notified on incoming messages. */
- def clients = _clients.toSeq
-
- /** Adds a client to the client list and acquires a deathwatch. */
- protected def register(client: ActorRef): Unit = {
- _clients += client
- myself.context.watch(client)
- myself.log.info("Client registered {}", client)
- }
-
- /** Remove client and release deathwatch. */
- protected def unregister(client: ActorRef): Unit = if (clients contains client) {
- _clients -= client
- myself.context.unwatch(client)
- myself.log.info("Client unregistered {}", client)
- }
-
- /** Sends a message to all registered clients. */
- protected def sendAll(msg: Any): Unit = clients foreach (_ ! msg)
-
- /** Common registration behavior. Manages the messages `Register` and `Terminated` by
- * registering and unregistering clients. */
- protected def handleRegistration: Receive = {
- case Connection.Register => register(sender)
- case Terminated(client) => unregister(client)
- case other => myself.log.warning("Unknown message: {}", other)
- }
-
-}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala
deleted file mode 100644
index 5949a3a..0000000
--- a/mavigator-uav/src/main/scala/mavigator/uav/MockConnection.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-package mavigator.uav
-
-import java.util.concurrent.TimeUnit.MILLISECONDS
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Random
-import org.mavlink.Packet
-import org.mavlink.enums.MavAutopilot
-import org.mavlink.enums.MavModeFlag
-import org.mavlink.enums.MavState
-import org.mavlink.enums.MavType
-import org.mavlink.messages.Heartbeat
-import Connection.Received
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.Props
-import akka.util.ByteString
-import scala.concurrent.duration._
-import org.mavlink.messages.Message
-import mock.RandomFlightPlan
-
-class MockConnection(
- localSystemId: Byte,
- localComponentId: Byte,
- remoteSystemId: Byte,
- prescaler: Int
-)
- extends Actor with ActorLogging with Connection with MavlinkUtil {
-
- import context._
-
- override val systemId = remoteSystemId
- override val componentId = remoteSystemId
-
- val plan = new RandomFlightPlan
-
- def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(assemble(fct)))
- }
- def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(ByteString(fct)))
- }
-
- override def preStart() = {
- //increment state
- system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) }
-
- //send messages
- scheduleMessage(0.1.seconds * prescaler)(plan.position)
- scheduleMessage(0.05.seconds * prescaler)(plan.attitude)
- scheduleMessage(0.05.seconds * prescaler)(plan.motors)
- scheduleMessage(0.1.seconds * prescaler)(plan.distance)
- scheduleMessage(1.seconds)(plan.heartbeat)
-
- //simulate noisy line
- scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc)
- scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow)
- }
-
- override def receive = handleRegistration
-
-}
-
-object MockConnection {
- def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) =
- Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler)
-}
-
-object MockPackets {
- val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte)
- val invalidOverflow = {
- val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42)
- data(0) = -2
- data(1) = 2
- data(1) = -1
- data
- }
-}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala
deleted file mode 100644
index 24c70a1..0000000
--- a/mavigator-uav/src/main/scala/mavigator/uav/SerialConnection.scala
+++ /dev/null
@@ -1,136 +0,0 @@
-package mavigator.uav
-
-import java.util.concurrent.TimeUnit.MILLISECONDS
-
-import scala.concurrent.duration.FiniteDuration
-
-import org.mavlink.enums.MavAutopilot
-import org.mavlink.enums.MavModeFlag
-import org.mavlink.enums.MavState
-import org.mavlink.enums.MavType
-import org.mavlink.messages.Heartbeat
-
-import com.github.jodersky.flow.Parity
-import com.github.jodersky.flow.Serial
-import com.github.jodersky.flow.SerialSettings
-
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.ActorRef
-import akka.actor.Props
-import akka.actor.Terminated
-import akka.actor.actorRef2Scala
-import akka.io.IO
-
-class SerialConnection(
- val systemId: Byte,
- val componentId: Byte,
- heartbeatInterval: Option[FiniteDuration],
- port: String,
- settings: SerialSettings) extends Actor with ActorLogging with Connection with MavlinkUtil {
-
- import context._
-
- override def preStart() = heartbeatInterval foreach { interval =>
- context.system.scheduler.schedule(interval, interval) {
- self ! Connection.Send(
- assemble(
- Heartbeat(
- MavType.MavTypeGeneric.toByte,
- MavAutopilot.MavAutopilotGeneric.toByte,
- 0, //no base mode
- 0, //no custom mode
- MavState.MavStateActive.toByte,
- 0 //TODO properly implement read-only fields
- )
- )
- )
- }
- }
-
- def _closed: Receive = {
-
- case Connection.Register =>
- register(sender)
- IO(Serial) ! Serial.Open(port, settings)
- context become opening
-
- case Connection.Send(_) =>
- IO(Serial) ! Serial.Open(port, settings)
- context become opening
-
- }
-
- def _opening: Receive = {
-
- case Serial.CommandFailed(cmd: Serial.Open, reason) =>
- sendAll(Connection.Closed(reason.toString))
- context become closed
-
- case Serial.Opened(_) =>
- context watch (sender)
- context become opened(sender)
-
- case Connection.Send(_) => () // ignore
- /*
- * During opening, any outgoing messages are discarded.
- * By using some kind of message stashing, maybe messages could be treated
- * once the port has been opened. However, in such a case failure also needs
- * to be considered, thus complicating the protocol. Since opening is typically
- * quite fast and since mavlink uses heartbeats and acknowledgements (in certain
- * circumstances) anyway, keeping messages is not really required.
- */
-
- }
-
- def _opened(operator: ActorRef): Receive = {
-
- case Terminated(`operator`) =>
- sendAll(Connection.Closed("Serial connection crashed."))
- context become closed
-
- case Serial.Closed =>
- sendAll(Connection.Closed("Serial connection was closed."))
- context become closed
-
- case Serial.Received(bstr) =>
- sendAll(Connection.Received(bstr))
- incoming.push(bstr)
-
- case Connection.Send(bstr) =>
- outgoing.push(bstr)
- //no sending is currently enabled
-
- }
-
- def closed = _closed orElse handleRegistration
- def opening = _opening orElse handleRegistration
- def opened(op: ActorRef) = _opened(op) orElse handleRegistration
- override def receive = closed
-
-}
-
-object SerialConnection {
- def apply(
- systemId: Byte,
- componentId: Byte,
- heartbeatInterval: Int,
- port: String,
- baud: Int,
- tsb: Boolean,
- parity: Int): Props = {
-
- val settings = SerialSettings(
- baud,
- 8,
- tsb,
- parity match {
- case 0 => Parity.None
- case 1 => Parity.Odd
- case 2 => Parity.Even
- })
- val hb = if (heartbeatInterval == 0) None else Some(FiniteDuration(heartbeatInterval, MILLISECONDS))
-
- Props(classOf[SerialConnection], systemId, componentId, hb, port, settings)
- }
-}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
index a1ef333..37f94b6 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
@@ -1,6 +1,7 @@
package mavigator
package uav
+import mock._
import akka._
import akka.actor._
import akka.util._
@@ -14,7 +15,7 @@ class Uav(system: ExtendedActorSystem) extends Extension {
val t = scala.concurrent.duration.FiniteDuration(100, "ms")
Flow.fromSinkAndSource(
Sink.ignore,
- Source.tick(t,t, ByteString("hello world"))
+ (new MockConnection(0,0,1)).data
)
}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
index f0ad972..94c1d40 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
@@ -2,78 +2,82 @@ package mavigator
package uav
package mock
-import java.util.concurrent.TimeUnit.MILLISECONDS
-import scala.concurrent.duration.FiniteDuration
-import scala.util.Random
-import org.mavlink.Packet
-import org.mavlink.enums.MavAutopilot
-import org.mavlink.enums.MavModeFlag
-import org.mavlink.enums.MavState
-import org.mavlink.enums.MavType
-import org.mavlink.messages.Heartbeat
-import Connection.Received
-import akka.actor.Actor
-import akka.actor.ActorLogging
-import akka.actor.Props
-import akka.util.ByteString
-import scala.concurrent.duration._
import org.mavlink.messages.Message
-import mock.RandomFlightPlan
+import akka.stream.scaladsl._
+import scala.concurrent.duration._
+import scala.util.Random
+import akka.stream._
+import akka.NotUsed
+import akka.util._
+import org.mavlink._
class MockConnection(
- localSystemId: Byte,
- localComponentId: Byte,
remoteSystemId: Byte,
- prescaler: Int
-)
- extends Actor with ActorLogging with Connection with MavlinkUtil {
+ remoteComponentId: Byte,
+ prescaler: Double
+) {
+ import MockConnection._
- import context._
+ private def stream(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = {
+ val dt = delaySeconds / prescaler
+ Flow[RandomFlightPlan].throttle(
+ elements = 1,
+ per = dt.seconds,
+ maximumBurst = 1,
+ ThrottleMode.Shaping
+ ).map(message)
+ }
- override val systemId = remoteSystemId
- override val componentId = remoteSystemId
+ def foo(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = {
+ import GraphDSL.Implicits._
- val plan = new RandomFlightPlan
+ Source.fromGraph(GraphDSL.create() { implicit b =>
- def scheduleMessage(delay: FiniteDuration)(fct: => Message) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(assemble(fct)))
- }
- def scheduleBytes(delay: FiniteDuration)(fct: => Array[Byte]) = system.scheduler.schedule(delay, delay) {
- sendAll(Received(ByteString(fct)))
+ val plan = new RandomFlightPlan
+
+ //graph components
+ val clock = Source.tick(1.seconds, 0.01.seconds, plan).map{plan =>
+ plan.tick(0.01)
+ plan
+ }
+ val bcast = b.add(Broadcast[RandomFlightPlan](messages.length))
+ val merge = b.add(Merge[Message](messages.length))
+
+ clock ~> bcast
+ for (message <- messages) {
+ bcast ~> message ~> merge
+ }
+
+ SourceShape(merge.out)
+ })
}
- override def preStart() = {
- //increment state
- system.scheduler.schedule(0.01.seconds * prescaler, 0.01.seconds * prescaler) { plan.tick(0.01) }
+ val messages: Source[Message, _] = foo(
+ stream(0.2)(_.position),
+ stream(0.05)(_.attitude),
+ stream(0.05)(_.motors),
+ stream(0.1)(_.distance)
+ )
- //send messages
- scheduleMessage(0.1.seconds * prescaler)(plan.position)
- scheduleMessage(0.05.seconds * prescaler)(plan.attitude)
- scheduleMessage(0.05.seconds * prescaler)(plan.motors)
- scheduleMessage(0.1.seconds * prescaler)(plan.distance)
- scheduleMessage(1.seconds)(plan.heartbeat)
- //simulate noisy line
- scheduleBytes(0.3.seconds * prescaler)(MockPackets.invalidCrc)
- scheduleBytes(1.5.seconds * prescaler)(MockPackets.invalidOverflow)
+ private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId)
+
+ /** Assembles a message into a bytestring representing a packet sent from this connection. */
+ def assemble(message: Message): ByteString = {
+ val (messageId, payload) = Message.pack(message)
+ val packet: Packet = assembler.assemble(messageId, payload)
+ ByteString(packet.toArray)
}
- override def receive = handleRegistration
+ val data = messages.map{ msg =>
+ println(msg)
+ assemble(msg)
+ }
}
object MockConnection {
- def apply(systemId: Byte, componentId: Byte, remoteSystemId: Byte, prescaler: Int = 1) =
- Props(classOf[MockConnection], systemId, componentId, remoteSystemId, prescaler)
-}
-object MockPackets {
- val invalidCrc = Array(254, 1, 123, 13, 13).map(_.toByte)
- val invalidOverflow = {
- val data = Array.fill[Byte](Packet.MaxPayloadLength + 100)(42)
- data(0) = -2
- data(1) = 2
- data(1) = -1
- data
- }
+ final val T0 = 1.seconds
+
}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
index 4a6520f..f24f090 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/RandomFlightPlan.scala
@@ -8,7 +8,7 @@ import org.mavlink.enums._
import org.mavlink.messages._
class RandomFlightPlan {
-
+
private var time: Double = 0 //current time in seconds
private def millis = (time * 1000).toInt
private def micros = (time * 1E6).toInt