aboutsummaryrefslogtreecommitdiff
path: root/mavigator-uav/src
diff options
context:
space:
mode:
Diffstat (limited to 'mavigator-uav/src')
-rw-r--r--mavigator-uav/src/main/resources/reference.conf23
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Backend.scala8
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Core.scala100
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled20
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/Uav.scala36
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala (renamed from mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala)37
-rw-r--r--mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala58
7 files changed, 223 insertions, 59 deletions
diff --git a/mavigator-uav/src/main/resources/reference.conf b/mavigator-uav/src/main/resources/reference.conf
index 3b43d37..5e66de3 100644
--- a/mavigator-uav/src/main/resources/reference.conf
+++ b/mavigator-uav/src/main/resources/reference.conf
@@ -1,30 +1,25 @@
# Settings related to the connection with a UAV
mavigator.uav {
+
# The type of connection to use
# 'mock' or 'serial'
- type = mock
-
- # Mavlink component ID used by this connection,
- # in case it needs to inject messages. I.e. heartbeats
- # will originate from this ID.
- component_id = 1
-
- # Interval in milliseconds between heartbeat messages injected by
- # the connection
- # 0 = no heartbeats injected
- heartbeat = 2000
+ type = serial
# Settings related to serial connections
serial {
# Serial port
- port = "/dev/ttyUSB0"
+ port = "/dev/ttyACM0"
# Baud rate (b/s)
- baud = 115200
+ baud = 57600
# Use two stop bits
two_stop_bits = false
# Parity check
# 0 = None, 1 = Odd, 2 = Even
parity = 0
+
+ # Delay between detection of serial port and attempt to open it.
+ # Set this to provide time for the device to initialise.
+ connection_delay = 1000
}
# Settings related to mock connections
@@ -32,7 +27,7 @@ mavigator.uav {
# Mavlink system ID of the simulated UAV
remote_system_id = 42
# Mavlink component ID of the simulated UAV
- remote_system_id = 0
+ remote_component_id = 0
# Divide simulated message frequency
prescaler = 1
}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala
new file mode 100644
index 0000000..84b9673
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala
@@ -0,0 +1,8 @@
+package mavigator
+package uav
+
+trait Backend {
+
+ def init(core: Core): Unit
+
+}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
new file mode 100644
index 0000000..4418dae
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala
@@ -0,0 +1,100 @@
+package mavigator
+package uav
+
+import akka.NotUsed
+import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props}
+import akka.stream.Materializer
+import akka.stream.actor.{ActorPublisher, ActorPublisherMessage}
+import akka.stream.scaladsl.{Flow, Sink, Source}
+import akka.util.ByteString
+import org.reactivestreams.{Publisher, Subscriber}
+
+/** A core enables dynamic creation and removal of clients and backend connections.
+ * It is essentially a dynamic stream multiplexer. */
+class Core(implicit val system: ActorSystem, val materializer: Materializer) {
+ import Core._
+
+ /** The actor responsible for forwarding messages from the backend
+ * to this core. */
+ private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher())
+
+ /** Publisher that forwards messages received from the backend. */
+ private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor)
+
+ private lazy val backend = Flow.fromSinkAndSourceMat(
+ Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser
+ Source.fromPublisher(backendPublisher)
+ )((toBackend, fromBackend) => (toBackend, fromBackend))
+
+ private lazy val clients = Flow.fromSinkAndSourceMat(
+ Sink.asPublisher[ByteString](true),
+ Source.asSubscriber[ByteString]
+ )((toClient, fromClient) => (toClient, fromClient))
+
+ private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) =>
+ toClient
+ }
+
+ private lazy val clientPublisher: Publisher[ByteString] = {
+ system.log.info("Core started.")
+ runnable.run()
+ }
+
+ def setBackend(): Flow[ByteString, ByteString, NotUsed] = {
+ val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete)
+ val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser
+ Flow.fromSinkAndSource(sink, source)
+ }
+
+ def connect(): Flow[ByteString, ByteString, NotUsed] = {
+ Flow.fromSinkAndSource(
+ Sink.ignore,
+ Source.fromPublisher(clientPublisher)
+ )
+ }
+
+}
+
+object Core {
+
+ private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging {
+ import akka.stream.actor.ActorPublisherMessage._
+
+ override def preStart() = {
+ log.info("Starting backend publisher actor...")
+ }
+
+ private var fromBackend: ByteString = null
+
+ def receive = {
+
+ case msg: ByteString =>
+ fromBackend = msg
+ deliver()
+
+ case BackendPublisher.BackendComplete =>
+ sys.error("Backend completed normally, this should not happen.")
+
+ // subscriber requests more
+ case ActorPublisherMessage.Request(_) =>
+ deliver()
+
+ //subscriber cancels
+ case ActorPublisherMessage.Cancel =>
+ sys.error("Subscriber cancelled backend stream, this should not happen.")
+
+ }
+
+ def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) {
+ onNext(fromBackend)
+ fromBackend = null
+ }
+
+ }
+
+ private object BackendPublisher {
+ case object BackendComplete
+ def apply(): Props = Props(classOf[BackendPublisher])
+ }
+
+}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled
deleted file mode 100644
index 5e48ea1..0000000
--- a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled
+++ /dev/null
@@ -1,20 +0,0 @@
-package mavigator
-package uav
-
-import akka.stream.scaladsl._
-import akka.stream._
-import org.reactivestreams._
-
-private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) {
-
- private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat(
- Sink.asPublisher[Out](fanout = true),
- Source.asSubscriber[In])((pub, sub) => (pub, sub))
-
- private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run()
-
- def connect(client: Flow[Out, In, _]) = {
- Source.fromPublisher(publisher).via(client).to(Sink.ignore).run()
- }
-
-}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
index 8f8c083..1387983 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala
@@ -2,40 +2,38 @@ package mavigator
package uav
import java.lang.IllegalArgumentException
-import mock._
+
import akka._
import akka.actor._
-import akka.util._
+import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
+import akka.util._
+import mock._
+import serial._
+
+//TODO: the whole backend system feels hacky, it probably needs a major redesign
class Uav(system: ExtendedActorSystem) extends Extension {
+ private val materializer = ActorMaterializer()(system)
+
private lazy val config = system.settings.config.getConfig("mavigator.uav")
private lazy val tpe = config.getString("type")
- private lazy val componentId = config.getInt("componentId").toByte
- private lazy val heartbeat = config.getInt("heartbeat")
- private lazy val connection = config.getConfig(tpe)
- lazy val source = tpe match {
- case "mock" =>
- new MockConnection(
- connection.getInt("remote_system_id").toByte,
- componentId,
- connection.getDouble("prescaler")
- )
-
- case "serial" => ???
+ private lazy val core = new Core()(system, materializer)
+ lazy val backend: Backend = tpe match {
+ case "mock" => MockBackend
+ case "serial" => SerialBackend
case _ => throw new IllegalArgumentException(s"Unsupported connection type: $tpe")
}
- def connect(): Flow[ByteString, ByteString, NotUsed] = {
- Flow.fromSinkAndSource(
- Sink.ignore,
- (new MockConnection(0,0,1)).data //TODO: use source instead of hardcoded value
- )
+ def init(): Unit = {
+ backend.init(core)
}
+ def connect(): Flow[ByteString, ByteString, NotUsed] = core.connect()
+
}
object Uav extends ExtensionId[Uav] with ExtensionIdProvider {
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala
index 58b4977..0b89fbd 100644
--- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala
+++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala
@@ -10,15 +10,16 @@ import akka.stream.Attributes._
import akka.stream.scaladsl._
import akka.util._
import org.mavlink._
-import org.mavlink.messages.{Heartbeat, Message}
+import org.mavlink.messages.Message
-class MockConnection(
+/** A test connection that produces random MAVLink messages. */
+class MockBackend(
remoteSystemId: Byte,
remoteComponentId: Byte,
prescaler: Double
) {
- import MockConnection._
+ import MockBackend._
private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId)
@@ -35,7 +36,7 @@ class MockConnection(
delayed(0.1)(_.distance)
)
- val data: Source[ByteString, NotUsed] = messages.map{ message =>
+ private val data: Source[ByteString, NotUsed] = messages.map{ message =>
val (messageId, payload) = Message.pack(message)
val packet = assembler.assemble(messageId, payload)
ByteString(packet.toArray)
@@ -43,7 +44,7 @@ class MockConnection(
}
-object MockConnection {
+object MockBackend extends Backend {
final val ClockTick: FiniteDuration = 0.02.seconds
@@ -52,7 +53,7 @@ object MockConnection {
Source.fromGraph(GraphDSL.create() { implicit b =>
val clock = Source.tick(ClockTick, ClockTick, plan) map { plan =>
- plan.tick(0.01)
+ plan.tick(ClockTick.toMillis / 1000.0)
plan
}
val bcast = b.add(Broadcast[RandomFlightPlan](messages.length))
@@ -67,4 +68,28 @@ object MockConnection {
})
}
+
+ override def init(core: Core) = {
+ import core.materializer
+ import core.system
+
+ system.log.info("Initializing mock backend...")
+
+ val config = system.settings.config.getConfig("mavigator.uav.mock")
+
+ val mock = new MockBackend(
+ config.getInt("remote_system_id").toByte,
+ config.getInt("remote_component_id").toByte,
+ config.getDouble("prescaler")
+ )
+
+ val mockFlow = Flow.fromSinkAndSource(
+ Sink.ignore,
+ mock.data
+ )
+
+ (mockFlow join core.setBackend()).run()
+
+ }
+
}
diff --git a/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala
new file mode 100644
index 0000000..a52d238
--- /dev/null
+++ b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala
@@ -0,0 +1,58 @@
+package mavigator
+package uav
+package serial
+
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.{Failure, Success}
+
+import akka.NotUsed
+import akka.stream.scaladsl.{Flow, Keep}
+import akka.util.ByteString
+import com.github.jodersky.flow.{Parity, SerialSettings}
+import com.github.jodersky.flow.stream.Serial
+import com.github.jodersky.flow.stream.Serial.Connection
+
+object SerialBackend extends Backend {
+
+ override def init(core: Core): Unit = {
+ import core.materializer
+ import core.system
+ import core.system.dispatcher
+
+ system.log.info("Initializing serial backend...")
+
+ val conf = system.settings.config.getConfig("mavigator.uav.serial")
+ val port = conf.getString("port")
+ val serialSettings = SerialSettings(
+ baud = conf.getInt("baud"),
+ twoStopBits = conf.getBoolean("two_stop_bits"),
+ parity = Parity(conf.getInt("parity"))
+ )
+
+ val connectionDelay = conf.getInt("connection_delay").millis
+
+ system.log.info("Waiting for serial device on " + port + "...")
+ Serial().watch(Set(port)).map{ port =>
+ system.log.info("Serial device connected on port " + port)
+ port
+ }.delay(connectionDelay).runForeach{ port =>
+ system.log.info("Opening serial port " + port)
+
+ val backend: Flow[ByteString, ByteString, NotUsed] = core.setBackend()
+
+ val uav: Flow[ByteString, ByteString, Future[Connection]] = Serial().open(port, serialSettings)
+
+ val connection = uav.joinMat(backend)(Keep.left).run().onComplete{
+ case Success(connection) =>
+ system.log.info("Successfully opened serial port " + connection.port)
+ case Failure(ex) =>
+ system.log.error(ex, "Error occurred while trying to open " + port)
+ }
+
+ }
+
+
+ }
+
+}