From 05f2776ccb9989dbc359276c5c36c46d8282fc8f Mon Sep 17 00:00:00 2001 From: Jakob Odersky Date: Mon, 18 Apr 2016 03:15:43 -0700 Subject: Implement serial backend --- mavigator-uav/build.sbt | 1 + mavigator-uav/src/main/resources/reference.conf | 23 ++--- .../src/main/scala/mavigator/uav/Backend.scala | 8 ++ .../src/main/scala/mavigator/uav/Core.scala | 100 +++++++++++++++++++++ .../scala/mavigator/uav/Multiplexer.scala.disabled | 20 ----- .../src/main/scala/mavigator/uav/Uav.scala | 36 ++++---- .../scala/mavigator/uav/mock/MockBackend.scala | 95 ++++++++++++++++++++ .../scala/mavigator/uav/mock/MockConnection.scala | 70 --------------- .../scala/mavigator/uav/serial/SerialBackend.scala | 58 ++++++++++++ 9 files changed, 288 insertions(+), 123 deletions(-) create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Backend.scala create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Core.scala delete mode 100644 mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala delete mode 100644 mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala create mode 100644 mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala (limited to 'mavigator-uav') diff --git a/mavigator-uav/build.sbt b/mavigator-uav/build.sbt index abe935b..c2b97c6 100644 --- a/mavigator-uav/build.sbt +++ b/mavigator-uav/build.sbt @@ -7,5 +7,6 @@ libraryDependencies ++= Seq( Dependencies.akkaStream, Dependencies.flow, Dependencies.flowNative, + Dependencies.flowStream, Dependencies.reactiveStreams ) diff --git a/mavigator-uav/src/main/resources/reference.conf b/mavigator-uav/src/main/resources/reference.conf index 3b43d37..5e66de3 100644 --- a/mavigator-uav/src/main/resources/reference.conf +++ b/mavigator-uav/src/main/resources/reference.conf @@ -1,30 +1,25 @@ # Settings related to the connection with a UAV mavigator.uav { + # The type of connection to use # 'mock' or 'serial' - type = mock - - # Mavlink component ID used by this connection, - # in case it needs to inject messages. I.e. heartbeats - # will originate from this ID. - component_id = 1 - - # Interval in milliseconds between heartbeat messages injected by - # the connection - # 0 = no heartbeats injected - heartbeat = 2000 + type = serial # Settings related to serial connections serial { # Serial port - port = "/dev/ttyUSB0" + port = "/dev/ttyACM0" # Baud rate (b/s) - baud = 115200 + baud = 57600 # Use two stop bits two_stop_bits = false # Parity check # 0 = None, 1 = Odd, 2 = Even parity = 0 + + # Delay between detection of serial port and attempt to open it. + # Set this to provide time for the device to initialise. + connection_delay = 1000 } # Settings related to mock connections @@ -32,7 +27,7 @@ mavigator.uav { # Mavlink system ID of the simulated UAV remote_system_id = 42 # Mavlink component ID of the simulated UAV - remote_system_id = 0 + remote_component_id = 0 # Divide simulated message frequency prescaler = 1 } diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala new file mode 100644 index 0000000..84b9673 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Backend.scala @@ -0,0 +1,8 @@ +package mavigator +package uav + +trait Backend { + + def init(core: Core): Unit + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Core.scala b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala new file mode 100644 index 0000000..4418dae --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/Core.scala @@ -0,0 +1,100 @@ +package mavigator +package uav + +import akka.NotUsed +import akka.actor.{ActorLogging, ActorRef, ActorSystem, Props} +import akka.stream.Materializer +import akka.stream.actor.{ActorPublisher, ActorPublisherMessage} +import akka.stream.scaladsl.{Flow, Sink, Source} +import akka.util.ByteString +import org.reactivestreams.{Publisher, Subscriber} + +/** A core enables dynamic creation and removal of clients and backend connections. + * It is essentially a dynamic stream multiplexer. */ +class Core(implicit val system: ActorSystem, val materializer: Materializer) { + import Core._ + + /** The actor responsible for forwarding messages from the backend + * to this core. */ + private lazy val backendPublisherActor: ActorRef = system.actorOf(BackendPublisher()) + + /** Publisher that forwards messages received from the backend. */ + private lazy val backendPublisher: Publisher[ByteString] = ActorPublisher(backendPublisherActor) + + private lazy val backend = Flow.fromSinkAndSourceMat( + Sink.ignore, //FIXME: this will need to be changed for controlling the uav from the browser + Source.fromPublisher(backendPublisher) + )((toBackend, fromBackend) => (toBackend, fromBackend)) + + private lazy val clients = Flow.fromSinkAndSourceMat( + Sink.asPublisher[ByteString](true), + Source.asSubscriber[ByteString] + )((toClient, fromClient) => (toClient, fromClient)) + + private lazy val runnable = clients.joinMat(backend){ case ((toClient, fromClient), (toBackend, fromBackend)) => + toClient + } + + private lazy val clientPublisher: Publisher[ByteString] = { + system.log.info("Core started.") + runnable.run() + } + + def setBackend(): Flow[ByteString, ByteString, NotUsed] = { + val sink = Sink.actorRef(backendPublisherActor, BackendPublisher.BackendComplete) + val source = Source.empty[ByteString] //FIXME: this will need to be changed for controlling the uav from the browser + Flow.fromSinkAndSource(sink, source) + } + + def connect(): Flow[ByteString, ByteString, NotUsed] = { + Flow.fromSinkAndSource( + Sink.ignore, + Source.fromPublisher(clientPublisher) + ) + } + +} + +object Core { + + private class BackendPublisher extends ActorPublisher[ByteString] with ActorLogging { + import akka.stream.actor.ActorPublisherMessage._ + + override def preStart() = { + log.info("Starting backend publisher actor...") + } + + private var fromBackend: ByteString = null + + def receive = { + + case msg: ByteString => + fromBackend = msg + deliver() + + case BackendPublisher.BackendComplete => + sys.error("Backend completed normally, this should not happen.") + + // subscriber requests more + case ActorPublisherMessage.Request(_) => + deliver() + + //subscriber cancels + case ActorPublisherMessage.Cancel => + sys.error("Subscriber cancelled backend stream, this should not happen.") + + } + + def deliver() = if (fromBackend != null && totalDemand > 0 && isActive) { + onNext(fromBackend) + fromBackend = null + } + + } + + private object BackendPublisher { + case object BackendComplete + def apply(): Props = Props(classOf[BackendPublisher]) + } + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled b/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled deleted file mode 100644 index 5e48ea1..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/Multiplexer.scala.disabled +++ /dev/null @@ -1,20 +0,0 @@ -package mavigator -package uav - -import akka.stream.scaladsl._ -import akka.stream._ -import org.reactivestreams._ - -private[uav] class Multiplexer[In, Out](service: Flow[In, Out, _])(implicit materializer: Materializer) { - - private val endpoint: Flow[Out, In, (Publisher[Out], Subscriber[In])] = Flow.fromSinkAndSourceMat( - Sink.asPublisher[Out](fanout = true), - Source.asSubscriber[In])((pub, sub) => (pub, sub)) - - private lazy val (publisher, subscriber) = (service.joinMat(endpoint)(Keep.right)).run() - - def connect(client: Flow[Out, In, _]) = { - Source.fromPublisher(publisher).via(client).to(Sink.ignore).run() - } - -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala index 8f8c083..1387983 100644 --- a/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala +++ b/mavigator-uav/src/main/scala/mavigator/uav/Uav.scala @@ -2,40 +2,38 @@ package mavigator package uav import java.lang.IllegalArgumentException -import mock._ + import akka._ import akka.actor._ -import akka.util._ +import akka.stream.ActorMaterializer import akka.stream.scaladsl._ +import akka.util._ +import mock._ +import serial._ + +//TODO: the whole backend system feels hacky, it probably needs a major redesign class Uav(system: ExtendedActorSystem) extends Extension { + private val materializer = ActorMaterializer()(system) + private lazy val config = system.settings.config.getConfig("mavigator.uav") private lazy val tpe = config.getString("type") - private lazy val componentId = config.getInt("componentId").toByte - private lazy val heartbeat = config.getInt("heartbeat") - private lazy val connection = config.getConfig(tpe) - lazy val source = tpe match { - case "mock" => - new MockConnection( - connection.getInt("remote_system_id").toByte, - componentId, - connection.getDouble("prescaler") - ) - - case "serial" => ??? + private lazy val core = new Core()(system, materializer) + lazy val backend: Backend = tpe match { + case "mock" => MockBackend + case "serial" => SerialBackend case _ => throw new IllegalArgumentException(s"Unsupported connection type: $tpe") } - def connect(): Flow[ByteString, ByteString, NotUsed] = { - Flow.fromSinkAndSource( - Sink.ignore, - (new MockConnection(0,0,1)).data //TODO: use source instead of hardcoded value - ) + def init(): Unit = { + backend.init(core) } + def connect(): Flow[ByteString, ByteString, NotUsed] = core.connect() + } object Uav extends ExtensionId[Uav] with ExtensionIdProvider { diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala new file mode 100644 index 0000000..0b89fbd --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockBackend.scala @@ -0,0 +1,95 @@ +package mavigator +package uav +package mock + +import scala.concurrent.duration._ + +import akka.NotUsed +import akka.stream._ +import akka.stream.Attributes._ +import akka.stream.scaladsl._ +import akka.util._ +import org.mavlink._ +import org.mavlink.messages.Message + + +/** A test connection that produces random MAVLink messages. */ +class MockBackend( + remoteSystemId: Byte, + remoteComponentId: Byte, + prescaler: Double +) { + import MockBackend._ + + private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId) + + private def delayed(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { + val dt = delaySeconds / prescaler + Flow[RandomFlightPlan].withAttributes(inputBuffer(1,1)).delay(dt.seconds).map(message) + } + + private val messages: Source[Message, NotUsed] = fromPlan(new RandomFlightPlan)( + delayed(2)(_.heartbeat), + delayed(0.2)(_.position), + delayed(0.05)(_.attitude), + delayed(0.05)(_.motors), + delayed(0.1)(_.distance) + ) + + private val data: Source[ByteString, NotUsed] = messages.map{ message => + val (messageId, payload) = Message.pack(message) + val packet = assembler.assemble(messageId, payload) + ByteString(packet.toArray) + } + +} + +object MockBackend extends Backend { + + final val ClockTick: FiniteDuration = 0.02.seconds + + private def fromPlan(plan: RandomFlightPlan)(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { + import GraphDSL.Implicits._ + Source.fromGraph(GraphDSL.create() { implicit b => + + val clock = Source.tick(ClockTick, ClockTick, plan) map { plan => + plan.tick(ClockTick.toMillis / 1000.0) + plan + } + val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) + val merge = b.add(Merge[Message](messages.length)) + + clock ~> bcast + for (message <- messages) { + bcast ~> message ~> merge + } + + SourceShape(merge.out) + }) + } + + + override def init(core: Core) = { + import core.materializer + import core.system + + system.log.info("Initializing mock backend...") + + val config = system.settings.config.getConfig("mavigator.uav.mock") + + val mock = new MockBackend( + config.getInt("remote_system_id").toByte, + config.getInt("remote_component_id").toByte, + config.getDouble("prescaler") + ) + + val mockFlow = Flow.fromSinkAndSource( + Sink.ignore, + mock.data + ) + + (mockFlow join core.setBackend()).run() + + } + +} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala b/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala deleted file mode 100644 index 58b4977..0000000 --- a/mavigator-uav/src/main/scala/mavigator/uav/mock/MockConnection.scala +++ /dev/null @@ -1,70 +0,0 @@ -package mavigator -package uav -package mock - -import scala.concurrent.duration._ - -import akka.NotUsed -import akka.stream._ -import akka.stream.Attributes._ -import akka.stream.scaladsl._ -import akka.util._ -import org.mavlink._ -import org.mavlink.messages.{Heartbeat, Message} - - -class MockConnection( - remoteSystemId: Byte, - remoteComponentId: Byte, - prescaler: Double -) { - import MockConnection._ - - private lazy val assembler = new Assembler(remoteSystemId, remoteComponentId) - - private def delayed(delaySeconds: Double)(message: RandomFlightPlan => Message): Flow[RandomFlightPlan, Message, NotUsed] = { - val dt = delaySeconds / prescaler - Flow[RandomFlightPlan].withAttributes(inputBuffer(1,1)).delay(dt.seconds).map(message) - } - - private val messages: Source[Message, NotUsed] = fromPlan(new RandomFlightPlan)( - delayed(2)(_.heartbeat), - delayed(0.2)(_.position), - delayed(0.05)(_.attitude), - delayed(0.05)(_.motors), - delayed(0.1)(_.distance) - ) - - val data: Source[ByteString, NotUsed] = messages.map{ message => - val (messageId, payload) = Message.pack(message) - val packet = assembler.assemble(messageId, payload) - ByteString(packet.toArray) - } - -} - -object MockConnection { - - final val ClockTick: FiniteDuration = 0.02.seconds - - private def fromPlan(plan: RandomFlightPlan)(messages: Flow[RandomFlightPlan, Message, _]*): Source[Message, NotUsed] = { - import GraphDSL.Implicits._ - Source.fromGraph(GraphDSL.create() { implicit b => - - val clock = Source.tick(ClockTick, ClockTick, plan) map { plan => - plan.tick(0.01) - plan - } - val bcast = b.add(Broadcast[RandomFlightPlan](messages.length)) - val merge = b.add(Merge[Message](messages.length)) - - clock ~> bcast - for (message <- messages) { - bcast ~> message ~> merge - } - - SourceShape(merge.out) - }) - } - -} diff --git a/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala new file mode 100644 index 0000000..a52d238 --- /dev/null +++ b/mavigator-uav/src/main/scala/mavigator/uav/serial/SerialBackend.scala @@ -0,0 +1,58 @@ +package mavigator +package uav +package serial + +import scala.concurrent.Future +import scala.concurrent.duration._ +import scala.util.{Failure, Success} + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Keep} +import akka.util.ByteString +import com.github.jodersky.flow.{Parity, SerialSettings} +import com.github.jodersky.flow.stream.Serial +import com.github.jodersky.flow.stream.Serial.Connection + +object SerialBackend extends Backend { + + override def init(core: Core): Unit = { + import core.materializer + import core.system + import core.system.dispatcher + + system.log.info("Initializing serial backend...") + + val conf = system.settings.config.getConfig("mavigator.uav.serial") + val port = conf.getString("port") + val serialSettings = SerialSettings( + baud = conf.getInt("baud"), + twoStopBits = conf.getBoolean("two_stop_bits"), + parity = Parity(conf.getInt("parity")) + ) + + val connectionDelay = conf.getInt("connection_delay").millis + + system.log.info("Waiting for serial device on " + port + "...") + Serial().watch(Set(port)).map{ port => + system.log.info("Serial device connected on port " + port) + port + }.delay(connectionDelay).runForeach{ port => + system.log.info("Opening serial port " + port) + + val backend: Flow[ByteString, ByteString, NotUsed] = core.setBackend() + + val uav: Flow[ByteString, ByteString, Future[Connection]] = Serial().open(port, serialSettings) + + val connection = uav.joinMat(backend)(Keep.left).run().onComplete{ + case Success(connection) => + system.log.info("Successfully opened serial port " + connection.port) + case Failure(ex) => + system.log.error(ex, "Error occurred while trying to open " + port) + } + + } + + + } + +} -- cgit v1.2.3